File: //usr/share/filebeat/module/zeek/mysql/ingest/pipeline.yml
# Ingest pipeline that maps Zeek mysql.log events onto normalized fields.
description: Pipeline for normalizing Zeek mysql.log
processors:
# Stamp the moment the ingest node handled the document.
- set:
    value: "{{_ingest.timestamp}}"
    field: event.ingested
# Copy the incoming @timestamp into event.created before the date processor
# below replaces @timestamp (its default target) with the Zeek event time.
- set:
    value: "{{@timestamp}}"
    field: event.created
# Parse the Zeek timestamp; epoch seconds (UNIX) and ISO8601 text are accepted.
- date:
    formats: ['UNIX', 'ISO8601']
    field: zeek.mysql.ts
# The raw timestamp is redundant once it has been parsed.
- remove:
    field: zeek.mysql.ts
# Collect the endpoint addresses of the event into related.ip.
- append:
    field: related.ip
    value: '{{source.ip}}'
    if: 'ctx?.source?.ip != null'
# Skip the destination when it is the same address as the source (for example
# loopback traffic), so related.ip does not list one address twice. The guard
# is a plain Painless comparison rather than the append processor's
# allow_duplicates option, so it does not depend on the Elasticsearch version.
- append:
    field: related.ip
    value: '{{destination.ip}}'
    if: 'ctx?.destination?.ip != null && ctx.destination.ip != ctx?.source?.ip'
# Geographic enrichment of both endpoints. No database_file is given here, so
# the geoip processor's default database applies. ignore_missing lets events
# without the address field pass through untouched.
- geoip:
    ignore_missing: true
    field: destination.ip
    target_field: destination.geo
- geoip:
    ignore_missing: true
    field: source.ip
    target_field: source.geo
# Autonomous-system lookups against the ASN database, restricted to the AS
# number and the owning organization's name.
- geoip:
    ignore_missing: true
    database_file: GeoLite2-ASN.mmdb
    field: source.ip
    target_field: source.as
    properties: [asn, organization_name]
- geoip:
    ignore_missing: true
    database_file: GeoLite2-ASN.mmdb
    field: destination.ip
    target_field: destination.as
    properties: [asn, organization_name]
# Move the raw lookup keys to as.number / as.organization.name. The renames are
# skipped when the lookup above produced nothing.
- rename:
    ignore_missing: true
    field: source.as.asn
    target_field: source.as.number
- rename:
    ignore_missing: true
    field: source.as.organization_name
    target_field: source.as.organization.name
- rename:
    ignore_missing: true
    field: destination.as.asn
    target_field: destination.as.number
- rename:
    ignore_missing: true
    field: destination.as.organization_name
    target_field: destination.as.organization.name
# Classify the event from the MySQL command name in zeek.mysql.cmd. Every
# processor below is a no-op when the command is absent. The conditions are
# folded block scalars: each line break folds to a single space, so the
# Painless expression Elasticsearch receives is one line.

# Connection set-up and tear-down commands count as "access".
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      (ctx.zeek.mysql.cmd == 'connect' ||
      ctx.zeek.mysql.cmd == 'connect_out')
    field: event.type
    value: access
# Commands that alter server, session or database state count as "change".
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      (ctx.zeek.mysql.cmd == 'init_db' ||
      ctx.zeek.mysql.cmd == 'change_user' ||
      ctx.zeek.mysql.cmd == 'set_option' ||
      ctx.zeek.mysql.cmd == 'drop_db' ||
      ctx.zeek.mysql.cmd == 'create_db' ||
      ctx.zeek.mysql.cmd == 'process_kill' ||
      ctx.zeek.mysql.cmd == 'delayed_insert')
    field: event.type
    value: change
# Any other command (none of the names matched above) is plain "info".
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      ctx.zeek.mysql.cmd != 'init_db' &&
      ctx.zeek.mysql.cmd != 'change_user' &&
      ctx.zeek.mysql.cmd != 'set_option' &&
      ctx.zeek.mysql.cmd != 'drop_db' &&
      ctx.zeek.mysql.cmd != 'create_db' &&
      ctx.zeek.mysql.cmd != 'process_kill' &&
      ctx.zeek.mysql.cmd != 'delayed_insert' &&
      ctx.zeek.mysql.cmd != 'connect' &&
      ctx.zeek.mysql.cmd != 'connect_out'
    field: event.type
    value: info
# "connect" additionally marks a start, "connect_out" an end.
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      ctx.zeek.mysql.cmd == 'connect'
    field: event.type
    value: start
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      ctx.zeek.mysql.cmd == 'connect_out'
    field: event.type
    value: end
# Both connection commands also add the "session" category.
- append:
    if: >-
      ctx?.zeek?.mysql?.cmd != null &&
      (ctx.zeek.mysql.cmd == 'connect' ||
      ctx.zeek.mysql.cmd == 'connect_out')
    field: event.category
    value: session
# Pipeline-level failure handler: store the failure message reported by the
# ingest framework in error.message.
on_failure:
- set:
    value: "{{ _ingest.on_failure_message }}"
    field: error.message