File: //usr/share/filebeat/module/zookeeper/audit/ingest/pipeline.yml
# Ingest pipeline definition: a description plus an ordered list of processors.
description: Pipeline for parsing ZooKeeper audit messages
processors:
# Stamp the document with the ingest metadata timestamp.
- set:
    value: '{{_ingest.timestamp}}'
    field: event.ingested
# Keep the raw log line in event.original; `message` is re-populated later by
# the grok capture. The guard skips the rename when event.original is already
# set, because the rename processor raises an error if the target field
# exists (for example when an already-processed document is ingested again).
- rename:
    field: message
    target_field: event.original
    ignore_missing: true
    if: ctx?.event?.original == null
# Split the raw line into timestamp, log level, logger name and the remaining
# text. The remainder is captured into `message`.
- grok:
    field: event.original
    pattern_definitions:
      # A Java class name, or otherwise any run of non-whitespace characters.
      CALLER_CLASS: '(%{JAVACLASS}|%{NOTSPACE})'
    patterns:
    - '%{TIMESTAMP_ISO8601:zookeeper.audit.timestamp}%{SPACE}%{LOGLEVEL:log.level}%{SPACE}%{CALLER_CLASS:log.logger}:%{SPACE}%{GREEDYDATA:message}'
# Copy the current @timestamp into event.created, but only when an audit
# timestamp was extracted (the date processors below then write @timestamp).
- set:
    if: 'ctx?.zookeeper?.audit?.timestamp != null'
    field: event.created
    copy_from: '@timestamp'
# Parse the audit timestamp into @timestamp. There are two variants because
# the `timezone` option is only supplied when event.timezone is present on
# the document. In both variants a parse failure is appended to error.message
# by the processor's own on_failure handler.

# Variant 1: event.timezone is absent, so no timezone option is passed.
- date:
    field: zookeeper.audit.timestamp
    target_field: '@timestamp'
    formats:
    - 'yyyy-MM-dd HH:mm:ss,SSS'
    if: 'ctx?.zookeeper?.audit?.timestamp != null && ctx.event.timezone == null'
    on_failure:
    - append:
        value: '{{ _ingest.on_failure_message }}'
        field: error.message

# Variant 2: event.timezone is present and is passed as the timezone option.
- date:
    field: zookeeper.audit.timestamp
    target_field: '@timestamp'
    timezone: '{{ event.timezone }}'
    formats:
    - 'yyyy-MM-dd HH:mm:ss,SSS'
    if: 'ctx?.zookeeper?.audit?.timestamp != null && ctx.event.timezone != null'
    on_failure:
    - append:
        value: '{{ _ingest.on_failure_message }}'
        field: error.message
# Break the whitespace-separated key=value pairs of `message` into fields
# under zookeeper.audit.
- kv:
    field: message
    target_field: zookeeper.audit
    field_split: '\s+'
    value_split: '='
# Drop the intermediate fields; neither is required to exist.
- remove:
    ignore_missing: true
    field:
    - message
    - zookeeper.audit.timestamp
# Move the audited operation name to event.action.
- rename:
    ignore_missing: true
    field: zookeeper.audit.operation
    target_field: event.action

# Copy the audit result into event.outcome, but only for the two values
# listed in the condition ("success" and "failure").
- set:
    if: '["success","failure"].contains(ctx.zookeeper?.audit?.result)'
    field: event.outcome
    value: '{{zookeeper.audit.result}}'
    ignore_empty_value: true
# Move the audit `ip` value to client.address.
- rename:
    ignore_missing: true
    field: zookeeper.audit.ip
    target_field: client.address

# Convert client.address to the ip type and store the result in client.ip.
- convert:
    type: ip
    field: client.address
    target_field: client.ip
    ignore_missing: true
# Geo enrichment of client.ip into client.geo; no database_file is given here.
- geoip:
    if: 'ctx?.client?.ip != null'
    field: client.ip
    target_field: client.geo

# ASN enrichment of client.ip into client.as, reading only the asn and
# organization_name properties from GeoLite2-ASN.mmdb.
- geoip:
    if: 'ctx?.client?.ip != null'
    field: client.ip
    target_field: client.as
    database_file: GeoLite2-ASN.mmdb
    properties:
    - asn
    - organization_name
    ignore_missing: true
# user.id receives the audit user string exactly as it was parsed.
- set:
    ignore_empty_value: true
    field: user.id
    value: '{{zookeeper.audit.user}}'

# zookeeper.audit.user is then split in place on commas.
- split:
    ignore_missing: true
    field: zookeeper.audit.user
    separator: ','
# Collect the user id and client IP under related.* when they are present.
# `allow_duplicates: false` prevents a value from being appended a second
# time when the document already carries it in the target field (for example
# when an already-processed document is run through the pipeline again).
- append:
    field: related.user
    value: '{{user.id}}'
    allow_duplicates: false
    if: ctx?.user?.id != null
- append:
    field: related.ip
    value: '{{client.ip}}'
    allow_duplicates: false
    if: ctx?.client?.ip != null
# Pipeline-level failure handler: stores the failure message in error.log.
on_failure:
- set:
    value: '{{ _ingest.on_failure_message }}'
    field: error.log