File: //usr/share/filebeat/module/netflow/log/ingest/pipeline.yml
---
# Ingest pipeline definition for the Filebeat NetFlow module.
description: Pipeline for Filebeat NetFlow
processors:
# Record the ingest timestamp on every event.
- set:
    value: "{{_ingest.timestamp}}"
    field: event.ingested
# IP Geolocation Lookup
# Each lookup runs only when the event does not already carry geo data for
# that endpoint. No database_file is given here, so the processor's default
# database is used.
- geoip:
    if: 'ctx.source?.geo == null'
    ignore_missing: true
    field: source.ip
    target_field: source.geo
- geoip:
    if: 'ctx.destination?.geo == null'
    ignore_missing: true
    field: destination.ip
    target_field: destination.geo
# IP Autonomous System (AS) Lookup
# Resolves both endpoints against the ASN database and keeps only the AS
# number and organization name. Unlike the geolocation lookups, these run
# unconditionally.
- geoip:
    field: source.ip
    target_field: source.as
    database_file: GeoLite2-ASN.mmdb
    properties: [asn, organization_name]
    ignore_missing: true
- geoip:
    field: destination.ip
    target_field: destination.as
    database_file: GeoLite2-ASN.mmdb
    properties: [asn, organization_name]
    ignore_missing: true
# Move the raw ASN lookup output (asn, organization_name) to
# <side>.as.number and <side>.as.organization.name.
# ignore_missing keeps the pipeline going when a lookup produced nothing.
# Source side.
- rename:
    ignore_missing: true
    field: source.as.asn
    target_field: source.as.number
- rename:
    ignore_missing: true
    field: source.as.organization_name
    target_field: source.as.organization.name
# Destination side.
- rename:
    ignore_missing: true
    field: destination.as.asn
    target_field: destination.as.number
- rename:
    ignore_missing: true
    field: destination.as.organization_name
    target_field: destination.as.organization.name
# Derive network.direction from the locality of the two endpoints.
# external -> internal
- set:
    if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "internal"'
    field: network.direction
    value: inbound
# internal -> external
- set:
    if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "external"'
    field: network.direction
    value: outbound
# internal -> internal
- set:
    if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "internal"'
    field: network.direction
    value: internal
# external -> external
- set:
    if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "external"'
    field: network.direction
    value: external
# Fallback when network.direction is still unset at this point.
- set:
    if: 'ctx?.network?.direction == null'
    field: network.direction
    value: unknown
# Pipeline-level failure handler: store the failure message on the event
# in error.message.
on_failure:
- set:
    value: '{{ _ingest.on_failure_message }}'
    field: error.message