File: //usr/share/filebeat/module/zeek/intel/ingest/pipeline.yml
---
description: Pipeline for normalizing Zeek intel.log.
processors:
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
- set:
field: event.created
value: '{{@timestamp}}'
- date:
field: zeek.intel.ts
formats:
- UNIX
- ISO8601
- remove:
field: zeek.intel.ts
# IP Geolocation Lookup
- geoip:
if: ctx.source?.geo == null
field: source.ip
target_field: source.geo
ignore_missing: true
properties:
- city_name
- continent_name
- country_iso_code
- country_name
- location
- region_iso_code
- region_name
- geoip:
if: ctx.destination?.geo == null
field: destination.ip
target_field: destination.geo
ignore_missing: true
properties:
- city_name
- continent_name
- country_iso_code
- country_name
- location
- region_iso_code
- region_name
# IP Autonomous System (AS) Lookup
- geoip:
database_file: GeoLite2-ASN.mmdb
field: source.ip
target_field: source.as
properties:
- asn
- organization_name
ignore_missing: true
- geoip:
database_file: GeoLite2-ASN.mmdb
field: destination.ip
target_field: destination.as
properties:
- asn
- organization_name
ignore_missing: true
- rename:
field: source.as.asn
target_field: source.as.number
ignore_missing: true
- rename:
field: source.as.organization_name
target_field: source.as.organization.name
ignore_missing: true
- rename:
field: destination.as.asn
target_field: destination.as.number
ignore_missing: true
- rename:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
- append:
field: "related.ip"
value: "{{source.ip}}"
if: "ctx?.source?.ip != null"
- append:
field: "related.ip"
value: "{{destination.ip}}"
if: "ctx?.destination?.ip != null"
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"