File: //usr/share/filebeat/module/azure/platformlogs/ingest/pipeline.yml
description: Pipeline for parsing azure platform logs.
processors:
# Record the ingest node's timestamp on the document.
- set:
    value: "{{_ingest.timestamp}}"
    field: event.ingested
# Rename any existing top-level `azure` object to `azure-eventhub`; skipped
# when the field is absent.
- rename:
    ignore_missing: true
    field: azure
    target_field: azure-eventhub
# Remove every occurrence of the literal text `"":"",` from `message`.
# Any script error (e.g. `message` missing) is ignored.
- script:
    ignore_failure: true
    params:
      empty_field_name: '"":"",'
    source: "ctx.message = ctx.message.replace(params.empty_field_name, '')"
# Parse the JSON text held in `message` into `azure.platformlogs`.
# No `ignore_failure` here: a parse error goes to the pipeline's `on_failure`.
- json:
    target_field: azure.platformlogs
    field: message
# Only applies when `identity` is a plain string; other shapes keep the
# original field name.
- rename:
    description: 'Rename the field to `identity_name` to avoid conflicts with the `identity` containing a JSON object.'
    if: 'ctx.azure?.platformlogs?.identity instanceof String'
    field: azure.platformlogs.identity
    target_field: azure.platformlogs.identity_name
    ignore_missing: true
# Derive `@timestamp` from `azure.platformlogs.time` (ISO8601). Parse errors
# are ignored.
- date:
    target_field: "@timestamp"
    field: azure.platformlogs.time
    formats:
    - ISO8601
    ignore_failure: true
# Derive `@timestamp` from `azure.platformlogs.EventTimeString`, accepting
# ISO8601 or the `M/d/yyyy h:mm:ss a XXX` layout. This runs after the processor
# above and writes the same target, so a successful parse here wins.
- date:
    target_field: "@timestamp"
    field: azure.platformlogs.EventTimeString
    formats:
    - ISO8601
    - 'M/d/yyyy h:mm:ss a XXX'
    ignore_failure: true
# Keep the raw message text as `event.original`.
- rename:
    target_field: event.original
    field: message
# Drop the source time field (the date processors before this point have
# already read it).
- remove:
    ignore_missing: true
    field: azure.platformlogs.time
# Move the resource identifier to `azure.resource_id`.
- rename:
    ignore_missing: true
    target_field: azure.resource_id
    field: azure.platformlogs.resourceId
# Move `Region` to `cloud.region`.
- rename:
    ignore_missing: true
    target_field: cloud.region
    field: azure.platformlogs.Region
# Parse the JSON text in `EventProperties` into `azure.platformlogs.properties`.
# Errors (missing field, invalid JSON) are ignored.
- json:
    ignore_failure: true
    target_field: azure.platformlogs.properties
    field: azure.platformlogs.EventProperties
# Once `azure.platformlogs.properties` exists, drop the `EventProperties`
# source field. The condition uses null-safe navigation (as the other
# conditions in this pipeline do) so a missing intermediate object skips the
# processor instead of raising a script error.
- remove:
    if: "ctx.azure?.platformlogs?.properties != null"
    field:
    - azure.platformlogs.EventProperties
    ignore_missing: true
# Try to parse `properties.log` as JSON in place; errors are ignored, which
# leaves the original value untouched.
- json:
    ignore_failure: true
    field: azure.platformlogs.properties.log
    target_field: azure.platformlogs.properties.log
# If `properties.log` is still a string at this point, move it to `message`.
- rename:
    if: >-
      ctx.azure?.platformlogs?.properties?.log != null &&
      ctx.azure?.platformlogs?.properties?.log instanceof String
    field: azure.platformlogs.properties.log
    target_field: message
    ignore_missing: true
# Move `EventName` to `event.action`.
- rename:
    ignore_missing: true
    target_field: event.action
    field: azure.platformlogs.EventName
# Extract `source.ip` (and `source.port` when present) from `callerIpAddress`.
# Patterns are tried in order: `[host]:port`, `host:port`, then bare `host`.
- grok:
    ignore_missing: true
    ignore_failure: true
    field: azure.platformlogs.callerIpAddress
    patterns:
    - '\[%{IPORHOST:source.ip}\]:%{INT:source.port:int}'
    - '%{IPORHOST:source.ip}:%{INT:source.port:int}'
    - '%{IPORHOST:source.ip}'
# Drop the raw field only when an address was extracted.
- remove:
    if: "ctx.source?.ip != null"
    ignore_missing: true
    field: azure.platformlogs.callerIpAddress
# Mirror `source.ip` into `client.ip`; nothing is written when the template
# renders empty.
- set:
    ignore_empty_value: true
    field: client.ip
    value: "{{source.ip}}"
# Add the address to `related.ip` without duplicating an existing entry.
- append:
    if: "ctx.source?.ip != null"
    allow_duplicates: false
    field: related.ip
    value: "{{source.ip}}"
# Move `level` to `log.level`.
- rename:
    ignore_missing: true
    target_field: log.level
    field: azure.platformlogs.level
# Move `durationMs` to `event.duration`; the script processor that follows
# rescales the value.
- rename:
    ignore_missing: true
    target_field: event.duration
    field: azure.platformlogs.durationMs
# Scale `event.duration` (populated from `durationMs`) by `param_nano`, i.e.
# milliseconds -> nanoseconds.
#
# The multiplication is done in 64-bit (or floating point for fractional
# input) rather than on the raw value: an Integer times the Integer parameter
# is 32-bit arithmetic in Painless and wraps for anything above ~2147 ms.
# String values are parsed as numbers first, so a quoted `durationMs` is also
# rescaled instead of being left in milliseconds. A value that cannot be
# parsed still raises, and `ignore_failure` leaves the field unchanged.
- script:
    lang: painless
    params:
      param_nano: 1000000
    ignore_failure: true
    source: |-
      def duration = ctx.event?.duration;
      if (duration instanceof String) {
        duration = Double.parseDouble(duration);
      }
      if (duration instanceof Double || duration instanceof Float) {
        ctx.event.duration = (long) (duration * params.param_nano);
      } else if (duration instanceof Number) {
        ctx.event.duration = ((Number) duration).longValue() * params.param_nano;
      }
# Move `location` to `geo.name`.
# NOTE(review): a later geoip processor in this pipeline also targets `geo`;
# confirm whether `geo.name` is expected to survive a successful lookup.
- rename:
    ignore_missing: true
    target_field: geo.name
    field: azure.platformlogs.location
# Populate `azure.platformlogs.event_category`:
#   - `properties.eventCategory` when present,
#   - otherwise `Policy` when `properties.policies` is present,
#   - otherwise `Administrative`.
# Script errors are ignored.
- script:
    lang: painless
    ignore_failure: true
    source: |-
      def props = ctx?.azure?.platformlogs?.properties;
      def category = 'Administrative';
      if (props?.eventCategory != null) {
        category = props.eventCategory;
      } else if (props?.policies != null) {
        category = 'Policy';
      }
      ctx.azure.platformlogs.event_category = category;
# Normalise the field name `resultType` -> `result_type`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.resultType
    target_field: azure.platformlogs.result_type
# `event.outcome` is filled from the first of three sources that matches:
# `result_type`, then `properties.result`, then `Status`. The second and third
# processors only run while `event.outcome` is still unset.
# NOTE(review): `convert` copies the value verbatim, so casing such as
# `Success` and values such as `Succeeded` / `Failed` reach `event.outcome`
# unchanged unless something downstream normalises them - confirm.
- convert:
    if: >-
      ctx?.azure?.platformlogs?.result_type != null &&
      ctx.azure.platformlogs.result_type instanceof String &&
      (ctx.azure.platformlogs.result_type.toLowerCase() == 'success' ||
      ctx.azure.platformlogs.result_type.toLowerCase() == 'failure')
    type: string
    field: azure.platformlogs.result_type
    target_field: event.outcome
- convert:
    if: >-
      ctx?.event?.outcome == null &&
      ctx?.azure?.platformlogs?.properties?.result != null &&
      ctx?.azure?.platformlogs?.properties?.result instanceof String &&
      ['success', 'failure', 'unknown'].contains(ctx.azure?.platformlogs?.properties?.result)
    type: string
    field: azure.platformlogs.properties.result
    target_field: event.outcome
- convert:
    if: >-
      ctx?.event?.outcome == null &&
      ctx?.azure?.platformlogs?.Status != null &&
      ctx?.azure?.platformlogs?.Status instanceof String &&
      ['success', 'failure', 'unknown', 'Succeeded', 'Failed'].contains(ctx.azure?.platformlogs?.Status)
    type: string
    field: azure.platformlogs.Status
    target_field: event.outcome
# Normalise `operationName` -> `operation_name`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.operationName
    target_field: azure.platformlogs.operation_name
# Copy the operation name into `event.action` as a string (this writes the
# same target that the earlier `EventName` rename uses).
- convert:
    ignore_missing: true
    type: string
    field: azure.platformlogs.operation_name
    target_field: event.action
# Normalise `resultSignature` -> `result_signature`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.resultSignature
    target_field: azure.platformlogs.result_signature
# Move `correlationId` to `azure.correlation_id`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.correlationId
    target_field: azure.correlation_id
# Normalise `properties.statusCode` -> `properties.status_code`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.properties.statusCode
    target_field: azure.platformlogs.properties.status_code
# Normalise `Status` -> `status`.
- rename:
    ignore_missing: true
    field: azure.platformlogs.Status
    target_field: azure.platformlogs.status
# Geo-locate `source.ip` and write the result under `geo`; skipped when the
# address is absent.
- geoip:
    ignore_missing: true
    target_field: geo
    field: source.ip
# Translate `azure.platformlogs.category` (case-insensitive) into event fields
# using the lookup table in `params`: each key of the matching entry is
# written to `ctx.event` (currently only `type`).
#
# A missing or non-string category, or one with no entry in the table, now
# returns early instead of throwing and relying on `ignore_failure`; the
# document is left unchanged in those cases, exactly as before.
- script:
    lang: painless
    ignore_failure: true
    params:
      "write":
        type:
        - change
      "read":
        type:
        - access
      "delete":
        type:
        - deletion
      "action":
        type:
        - change
    source: |-
      def category = ctx?.azure?.platformlogs?.category;
      if (!(category instanceof String)) {
        return;
      }
      def mapping = params.get(category.toLowerCase());
      if (mapping == null) {
        return;
      }
      def hm = new HashMap(mapping);
      hm.forEach((k, v) -> ctx.event[k] = v);
# Look up autonomous-system data for `source.ip` in the GeoLite2 ASN database
# and store the requested properties under `source.as`.
- geoip:
    ignore_missing: true
    database_file: GeoLite2-ASN.mmdb
    field: source.ip
    target_field: source.as
    properties:
    - asn
    - organization_name
# Rename the lookup output: `asn` -> `number`.
- rename:
    ignore_missing: true
    field: source.as.asn
    target_field: source.as.number
# Rename the lookup output: `organization_name` -> `organization.name`.
- rename:
    ignore_missing: true
    field: source.as.organization_name
    target_field: source.as.organization.name
# Every document handled by this pipeline is tagged `event.kind: event`.
- set:
    value: event
    field: event.kind
# Continue in the shared Azure pipeline; the name is resolved by the
# `IngestPipeline` template expression.
- pipeline:
    name: '{< IngestPipeline "azure-shared-pipeline" >}'
# Pipeline-level failure handler: store the failure message reported by the
# ingest node in `error.message`.
on_failure:
- set:
    value: "{{ _ingest.on_failure_message }}"
    field: error.message