> logstash

Configure Logstash for log ingestion, parsing, transformation, and output to Elasticsearch and other destinations. Use when a user needs to build log processing pipelines, write Grok patterns, parse unstructured logs, enrich events, or set up multi-pipeline Logstash deployments.

Logstash

Overview

Build Logstash pipelines to ingest, parse, transform, and route log data. Covers Grok pattern writing, multi-pipeline configuration, input/output plugins, and performance tuning for production deployments.

Instructions

Task A: Basic Pipeline Configuration

# /etc/logstash/conf.d/main.conf — Basic log processing pipeline
input {
  beats {
    port => 5044
    ssl_enabled => false
  }

  tcp {
    port => 5000
    codec => json_lines
    tags => ["tcp-input"]
  }
}

filter {
  if [fields][type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "client_ip"
      target => "geo"
    }
    useragent {
      source => "user_agent"
      target => "ua"
    }
    mutate {
      remove_field => ["message", "timestamp", "user_agent"]
    }
  }
}

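output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    # Daily index per log type, e.g. logs-nginx-2026.02.19.
    # Caution: on Elasticsearch 8.x, names matching logs-*-* can collide with the
    # built-in data stream index template; if writes are rejected, pick another prefix.
    index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"
    manage_template => true
    template_overwrite => true
  }
}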
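
Before pointing Beats at this pipeline, it can help to exercise the nginx filter in isolation. The following is a minimal sketch of a throwaway test pipeline, not part of the original configuration: it reads lines from stdin, stamps each event with the same [fields][type] value the filter checks for, and prints the parsed result. The file name test-nginx.conf is hypothetical.

```conf
# test-nginx.conf (hypothetical file name): paste access-log lines on stdin
input {
  stdin {
    # Mimic the field the shipper would set so the nginx branch applies
    add_field => { "[fields][type]" => "nginx" }
  }
}

filter {
  if [fields][type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
  }
}

output {
  # Pretty-print the whole event, including [@metadata], for inspection
  stdout { codec => rubydebug { metadata => true } }
}
```

Running `bin/logstash -f test-nginx.conf` and pasting a line such as `203.0.113.7 - - [19/Feb/2026:12:00:00 +0000] "GET /index.html HTTP/1.1" 200 512 "-" "curl/8.5.0"` should show whether the pattern matches; an event tagged _grokparsefailure means it did not.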

Task B: Advanced Grok Patterns

# /etc/logstash/conf.d/app-logs.conf — Parse application log formats
filter {
  if [fields][type] == "app" {
    # Parse structured JSON logs
    if [message] =~ /^\{/ {
      json {
        source => "message"
        target => "app"
      }
    } else {
      # Parse custom app log format: 2026-02-19 12:00:00.123 [INFO] [req-abc123] OrderService - Order created
      grok {
        match => {
          "message" => "^%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:request_id}\] %{DATA:class} - %{GREEDYDATA:log_message}"
        }
      }
    }

    # Enrich with severity mapping
    # (source/target are the current names for the deprecated field/destination options)
    translate {
      source => "level"
      target => "severity_number"
      dictionary => {
        "TRACE" => "1"
        "DEBUG" => "5"
        "INFO" => "9"
        "WARN" => "13"
        "ERROR" => "17"
        "FATAL" => "21"
      }
    }

    # Extract duration from log messages like "processed in 245ms"
    grok {
      match => { "log_message" => "processed in %{NUMBER:duration_ms:float}ms" }
      tag_on_failure => []
    }

    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601"]
      target => "@timestamp"
    }
  }
}

# /etc/logstash/patterns/custom: custom Grok pattern definitions, one "NAME pattern" pair per line
JAVA_STACKTRACE (?:(?:\s+at\s+[\w.$]+\([^)]*\)\n?)+)
POSTGRES_LOG %{TIMESTAMP_ISO8601:timestamp} %{WORD:timezone} \[%{INT:pid}\] %{WORD:user}@%{WORD:database} %{LOGLEVEL:level}:  %{GREEDYDATA:message}
SPRING_LOG %{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:level}\s+%{INT:pid}\s+---\s+\[%{DATA:thread}\]\s+%{DATA:logger}\s+:\s+%{GREEDYDATA:message}
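
The custom patterns above only take effect when a grok filter is told where to find them. A minimal sketch of that wiring follows, assuming the file lives in /etc/logstash/patterns as shown; the "spring" type value is a hypothetical label used only for illustration.

```conf
filter {
  # "spring" is a hypothetical [fields][type] value
  if [fields][type] == "spring" {
    grok {
      # Directory containing the custom pattern file
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{SPRING_LOG}" }
      # SPRING_LOG captures into "message", which already exists on the event;
      # overwrite replaces the original value instead of turning it into an array
      overwrite => ["message"]
    }
  }
}
```

Because SPRING_LOG and POSTGRES_LOG both reuse the field name message for the trailing text, the overwrite option keeps that field a single string.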

Task C: Multi-Pipeline Configuration

# /etc/logstash/pipelines.yml — Run multiple independent pipelines
- pipeline.id: nginx-pipeline
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2
  pipeline.batch.size: 250

- pipeline.id: app-pipeline
  path.config: "/etc/logstash/conf.d/app-logs.conf"
  pipeline.workers: 4
  pipeline.batch.size: 500

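- pipeline.id: audit-pipeline
  path.config: "/etc/logstash/conf.d/audit.conf"
  pipeline.workers: 1
  queue.type: persisted             # Buffer events on disk so they survive restarts
  queue.max_bytes: 4gb
  dead_letter_queue.enable: true    # Keep events that Elasticsearch rejects instead of dropping them

# /etc/logstash/conf.d/audit.conf: audit log pipeline with a dead letter queue and idempotent writes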
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["audit-events"]
    group_id => "logstash-audit"
    codec => json
    consumer_threads => 3
  }
}

filter {
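  fingerprint {
    source => ["user_id", "action", "@timestamp"]
    # Hash all three fields together; without this, only the last
    # source field ends up in the target
    concatenate_sources => true
    target => "event_fingerprint"
    method => "SHA256"
  }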
  mutate {
    add_field => {
      "[@metadata][index]" => "audit-%{+YYYY.MM}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][index]}"
    document_id => "%{event_fingerprint}"
    action => "create"
  }
}
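
Events that land in the dead letter queue stay on disk until something reads them. The sketch below shows one way to reprocess them with the dead_letter_queue input plugin. It assumes the queue is enabled for audit-pipeline (dead_letter_queue.enable: true) and that path.data is the Docker image default of /usr/share/logstash/data; the audit-dlq index name and the dlq_reason field are hypothetical.

```conf
input {
  dead_letter_queue {
    # Assumed location: <path.data>/dead_letter_queue
    path => "/usr/share/logstash/data/dead_letter_queue"
    pipeline_id => "audit-pipeline"
    # Remember the read position so a restart does not replay old entries
    commit_offsets => true
  }
}

filter {
  mutate {
    # Copy the failure reason recorded by Logstash into a regular field
    add_field => { "dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}" }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    # Hypothetical index for rejected audit events
    index => "audit-dlq-%{+YYYY.MM}"
  }
}
```

The elasticsearch output writes to the dead letter queue only for responses it cannot retry, such as mapping errors (HTTP 400) or a missing index (404), so this reader would typically run as its own pipeline entry in pipelines.yml.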

Task D: Docker Deployment

# docker-compose.yml — Logstash with custom config and patterns
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    environment:
      - LS_JAVA_OPTS=-Xms1g -Xmx1g
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    volumes:
      - ./logstash/conf.d:/etc/logstash/conf.d
      - ./logstash/patterns:/etc/logstash/patterns
      - ./logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
    ports:
      - "5044:5044"
      - "5000:5000"
      - "9600:9600"   # Monitoring API
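
The compose file sets ELASTICSEARCH_HOSTS, while the pipeline configs shown earlier hard-code the URL. One way to connect the two, sketched here as an assumption about the intent, is the ${VAR:default} substitution that Logstash supports in pipeline configs.

```conf
output {
  elasticsearch {
    # Resolved from the container environment when the config is loaded;
    # the text after the first colon is the fallback used when the variable is unset
    hosts => ["${ELASTICSEARCH_HOSTS:http://elasticsearch:9200}"]
    index => "%{[@metadata][index]}"
  }
}
```

With this in place, the same config files can be pointed at a different cluster by changing only the environment section of the compose file.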

Task E: Performance Monitoring

# Check Logstash pipeline stats
curl -s "http://localhost:9600/_node/stats/pipelines" | \
  jq '.pipelines | to_entries[] | {
    pipeline: .key,
    events_in: .value.events.in,
    events_out: .value.events.out,
    events_filtered: .value.events.filtered,
    queue_size: .value.queue.events_count
  }'

# Check hot threads for performance issues
curl -s "http://localhost:9600/_node/hot_threads?human=true"

Best Practices

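  • Use persisted queues (queue.type: persisted) for pipelines that process critical data, so in-flight events survive a restart or crash
  • Test Grok patterns in Kibana's Grok Debugger (under Dev Tools) before deploying
  • Use tag_on_failure => [] for optional Grok matches so that events which legitimately don't match are not tagged _grokparsefailure
  • Separate pipelines by data source to isolate failures and tune workers independently
  • Raise pipeline.batch.size (500-1000) for throughput; keep it near the default of 125 when latency matters more. Larger batches use more heap
  • Use [@metadata] fields for routing logic; they are available to filters and conditionals but are not included in the events that outputs write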
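
For Grok matches that are not optional, the _grokparsefailure tag is useful as a routing signal. A minimal sketch, assuming unparsed events should be kept out of the main index; the file path is hypothetical.

```conf
output {
  if "_grokparsefailure" in [tags] {
    # Hypothetical path: keep unparsed events on disk for pattern debugging
    file {
      path => "/var/log/logstash/grok-failures-%{+YYYY-MM-dd}.log"
    }
  } else {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "logs-%{[fields][type]}-%{+YYYY.MM.dd}"
    }
  }
}
```

Keeping failures in a separate destination makes it easier to see which log formats the current patterns miss without polluting dashboards built on the parsed fields.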

Source: TerminalSkills/skills (by TerminalSkills). First seen Mar 17, 2026. GitHub stars: 17. Installs per week: 0.