# File: //usr/share/filebeat/module/elasticsearch/audit/ingest/pipeline-plaintext.yml
# Ingest pipeline definition: a human-readable description followed by the
# ordered list of processors applied to each document.
description: 'Pipeline for parsing elasticsearch audit logs in plaintext format'
processors:
# Break the raw `message` line into structured fields.
# The timestamp, audit layer and event type are mandatory; the node name and
# every `key=[value]` attribute are wrapped in `( ... )?` and therefore optional.
- grok:
    field: message
    pattern_definitions:
      # Leading bracketed header: [timestamp] [node name]? [layer] [event type]
      ES_TIMESTAMP: '\[%{TIMESTAMP_ISO8601:elasticsearch.audit.@timestamp}\]'
      ES_NODE_NAME: '(\[%{DATA:elasticsearch.node.name}\])?'
      ES_AUDIT_LAYER: '\[%{WORD:elasticsearch.audit.layer}\]'
      ES_AUDIT_EVENT_TYPE: '\[%{WORD:elasticsearch.audit.event_type}\]'

      # Origin of the request and the identity it was made with.
      ES_AUDIT_ORIGIN_TYPE: '(origin_type\=\[%{WORD:elasticsearch.audit.origin.type}\])?'
      ES_AUDIT_ORIGIN_ADDRESS: '(origin_address\=\[%{IPORHOST:source.ip}\])?'
      ES_AUDIT_PRINCIPAL: '(principal\=\[%{DATA:user.name}\])?'
      ES_AUDIT_REALM: '(realm\=\[%{WORD:elasticsearch.audit.realm}\])?'
      ES_AUDIT_ROLES: '(roles\=\[%{DATA:elasticsearch.audit.user.roles}\])?'

      # Action, with an optional bracketed suffix captured separately as
      # `sub_action` (a later script processor appends it back onto `action`).
      ES_AUDIT_ACTION: '(action\=\[%{DATA:elasticsearch.audit.action}(\[%{DATA:elasticsearch.audit.sub_action}\])?\])?'

      # Details of the request itself.
      ES_AUDIT_URI: '(uri=\[%{DATA:url.original}\])?'
      ES_AUDIT_URI_PARAMS: '(params=\[%{DATA:elasticsearch.audit.url.params}\])?'
      ES_AUDIT_INDICES: '(indices\=\[%{DATA:elasticsearch.audit.indices}\])?'
      ES_AUDIT_REQUEST: '(request\=\[%{WORD:elasticsearch.audit.request.name}\])?'
      ES_AUDIT_REQUEST_BODY: '(request_body\=\[%{DATA:http.request.body.content}\])?'
    patterns:
    # Single anchored-at-end pattern; attributes are separated by an optional
    # comma and optional whitespace, in this fixed order.
    - '%{ES_TIMESTAMP}\s*%{ES_NODE_NAME}\s*%{ES_AUDIT_LAYER}\s*%{ES_AUDIT_EVENT_TYPE}\s*%{ES_AUDIT_ORIGIN_TYPE},?\s*%{ES_AUDIT_ORIGIN_ADDRESS},?\s*%{ES_AUDIT_PRINCIPAL},?\s*%{ES_AUDIT_REALM},?\s*%{ES_AUDIT_ROLES},?\s*%{ES_AUDIT_ACTION},?\s*%{ES_AUDIT_INDICES},?\s*%{ES_AUDIT_URI},?\s*%{ES_AUDIT_URI_PARAMS},?\s*%{ES_AUDIT_REQUEST},?\s*%{ES_AUDIT_REQUEST_BODY}$'
# Convert the comma-delimited role list captured by grok into an array.
# Documents without the field are passed through untouched.
- split:
    ignore_missing: true
    field: elasticsearch.audit.user.roles
    separator: ","
# Convert the comma-delimited index list captured by grok into an array.
# Documents without the field are passed through untouched.
- split:
    ignore_missing: true
    field: elasticsearch.audit.indices
    separator: ","
# Re-attach the bracketed suffix that grok captured as `sub_action` to the
# action name, i.e. `action` becomes `action[sub_action]`.
# The folded scalar below collapses to a single line joined by spaces.
- script:
    lang: painless
    source: >-
      if (ctx.elasticsearch.audit.sub_action != null) {
      ctx.elasticsearch.audit.action += '[' +
      ctx.elasticsearch.audit.sub_action + ']' }
# Drop the temporary `sub_action` capture; the preceding script processor has
# already appended it to `elasticsearch.audit.action` when it was present.
- remove:
    ignore_missing: true
    field: elasticsearch.audit.sub_action
# Parse the audit timestamp captured by grok into `@timestamp`.
# Exactly one of the two complementary processors below runs per document,
# depending on whether `event.timezone` is present.
#
# The conditions use the null-safe `?.` operator so that a document without
# any `event` object is treated as "no timezone" instead of making the
# condition throw and aborting timestamp parsing.

# No timezone on the event: parse with the date processor's default timezone.
- date:
    if: ctx.event?.timezone == null
    field: elasticsearch.audit.@timestamp
    target_field: '@timestamp'
    formats:
    - "yyyy-MM-dd'T'HH:mm:ss,SSS"
    on_failure:
    # Keep processing, but record why the timestamp could not be parsed.
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'

# Timezone present on the event: interpret the timestamp in that timezone.
- date:
    if: ctx.event?.timezone != null
    field: elasticsearch.audit.@timestamp
    target_field: '@timestamp'
    formats:
    - "yyyy-MM-dd'T'HH:mm:ss,SSS"
    timezone: '{{ event.timezone }}'
    on_failure:
    # Keep processing, but record why the timestamp could not be parsed.
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'
# Pipeline-level failure handler: store the failure message reported by the
# ingest framework in `error.message`.
on_failure:
- set:
    value: "{{ _ingest.on_failure_message }}"
    field: error.message