File: //usr/share/filebeat/module/panw/panos/ingest/pipeline.yml
description: "Pipeline for Palo Alto Networks PAN-OS Logs"
processors:
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
# Warn if event.original has already been set. This is most likely due to the Logstash ecs_compatibility setting.
- append:
if: ctx.event?.original != null
field: error.message
value: 'event.original is set before start of ingest pipeline'
# Keep message as event.original if it has not already been set.
- rename:
field: message
target_field: event.original
ignore_failure: true
# Get the timezone from the IETF header if present. Otherwise the timezone
# value added by the add_locale processor will be used.
- grok:
field: _temp_.ietf_header
patterns:
- '%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE:event.timezone}?'
ignore_failure: true
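# For example, an illustrative header timestamp of 2023-01-23T12:34:56+02:00
# yields event.timezone '+02:00'.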
# Set @timestamp to the time when the entry was generated at the data plane.
- date:
if: "ctx.event.timezone == null"
field: "_temp_.generated_time"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
- date:
if: "ctx.event.timezone != null"
field: "_temp_.generated_time"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
timezone: "{{ event.timezone }}"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
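# Note: each timestamp below is handled by a pair of date processors, one with
# and one without the timezone option, because '{{ event.timezone }}' would
# render as an empty (invalid) timezone when event.timezone is unset.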
# event.created is the time the event was received at the management plane.
- date:
if: "ctx.event.timezone == null && ctx.event.created != null "
field: "event.created"
target_field: "event.created"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
- date:
if: "ctx.event.timezone != null && ctx.event.created != null "
field: "event.created"
target_field: "event.created"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
timezone: "{{ event.timezone }}"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
# event.start (traffic only) is the time the session started.
- date:
if: "ctx.event.timezone == null && ctx.event.start != null"
field: "event.start"
target_field: "event.start"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
- date:
if: "ctx.event.timezone != null && ctx.event.start != null"
field: "event.start"
target_field: "event.start"
timezone: "{{ event.timezone }}"
formats:
- "yyyy/MM/dd HH:mm:ss"
- "strict_date_optional_time_nanos"
on_failure: [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
# Convert numeric fields, as the output of the CSV processor is always a string.
- convert: { type: long, ignore_missing: true, field: client.bytes }
- convert: { type: long, ignore_missing: true, field: client.packets }
- convert: { type: long, ignore_missing: true, field: client.port }
- convert: { type: long, ignore_missing: true, field: server.bytes }
- convert: { type: long, ignore_missing: true, field: server.packets }
- convert: { type: long, ignore_missing: true, field: server.port }
- convert: { type: long, ignore_missing: true, field: source.bytes }
- convert: { type: long, ignore_missing: true, field: source.packets }
- convert: { type: long, ignore_missing: true, field: source.port }
- convert: { type: long, ignore_missing: true, field: destination.bytes }
- convert: { type: long, ignore_missing: true, field: destination.packets }
- convert: { type: long, ignore_missing: true, field: destination.port }
- convert: { type: long, ignore_missing: true, field: network.bytes }
- convert: { type: long, ignore_missing: true, field: network.packets }
- convert: { type: long, ignore_missing: true, field: event.duration }
- convert: { type: long, ignore_missing: true, field: _temp_.labels }
- convert: { type: long, ignore_missing: true, field: panw.panos.sequence_number }
- convert: { type: long, ignore_missing: true, field: source.nat.port }
- convert: { type: long, ignore_missing: true, field: destination.nat.port }
- convert: { type: long, ignore_missing: true, field: client.nat.port }
- convert: { type: long, ignore_missing: true, field: server.nat.port }
- convert: { type: integer, ignore_missing: true, field: panw.panos.factorno }
- convert: { type: integer, ignore_missing: true, field: panw.panos.repeatcnt }
- convert: { type: integer, ignore_missing: true, field: panw.panos.timeout }
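# Compute network.community_id from the flow 5-tuple. A second community ID is
# computed over the NAT-translated addresses and ports so pre- and post-NAT
# flow records can be correlated.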
- community_id:
ignore_missing: true
- community_id:
target_field: panw.panos.network.nat.community_id
ignore_missing: true
ignore_failure: true
source_ip: source.nat.ip
source_port: source.nat.port
destination_ip: destination.nat.ip
destination_port: destination.nat.port
# Remove PCAP ID when zero (no packet capture).
- remove:
if: 'ctx?.panw?.panos?.network?.pcap_id == "0"'
field:
- panw.panos.network.pcap_id
# Extract 'flags' bitfield into labels.
- script:
lang: painless
if: 'ctx?._temp_?.labels != null && ctx._temp_.labels != 0'
params:
pcap_included: 0x80000000
ipv6_session: 0x02000000
ssl_decrypted: 0x01000000
url_filter_denied: 0x00800000
nat_translated: 0x00400000
captive_portal: 0x00200000
x_forwarded_for: 0x00080000
http_proxy: 0x00040000
container_page: 0x00008000
temporary_match: 0x00002000
symmetric_return: 0x00000800
source: >
def labels = ctx?.labels;
if (labels == null) {
labels = new HashMap();
ctx['labels'] = labels;
}
long value = ctx._temp_.labels;
for (entry in params.entrySet()) {
if ((value & entry.getValue()) != 0) {
labels[entry.getKey()] = true;
}
}
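# For example, a flags value of 0x80400000 (pcap_included | nat_translated)
# produces labels.pcap_included: true and labels.nat_translated: true.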
# Normalize event.duration (seconds to nanoseconds) and derive event.end.
- script:
lang: painless
if: 'ctx?.event?.duration != null'
params:
NANOS_IN_A_SECOND: 1000000000
source: >
long nanos = ctx['event']['duration'] * params.NANOS_IN_A_SECOND;
ctx['event']['duration'] = nanos;
def start = ctx.event?.start;
if (start != null) {
ctx.event['end'] = ZonedDateTime.parse(start).plusNanos(nanos);
}
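# For example, a duration of 15 (seconds) becomes 15000000000 nanoseconds, and
# with an illustrative event.start of 2021-01-01T00:00:00Z the computed
# event.end is 2021-01-01T00:00:15Z.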
## TRAFFIC
- pipeline:
if: ctx?.panw?.panos?.type == "TRAFFIC"
name: '{< IngestPipeline "traffic" >}'
## THREAT
- pipeline:
if: ctx?.panw?.panos?.type == "THREAT"
name: '{< IngestPipeline "threat" >}'
## GLOBAL PROTECT
- pipeline:
if: ctx?.panw?.panos?.type == "GLOBALPROTECT"
name: '{< IngestPipeline "globalprotect" >}'
## USER ID
- pipeline:
if: ctx?.panw?.panos?.type == "USERID"
name: '{< IngestPipeline "userid" >}'
## HIPMATCH
- pipeline:
if: ctx?.panw?.panos?.type != null && ["HIP-MATCH", "HIPMATCH"].contains(ctx?.panw?.panos?.type)
name: '{< IngestPipeline "hipmatch" >}'
- append:
field: event.type
allow_duplicates: false
value: allowed
if: "ctx?.panw?.panos?.action != null && ['alert', 'allow', 'continue'].contains(ctx.panw.panos.action)"
- append:
field: event.type
allow_duplicates: false
value: denied
if: "ctx?.panw?.panos?.action != null && ['deny', 'drop', 'reset-client', 'reset-server', 'reset-both', 'block-url', 'block-ip', 'random-drop', 'sinkhole', 'block'].contains(ctx.panw.panos.action)"
- set:
field: event.outcome
value: success
# event.action for traffic logs.
- set:
field: event.action
value: flow_started
if: 'ctx?.panw?.panos?.sub_type == "start"'
- append:
field: event.type
allow_duplicates: false
value:
- start
- connection
if: 'ctx?.panw?.panos?.sub_type == "start"'
- set:
field: event.action
value: flow_terminated
if: 'ctx?.panw?.panos?.sub_type == "end"'
- append:
field: event.type
allow_duplicates: false
value:
- end
- connection
if: 'ctx?.panw?.panos?.sub_type == "end"'
- set:
field: event.action
value: flow_dropped
if: 'ctx?.panw?.panos?.sub_type == "drop"'
- append:
field: event.type
allow_duplicates: false
value:
- denied
- connection
if: 'ctx?.panw?.panos?.sub_type == "drop"'
- set:
field: event.action
value: flow_denied
if: 'ctx?.panw?.panos?.sub_type == "deny"'
- append:
field: event.type
allow_duplicates: false
value:
- denied
- connection
if: 'ctx?.panw?.panos?.sub_type == "deny"'
# event.action for threat logs.
- set:
field: event.action
value: data_match
if: 'ctx?.panw?.panos?.sub_type == "data"'
- set:
field: event.action
value: file_match
if: 'ctx?.panw?.panos?.sub_type == "file"'
- set:
field: event.action
value: flood_detected
if: 'ctx?.panw?.panos?.sub_type == "flood"'
- set:
field: event.action
value: packet_attack
if: 'ctx?.panw?.panos?.sub_type == "packet"'
- set:
field: event.action
value: scan_detected
if: 'ctx?.panw?.panos?.sub_type == "scan"'
- set:
field: event.action
value: spyware_detected
if: 'ctx?.panw?.panos?.sub_type == "spyware"'
- set:
field: event.action
value: url_filtering
if: 'ctx?.panw?.panos?.sub_type == "url"'
- set:
field: event.action
value: virus_detected
if: 'ctx?.panw?.panos?.sub_type == "virus"'
- set:
field: event.action
value: exploit_detected
if: 'ctx?.panw?.panos?.sub_type == "vulnerability"'
- set:
field: event.action
value: wildfire_verdict
if: 'ctx?.panw?.panos?.sub_type == "wildfire"'
- set:
field: event.action
value: wildfire_virus_detected
if: 'ctx?.panw?.panos?.sub_type == "wildfire-virus"'
# Set numeric event.severity from log.level.
- set:
field: "event.severity"
if: 'ctx?.log?.level == "critical"'
value: 1
- set:
field: "event.severity"
if: 'ctx?.log?.level == "high"'
value: 2
- set:
field: "event.severity"
if: 'ctx?.log?.level == "medium"'
value: 3
- set:
field: "event.severity"
if: 'ctx?.log?.level == "low"'
value: 4
- set:
field: "event.severity"
if: 'ctx?.log?.level == "informational"'
value: 5
# Normalize panw.panos.action: lowercase it and replace whitespace with hyphens.
# Some action values appear with spaces in the TRAFFIC docs, which looks like a mistake.
- lowercase:
field: panw.panos.action
ignore_missing: true
- gsub:
field: panw.panos.action
pattern: \s
replacement: "-"
ignore_missing: true
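# For example, a hypothetical action value of "Reset Both" becomes "reset-both".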
# Build related.ip array from src/dest/NAT IPs.
- append:
if: 'ctx?.source?.ip != null'
field: related.ip
allow_duplicates: false
value:
- '{{source.ip}}'
- append:
if: 'ctx?.destination?.ip != null'
field: related.ip
allow_duplicates: false
value:
- '{{destination.ip}}'
- append:
if: 'ctx?.source?.nat?.ip != null'
field: related.ip
allow_duplicates: false
value:
- '{{source.nat.ip}}'
- append:
if: 'ctx?.destination?.nat?.ip != null'
field: related.ip
allow_duplicates: false
value:
- '{{destination.nat.ip}}'
# Geolocation for source.
- geoip:
if: 'ctx?.source?.ip != null'
field: source.ip
target_field: source.geo
# Geolocation for destination.
- geoip:
if: 'ctx?.destination?.ip != null'
field: destination.ip
target_field: destination.geo
# IP Autonomous System (AS) Lookup
- geoip:
database_file: GeoLite2-ASN.mmdb
field: source.ip
target_field: source.as
properties:
- asn
- organization_name
ignore_missing: true
- geoip:
database_file: GeoLite2-ASN.mmdb
field: destination.ip
target_field: destination.as
properties:
- asn
- organization_name
ignore_missing: true
- rename:
field: source.as.asn
target_field: source.as.number
ignore_missing: true
- rename:
field: source.as.organization_name
target_field: source.as.organization.name
ignore_missing: true
- rename:
field: destination.as.asn
target_field: destination.as.number
ignore_missing: true
- rename:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
# Set source|destination.geo.name from the PAN-OS srcloc|dstloc fields.
- rename:
if: 'ctx.source?.geo?.name == null'
field: _temp_.srcloc
target_field: source.geo.name
ignore_missing: true
- rename:
if: 'ctx.destination?.geo?.name == null'
field: _temp_.dstloc
target_field: destination.geo.name
ignore_missing: true
# Append NAT community_id to network.community_id
- append:
if: 'ctx?.panw?.panos?.network?.nat?.community_id != null'
field: network.community_id
allow_duplicates: false
value:
- '{{panw.panos.network.nat.community_id}}'
- set:
field: rule.name
value: "{{panw.panos.ruleset}}"
ignore_empty_value: true
# Set url and file values
- rename:
if: 'ctx?.panw?.panos?.sub_type != "url"'
field: url.original
target_field: file.name
ignore_missing: true
- grok:
field: url.original
patterns:
- '(%{URIPROTO:url.scheme}\:\/\/)?(%{USERNAME:url.username}(\:%{PASSWORD:url.password})?\@)?%{DOMAIN:url.domain}(\:%{POSINT:url.port})?(%{PATH:url.path})?(\?%{QUERY:url.query})?(\#%{ANY:url.fragment})?'
ignore_missing: true
pattern_definitions:
USERNAME: '[^\:]*'
PASSWORD: '[^@]*'
DOMAIN: '[^\/\?#\:]*'
PATH: '[^\?#]*'
QUERY: '[^#]*'
ANY: '.*'
if: 'ctx?.url?.original != null && ctx?.url?.original != "-/" && ctx?.url?.original != ""'
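# For example, an illustrative url.original of
# http://user@www.example.com:8080/a/b.txt?x=1#top yields url.scheme "http",
# url.username "user", url.domain "www.example.com", url.port "8080",
# url.path "/a/b.txt", url.query "x=1" and url.fragment "top".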
- grok:
field: url.path
patterns:
- '%{FILENAME}((?:\.%{ANY})*(\.%{ANY:url.extension}))?'
ignore_missing: true
pattern_definitions:
FILENAME: '[^\.]+'
ANY: '.*'
if: 'ctx?.url?.path != null && ctx?.url?.path != ""'
- grok:
field: file.name
patterns:
- '%{FILENAME}((?:\.%{ANY})*(\.%{ANY:file.extension}))?'
ignore_missing: true
pattern_definitions:
FILENAME: '[^\.]+'
ANY: '.*'
if: 'ctx?.file?.name != null && ctx?.file?.name != ""'
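# For example, an illustrative file.name of invoice.tar.gz yields
# file.extension "gz"; only the final suffix is captured.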
- script:
lang: painless
description: Copy source.user to user
source: >
def clone(def ref) {
if (ref == null) return ref;
if (ref instanceof Map) {
ref = ref.entrySet().stream().collect(
Collectors.toMap(
e -> e.getKey(),
e -> clone(e.getValue())
)
);
} else if (ref instanceof List) {
ref = ref.stream().map(e -> clone(e)).collect(
Collectors.toList()
);
}
return ref;
}
def u = ctx?.source?.user;
if (u != null) {
ctx["user"] = clone(u);
}
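# Populate the ECS related.* fields (related.user, related.hash, related.hosts)
# for cross-event pivoting.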
- append:
field: related.user
allow_duplicates: false
value: "{{client.user.name}}"
if: "ctx?.client?.user?.name != null"
- append:
field: related.user
allow_duplicates: false
value: "{{source.user.name}}"
if: "ctx?.source?.user?.name != null"
- append:
field: related.user
allow_duplicates: false
value: "{{server.user.name}}"
if: "ctx?.server?.user?.name != null"
- append:
field: related.user
allow_duplicates: false
value: "{{destination.user.name}}"
if: "ctx?.destination?.user?.name != null"
- append:
field: related.user
allow_duplicates: false
value: "{{url.username}}"
if: "ctx?.url?.username != null && ctx?.url?.username != ''"
- append:
field: related.hash
allow_duplicates: false
value: "{{panw.panos.file.hash}}"
if: "ctx?.panw?.panos?.file?.hash != null"
- append:
field: related.hosts
allow_duplicates: false
value: "{{observer.hostname}}"
if: "ctx?.observer?.hostname != null && ctx.observer?.hostname != ''"
- append:
field: related.hosts
allow_duplicates: false
value: "{{host.name}}"
if: "ctx?.host?.name != null && ctx.host?.name != ''"
- append:
field: related.hosts
allow_duplicates: false
value: "{{url.domain}}"
if: "ctx?.url?.domain != null && ctx.url?.domain != ''"
# Remove temporary fields.
- remove:
field:
- _temp_
ignore_missing: true
# Remove NAT fields when translation was not done.
- remove:
field:
- source.nat.ip
- source.nat.port
- client.nat.ip
- client.nat.port
if: 'ctx?.source?.nat?.ip == "0.0.0.0" && ctx?.source?.nat?.port == 0'
- remove:
field:
- destination.nat.ip
- destination.nat.port
- server.nat.ip
- server.nat.port
if: 'ctx?.destination?.nat?.ip == "0.0.0.0" && ctx?.destination?.nat?.port == 0'
on_failure:
- append:
field: "error.message"
value: "{{ _ingest.on_failure_message }}"
- remove:
field:
- _temp_
ignore_missing: true