# File: //usr/share/filebeat/module/kafka/log/ingest/pipeline.yml
# Ingest pipeline definition: a human-readable description followed by the
# list of processors that make up the pipeline.
description: Pipeline for parsing Kafka log messages
processors:
# Record when the document was ingested, using the `_ingest.timestamp`
# metadata value rendered through the template.
- set:
    value: '{{_ingest.timestamp}}'
    field: event.ingested
# Primary parse of the raw `message` field. Two alternative layouts are
# listed:
#   1. timestamp, level, message text and a parenthesised class name at the
#      end of a line; any following text is captured as
#      kafka.log.trace.full.
#   2. bracketed timestamp, level, thread and class, then " - " and the
#      message text.
# Both layouts write the extracted text back into `message`.
- grok:
    trace_match: true
    field: message
    patterns:
    # Folded block scalar: the line break between the two lines below is
    # folded into the single space that separates the message text from
    # the "(class)" suffix.
    - >-
      (?m)%{TIMESTAMP_ISO8601:kafka.log.timestamp}. %{LOGLEVEL:log.level} +%{JAVALOGMESSAGE:message}
      \(%{JAVACLASS:kafka.log.class}\)$[ \n]*(?'kafka.log.trace.full'.*)
    - '(?m)\[%{TIMESTAMP_ISO8601:kafka.log.timestamp}\] \[%{LOGLEVEL:log.level} ?\] \[%{NOTSPACE:kafka.log.thread}\] \[%{NOTSPACE:kafka.log.class}\] \- %{GREEDYDATA:message}'
# Extract a bracketed "[component]" tag from `message` into
# kafka.log.component and keep the remaining text as `message`.
- grok:
    field: message
    patterns:
    - '\[%{KAFKA_COMPONENT:kafka.log.component}\][,:.]? +%{JAVALOGMESSAGE:message}'
    pattern_definitions:
      # Any run of characters that does not contain a closing bracket.
      KAFKA_COMPONENT: '[^\]]*'
    # If this grok fails, record the component as "unknown" instead of
    # failing the whole pipeline.
    on_failure:
    - set:
        field: kafka.log.component
        value: unknown
# Split the captured trace text into "<class>: <message>". The processor
# is skipped when kafka.log.trace.full is absent (ignore_missing).
- grok:
    ignore_missing: true
    field: kafka.log.trace.full
    patterns:
    - '%{JAVACLASS:kafka.log.trace.class}:\s*%{JAVALOGMESSAGE:kafka.log.trace.message}'
    # If the grok fails, drop the whole kafka.log.trace object.
    on_failure:
    - remove:
        field: kafka.log.trace
# Discard the raw trace text; tolerate the field already being absent.
- remove:
    ignore_missing: true
    field: kafka.log.trace.full
# Copy the current @timestamp into event.created. This runs before the
# date processors further down, which write a new value to @timestamp.
- set:
    field: event.created
    copy_from: '@timestamp'
# Parse kafka.log.timestamp into @timestamp when the document carries no
# event.timezone. No explicit timezone is passed to the date processor in
# this branch.
- date:
    # Null-safe access: a document without an `event` object is treated the
    # same as one without event.timezone, instead of raising a
    # null-pointer error while evaluating the condition.
    if: 'ctx.event?.timezone == null'
    field: kafka.log.timestamp
    target_field: '@timestamp'
    formats:
    - 'yyyy-MM-dd HH:mm:ss,SSS'
    # A parse failure is recorded on the document rather than aborting the
    # pipeline.
    on_failure:
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'
# Parse kafka.log.timestamp into @timestamp when the document carries an
# event.timezone, and interpret the timestamp in that timezone.
- date:
    # Null-safe access: a document without an `event` object simply does
    # not match, instead of raising a null-pointer error while evaluating
    # the condition.
    if: 'ctx.event?.timezone != null'
    field: kafka.log.timestamp
    target_field: '@timestamp'
    formats:
    - 'yyyy-MM-dd HH:mm:ss,SSS'
    timezone: '{{ event.timezone }}'
    # A parse failure is recorded on the document rather than aborting the
    # pipeline.
    on_failure:
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'
# Drop the intermediate timestamp string; the date processors above have
# already used it as their source field.
- remove:
    field: kafka.log.timestamp
# Set event.kind to the constant "event" on every document.
- set:
    value: event
    field: event.kind
# Derive event.type from log.level: "error" for ERROR/FATAL, "info" for any
# other non-null level. Documents without log.level are left untouched.
# The source is a literal block scalar (|-) so the script is stored exactly
# as written, independent of YAML line-folding rules.
- script:
    lang: painless
    source: |-
      def errorLevels = ["ERROR", "FATAL"];
      if (ctx?.log?.level != null) {
        if (errorLevels.contains(ctx.log.level)) {
          ctx.event.type = "error";
        } else {
          ctx.event.type = "info";
        }
      }
# Pipeline-level failure handler: store the failure message from
# `_ingest.on_failure_message` in error.log.
on_failure:
- set:
    value: '{{ _ingest.on_failure_message }}'
    field: error.log