File: //usr/share/filebeat/module/elasticsearch/slowlog/ingest/pipeline.yml
description: Pipeline for parsing elasticsearch slow logs.
processors:
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
- set:
copy_from: "@timestamp"
field: event.created
- grok:
field: message
patterns:
- ^%{CHAR:first_char}
pattern_definitions:
CHAR: .
- pipeline:
if: ctx.first_char != '{'
name: '{< IngestPipeline "pipeline-plaintext" >}'
- pipeline:
if: ctx.first_char == '{'
name: '{< IngestPipeline "pipeline-json" >}'
- remove:
field:
- elasticsearch.slowlog.timestamp
- elasticsearch.server.@timestamp
ignore_missing: true
- script:
lang: painless
source: ctx.event.duration = Math.round(ctx.elasticsearch.slowlog.duration * params.scale)
params:
scale: 1000000
if: ctx.elasticsearch.slowlog?.duration != null
- remove:
field: elasticsearch.slowlog.duration
ignore_missing: true
- set:
field: event.kind
value: event
- set:
field: event.category
value: database
- script:
lang: painless
source: >-
def errorLevels = ['FATAL', 'ERROR'];
if (ctx?.log?.level != null) {
if (errorLevels.contains(ctx.log.level)) {
ctx.event.type = 'error';
} else {
ctx.event.type = 'info';
}
}
- set:
field: host.name
value: "{{elasticsearch.node.name}}"
ignore_empty_value: true
- set:
field: host.id
value: "{{elasticsearch.node.id}}"
ignore_empty_value: true
- remove:
field:
- first_char
on_failure:
- set:
field: error.message
value: '{{ _ingest.on_failure_message }}'