File: //usr/share/filebeat/module/elasticsearch/audit/ingest/pipeline.yml
# Entry-point ingest pipeline for Elasticsearch audit logs.
description: "Pipeline for parsing elasticsearch audit logs"
processors:
# Copy the ingest metadata timestamp into event.ingested.
- set:
    value: "{{_ingest.timestamp}}"
    field: event.ingested
# Copy the incoming @timestamp into event.created.
# NOTE(review): presumably done first because a later step may overwrite
# @timestamp with the time parsed from the log line - confirm.
- set:
    field: event.created
    copy_from: '@timestamp'
# Capture the first character of `message` into the temporary field
# `first_char`; it is only used to choose a sub-pipeline below.
- grok:
    field: message
    pattern_definitions:
      CHAR: '.'
    patterns:
    - '^%{CHAR:first_char}'
# Messages that do not start with '{' go to the plaintext sub-pipeline.
# The `{< ... >}` placeholder is presumably resolved by Filebeat when the
# pipeline is loaded - verify against the module loader.
- pipeline:
    name: '{< IngestPipeline "pipeline-plaintext" >}'
    if: "ctx.first_char != '{'"
# Messages that start with '{' go to the JSON sub-pipeline.
- pipeline:
    name: '{< IngestPipeline "pipeline-json" >}'
    if: "ctx.first_char == '{'"
# Static event categorization applied to every document.
- set:
    value: event
    field: event.kind
- set:
    value: database
    field: event.category
# event.type is only set when an audit event type was extracted.
- set:
    field: event.type
    value: access
    if: 'ctx?.elasticsearch?.audit?.event_type != null'
# Derive event.outcome from the audit event name.
#
# The event name is read from either `elasticsearch.audit.event_type` or
# `event.action`. The outcome is 'success' when either field holds one of the
# known success events, otherwise 'failure'.
#
# Previously two consecutive if/else blocks were used; the second one
# (on event.action) always overwrote the result of the first, so a document
# carrying only `elasticsearch.audit.event_type` was always marked 'failure'.
#
# The source is a folded scalar: lines at the same indent are joined with
# spaces, so do not add `//` line comments inside the script.
- script:
    lang: painless
    source: >-
      def successEvents = ['authentication_success', 'access_granted',
      'run_as_granted', 'connection_granted'];
      def eventType = ctx?.elasticsearch?.audit?.event_type;
      def action = ctx?.event?.action;
      boolean succeeded =
      (eventType != null && successEvents.contains(eventType))
      || (action != null && successEvents.contains(action));
      ctx.event.outcome = succeeded ? 'success' : 'failure';
# Populate host.id / host.name from the Elasticsearch node fields.
# With ignore_empty_value the set is skipped when the template renders empty.
- set:
    field: host.id
    ignore_empty_value: true
    value: '{{elasticsearch.node.id}}'
- set:
    field: host.name
    ignore_empty_value: true
    value: '{{elasticsearch.node.name}}'
# Collect the user names present on the document into related.user.
- append:
    if: 'ctx?.user?.name != null'
    field: related.user
    value: '{{user.name}}'
- append:
    if: 'ctx?.user?.effective?.name != null'
    field: related.user
    value: '{{user.effective.name}}'
# Drop the raw audit timestamp and the temporary `first_char` field.
# ignore_missing keeps a document that lacks one of these fields from failing
# the pipeline here; without it, a missing audit timestamp aborted the
# pipeline before `first_char` was removed, leaking it into the document.
- remove:
    field:
    - elasticsearch.audit.@timestamp
    - first_char
    ignore_missing: true
# Pipeline-level failure handler: store the failure message of the processor
# that failed in error.message.
on_failure:
- set:
    value: "{{ _ingest.on_failure_message }}"
    field: error.message