# File: /usr/share/filebeat/module/juniper/srx/ingest/pipeline.yml
# This module only supports syslog messages in the format "structured-data + brief"
# https://www.juniper.net/documentation/en_US/junos/topics/reference/configuration-statement/structured-data-edit-system.html
description: Pipeline for parsing junipersrx firewall logs
processors:
# Break the raw syslog line into priority, timestamp, host, program,
# optional pid, message tag and the bracketed payload (stored in log.original).
- grok:
    patterns:
    - '^<%{POSINT:syslog_pri}>(\d{1,3}\s)?(?:%{TIMESTAMP_ISO8601:_temp_.raw_date})\s%{SYSLOGHOST:syslog_hostname}\s%{PROG:syslog_program}\s(?:%{POSINT:syslog_pid}|-)?\s%{WORD:log_type}\s\[(?:[^=]+\s)?%{GREEDYDATA:log.original}\]$'
    field: message
# Split log.original (the text captured between the square brackets by the
# grok above) into juniper.srx.* fields. A new pair starts at a space that is
# followed by "<key>=", and surrounding double quotes are trimmed from values.
- kv:
    field: log.original
    prefix: 'juniper.srx.'
    value_split: '='
    field_split: " (?=[a-z0-9\\_\\-]+=)"
    trim_value: '"'
    ignore_failure: false
    ignore_missing: true
# Converts all kebab-case key names under juniper.srx to snake_case by
# rebuilding the map with every '-' in a key replaced by '_'.
# The `if` guard skips the script when juniper.srx does not exist; without it
# the unguarded `.entrySet()` call would dereference null and fail the event.
- script:
    lang: painless
    if: 'ctx?.juniper?.srx != null'
    source: >-
      ctx.juniper.srx = ctx.juniper.srx.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().replace('-', '_'), e -> e.getValue()));
#
# Parse the date
#
# Variant used when event.timezone is not set: the raw timestamp captured by
# grok is parsed into @timestamp without an explicit timezone.
- date:
    if: 'ctx?.event?.timezone == null'
    target_field: '@timestamp'
    field: _temp_.raw_date
    formats:
    - 'yyyy-MM-dd HH:mm:ss'
    - 'yyyy-MM-dd HH:mm:ss z'
    - 'yyyy-MM-dd HH:mm:ss Z'
    - 'ISO8601'
# Variant used when event.timezone is set: same formats, but the value of
# event.timezone is passed to the date processor as the timezone.
- date:
    if: 'ctx?.event?.timezone != null'
    target_field: '@timestamp'
    field: _temp_.raw_date
    timezone: "{{ event.timezone }}"
    formats:
    - 'yyyy-MM-dd HH:mm:ss'
    - 'yyyy-MM-dd HH:mm:ss z'
    - 'yyyy-MM-dd HH:mm:ss Z'
    - 'ISO8601'
# Record the ingest timestamp on the event.
- set:
    value: '{{_ingest.timestamp}}'
    field: event.ingested
# Normalise juniper.srx.elapsed_time to juniper.srx.duration so the duration
# script that follows only has to look at one field.
# This step can possibly be omitted once there is a solution for the equal
# signs and for the calculation of the start time.
- rename:
    if: 'ctx?.juniper?.srx?.elapsed_time != null'
    target_field: juniper.srx.duration
    field: juniper.srx.elapsed_time
# Sets event.start, event.end and event.duration when juniper.srx.duration is
# present. The duration is multiplied by 1e9 to get nanoseconds
# (presumably the source value is in whole seconds -- TODO confirm),
# event.start is copied from @timestamp and event.end is start + duration.
- script:
    if: 'ctx?.juniper?.srx?.duration != null'
    lang: painless
    source: >-
      ctx.event.duration = Integer.parseInt(ctx.juniper.srx.duration) * 1000000000L;
      ctx.event.start = ctx['@timestamp'];
      ZonedDateTime start = ZonedDateTime.parse(ctx.event.start);
      ctx.event.end = start.plus(ctx.event.duration, ChronoUnit.NANOS);
# Removes every juniper.srx entry whose value is one of the placeholder
# strings listed in params.values (i.e. fields that carry no real data).
# The `if` guard skips the script when juniper.srx does not exist; without it
# the unguarded `.entrySet()` call would dereference null and fail the event.
- script:
    lang: painless
    if: 'ctx?.juniper?.srx != null'
    params:
      values:
      - "None"
      - "UNKNOWN"
      - "N/A"
      - "-"
    source: >-
      ctx.juniper.srx.entrySet().removeIf(entry -> params.values.contains(entry.getValue()));
#######################
## ECS Event Mapping ##
#######################
# Constant module / dataset identifiers.
- set:
    value: juniper
    field: event.module
- set:
    value: juniper.srx
    field: event.dataset
# Copy the numeric syslog priority captured by grok into event.severity.
# NOTE(review): this is the whole <PRI> number, not only its severity part --
# confirm that this is what consumers of event.severity expect.
- convert:
    field: syslog_pri
    target_field: event.severity
    type: long
    ignore_failure: true
# The bracketed payload becomes event.original.
- rename:
    target_field: event.original
    field: log.original
    ignore_missing: true
#####################
## ECS Log Mapping ##
#####################
# https://www.juniper.net/documentation/en_US/junos/topics/reference/general/syslog-interpreting-msg-generated-structured-data-format.html#fac_sev_codes
#
# Maps the syslog priority (facility * 8 + severity) to a textual log.level.
# Each list enumerates one severity for all 24 facilities (0-23); the values
# for facility 15 (120-127) are included so that those events also get a level.
- set:
    field: "log.level"
    if: '["0", "8", "16", "24", "32", "40", "48", "56", "64", "72", "80", "88", "96", "104", "112", "120", "128", "136", "144", "152", "160", "168", "176", "184"].contains(ctx.syslog_pri)'
    value: emergency
- set:
    field: "log.level"
    if: '["1", "9", "17", "25", "33", "41", "49", "57", "65", "73", "81", "89", "97", "105", "113", "121", "129", "137", "145", "153", "161", "169", "177", "185"].contains(ctx.syslog_pri)'
    value: alert
- set:
    field: "log.level"
    if: '["2", "10", "18", "26", "34", "42", "50", "58", "66", "74", "82", "90", "98", "106", "114", "122", "130", "138", "146", "154", "162", "170", "178", "186"].contains(ctx.syslog_pri)'
    value: critical
- set:
    field: "log.level"
    if: '["3", "11", "19", "27", "35", "43", "51", "59", "67", "75", "83", "91", "99", "107", "115", "123", "131", "139", "147", "155", "163", "171", "179", "187"].contains(ctx.syslog_pri)'
    value: error
- set:
    field: "log.level"
    if: '["4", "12", "20", "28", "36", "44", "52", "60", "68", "76", "84", "92", "100", "108", "116", "124", "132", "140", "148", "156", "164", "172", "180", "188"].contains(ctx.syslog_pri)'
    value: warning
- set:
    field: "log.level"
    if: '["5", "13", "21", "29", "37", "45", "53", "61", "69", "77", "85", "93", "101", "109", "117", "125", "133", "141", "149", "157", "165", "173", "181", "189"].contains(ctx.syslog_pri)'
    value: notification
- set:
    field: "log.level"
    if: '["6", "14", "22", "30", "38", "46", "54", "62", "70", "78", "86", "94", "102", "110", "118", "126", "134", "142", "150", "158", "166", "174", "182", "190"].contains(ctx.syslog_pri)'
    value: informational
- set:
    field: "log.level"
    if: '["7", "15", "23", "31", "39", "47", "55", "63", "71", "79", "87", "95", "103", "111", "119", "127", "135", "143", "151", "159", "167", "175", "183", "191"].contains(ctx.syslog_pri)'
    value: debug
##########################
## ECS Observer Mapping ##
##########################
# Fixed description of the reporting device.
- set:
    value: Juniper
    field: observer.vendor
- set:
    value: SRX
    field: observer.product
- set:
    value: firewall
    field: observer.type
# Move syslog header and juniper.srx.* fields to their ECS observer fields.
# NOTE(review): several renames below share one target field
# (observer.ingress.interface.name, observer.ingress.zone,
# observer.egress.zone) -- confirm the source fields never occur together
# in a single event.
# Reporting host
- rename:
    target_field: observer.name
    field: syslog_hostname
    ignore_missing: true
# Interfaces
- rename:
    target_field: observer.ingress.interface.name
    field: juniper.srx.packet_incoming_interface
    ignore_missing: true
- rename:
    target_field: observer.egress.interface.name
    field: juniper.srx.destination_interface_name
    ignore_missing: true
- rename:
    target_field: observer.ingress.interface.name
    field: juniper.srx.source_interface_name
    ignore_missing: true
- rename:
    target_field: observer.ingress.interface.name
    field: juniper.srx.interface_name
    ignore_missing: true
# Zones
- rename:
    target_field: observer.ingress.zone
    field: juniper.srx.source_zone_name
    ignore_missing: true
- rename:
    target_field: observer.ingress.zone
    field: juniper.srx.source_zone
    ignore_missing: true
- rename:
    target_field: observer.egress.zone
    field: juniper.srx.destination_zone_name
    ignore_missing: true
- rename:
    target_field: observer.egress.zone
    field: juniper.srx.destination_zone
    ignore_missing: true
# Syslog program name and message tag are kept under juniper.srx; the
# process value selects the product-specific pipeline further down.
- rename:
    target_field: juniper.srx.process
    field: syslog_program
    ignore_missing: true
- rename:
    target_field: juniper.srx.tag
    field: log_type
    ignore_missing: true
#############
## Cleanup ##
#############
# Drop the raw message, temporary parsing fields and juniper.srx.* fields
# that are not kept on the event. Each field is listed once.
- remove:
    field:
    - message
    - _temp_.raw_date
    - juniper.srx.duration
    - juniper.srx.dir_disp
    - juniper.srx.srczone
    - juniper.srx.dstzone
    - syslog_pri
    ignore_missing: true
################################
## Product Specific Pipelines ##
################################
# Hand the event to the sub-pipeline that matches juniper.srx.process
# (the syslog program name renamed earlier in this pipeline).
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_FLOW'"
    name: '{< IngestPipeline "flow" >}'
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_UTM'"
    name: '{< IngestPipeline "utm" >}'
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_IDP'"
    name: '{< IngestPipeline "idp" >}'
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_IDS'"
    name: '{< IngestPipeline "ids" >}'
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_AAMW'"
    name: '{< IngestPipeline "atp" >}'
- pipeline:
    if: "ctx.juniper?.srx?.process == 'RT_SECINTEL'"
    name: '{< IngestPipeline "secintel" >}'
#########################
## ECS Related Mapping ##
#########################
# Collect IPs, host names and user names into the related.* arrays.
# Every append runs only when its source field is present, skips duplicates
# and never fails the pipeline.
# related.ip
- append:
    field: related.ip
    value: '{{source.ip}}'
    if: 'ctx.source?.ip != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.ip
    value: '{{destination.ip}}'
    if: 'ctx.destination?.ip != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.ip
    value: '{{source.nat.ip}}'
    if: 'ctx.source?.nat?.ip != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.ip
    value: '{{destination.nat.ip}}'
    if: 'ctx?.destination?.nat?.ip != null'
    allow_duplicates: false
    ignore_failure: true
# related.hosts
- append:
    field: related.hosts
    value: '{{url.domain}}'
    if: 'ctx.url?.domain != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.hosts
    value: '{{source.domain}}'
    if: 'ctx.source?.domain != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.hosts
    value: '{{destination.domain}}'
    if: 'ctx.destination?.domain != null'
    allow_duplicates: false
    ignore_failure: true
# related.user
- append:
    field: related.user
    value: '{{source.user.name}}'
    if: 'ctx?.source?.user?.name != null'
    allow_duplicates: false
    ignore_failure: true
- append:
    field: related.user
    value: '{{destination.user.name}}'
    if: 'ctx?.destination?.user?.name != null'
    allow_duplicates: false
    ignore_failure: true
# Pipeline-level failure handler: store the failure message on the event.
on_failure:
- set:
    value: '{{ _ingest.on_failure_message }}'
    field: error.message