File: //usr/share/filebeat/module/nginx/ingress_controller/ingest/pipeline.yml
description:
Pipeline for parsing Nginx ingress controller access logs. Requires the
geoip and user_agent plugins.
processors:
- set:
field: event.ingested
value: "{{_ingest.timestamp}}"
- rename:
field: message
target_field: event.original
- grok:
field: event.original
patterns:
- (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.ingress_controller.remote_ip_list}|%{NOTSPACE:source.address})
- (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.ingress_controller.time}\] "%{DATA:nginx.ingress_controller.info}"
%{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long}
"(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})" %{NUMBER:nginx.ingress_controller.http.request.length:long}
%{NUMBER:nginx.ingress_controller.http.request.time:double} \[%{DATA:nginx.ingress_controller.upstream.name}\]
\[%{DATA:nginx.ingress_controller.upstream.alternative_name}\] (%{UPSTREAM_ADDRESS_LIST:nginx.ingress_controller.upstream_address_list}|-)
(%{UPSTREAM_RESPONSE_LENGTH_LIST:nginx.ingress_controller.upstream.response.length_list}|-) (-|%{UPSTREAM_RESPONSE_TIME_LIST:nginx.ingress_controller.upstream.response.time_list})
(-|%{UPSTREAM_RESPONSE_STATUS_CODE_LIST:nginx.ingress_controller.upstream.response.status_code_list}) %{GREEDYDATA:nginx.ingress_controller.http.request.id}
pattern_definitions:
NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?
NGINX_NOTSEPARATOR: "[^\t ,:]+"
NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))*
UPSTREAM_ADDRESS_LIST: (unix:%{NOTSPACE}|(?:%{IP}(:%{NUMBER})?)("?,?\s*(?:%{IP}(:%{NUMBER})?))*)
UPSTREAM_RESPONSE_LENGTH_LIST: (?:%{NUMBER})("?,?\s*(?:%{NUMBER}))*
UPSTREAM_RESPONSE_TIME_LIST: (%{NUMBER}|(-|(?:%{NUMBER})),\s+(-|(?:%{NUMBER}))(\"?,?\s*(-|(?:%{NUMBER})))*)
UPSTREAM_RESPONSE_STATUS_CODE_LIST: (%{NUMBER}|(-|(?:%{NUMBER})),\s+(-|(?:%{NUMBER}))(\"?,?\s*(-|(?:%{NUMBER})))*)
IP: (?:\[?%{IPV6}\]?|%{IPV4})
ignore_missing: true
- grok:
field: nginx.ingress_controller.info
patterns:
- "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}"
- ""
ignore_missing: true
- uri_parts:
field: url.original
ignore_failure: true
- set:
field: url.domain
value: "{{destination.domain}}"
if: ctx.url?.domain == null && ctx.destination?.domain != null
- remove:
field: nginx.ingress_controller.info
- split:
field: nginx.ingress_controller.remote_ip_list
separator: '"?,?\s+'
ignore_missing: true
- split:
field: nginx.ingress_controller.upstream_address_list
separator: '"?,?\s+'
ignore_missing: true
- split:
field: nginx.ingress_controller.upstream.response.length_list
separator: '"?,?\s+'
ignore_missing: true
- split:
field: nginx.ingress_controller.upstream.response.time_list
separator: '"?,?\s+'
ignore_missing: true
- split:
field: nginx.ingress_controller.upstream.response.status_code_list
separator: '"?,?\s+'
ignore_missing: true
- split:
field: nginx.ingress_controller.origin
separator: '"?,?\s+'
ignore_missing: true
- set:
field: source.address
if: ctx.source?.address == null
value: ""
- set:
field: http.request.id
value: '{{{nginx.ingress_controller.http.request.id}}}'
ignore_empty_value: true
ignore_failure: true
- script:
if: ctx.nginx?.ingress_controller?.upstream?.response?.length_list != null && ctx.nginx.ingress_controller.upstream.response.length_list.length > 0
lang: painless
source: >-
try {
if (ctx.nginx.ingress_controller.upstream.response.length_list.length == null) {
return;
}
int last_length = 0;
for (def item : ctx.nginx.ingress_controller.upstream.response.length_list) {
last_length = Integer.parseInt(item);
}
ctx.nginx.ingress_controller.upstream.response.length = last_length;
}
catch (Exception e) {
ctx.nginx.ingress_controller.upstream.response.length = null;
}
- script:
if: ctx.nginx?.ingress_controller?.upstream?.response?.time_list != null && ctx.nginx.ingress_controller.upstream.response.time_list.length > 0
lang: painless
source: >-
try {
if (ctx.nginx.ingress_controller.upstream.response.time_list.length == null) {
return;
}
float res_time = 0;
for (def item : ctx.nginx.ingress_controller.upstream.response.time_list) {
if (item != '-') {
res_time = res_time + Float.parseFloat(item);
}
}
ctx.nginx.ingress_controller.upstream.response.time = res_time;
}
catch (Exception e) {
ctx.nginx.ingress_controller.upstream.response.time = null;
}
- script:
if: ctx.nginx?.ingress_controller?.upstream?.response?.status_code_list != null && ctx.nginx.ingress_controller.upstream.response.status_code_list.length > 0
lang: painless
source: >-
try {
if (ctx.nginx.ingress_controller.upstream.response.status_code_list.length == null) {
return;
}
int last_status_code;
for (def item : ctx.nginx.ingress_controller.upstream.response.status_code_list) {
if (item != '-') {
last_status_code = Integer.parseInt(item);
}
}
ctx.nginx.ingress_controller.upstream.response.status_code = last_status_code;
}
catch (Exception e) {
ctx.nginx.ingress_controller.upstream.response.status_code = null;
}
- script:
if: ctx.nginx?.ingress_controller?.upstream_address_list != null && ctx.nginx.ingress_controller.upstream_address_list.length > 0
lang: painless
source: >-
try {
if (ctx.nginx.ingress_controller.upstream_address_list.length == null) {
return;
}
def last_upstream = "";
for (def item : ctx.nginx.ingress_controller.upstream_address_list) {
last_upstream = item;
}
ctx.nginx.ingress_controller.upstream.address = last_upstream;
}
catch (Exception e) {
ctx.nginx.ingress_controller.upstream.address = null;
}
- grok:
field: nginx.ingress_controller.upstream.address
patterns:
- "^%{IPV4:nginx.ingress_controller.upstream.ip}:%{NUMBER:nginx.ingress_controller.upstream.port}$"
- "^\\[%{IPV6:nginx.ingress_controller.upstream.ip}\\]:%{NUMBER:nginx.ingress_controller.upstream.port}$"
- "^%{IPV6NOCOMPRESS:nginx.ingress_controller.upstream.ip}:%{NUMBER:nginx.ingress_controller.upstream.port}$"
- "^%{IPV6:nginx.ingress_controller.upstream.ip}%{IPV6PORTSEP}%{NUMBER:nginx.ingress_controller.upstream.port}$"
- "^%{IPV6:nginx.ingress_controller.upstream.ip}%{IPV6PORTSEP}%{POSINT:nginx.ingress_controller.upstream.port}$"
pattern_definitions:
IPV6NOCOMPRESS: '([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}'
IPV6PORTSEP: '(?: port |[p#.])'
ignore_missing: true
ignore_failure: true
- convert:
field: nginx.ingress_controller.upstream.ip
type: ip
ignore_missing: true
on_failure:
- remove:
field: nginx.ingress_controller.upstream.ip
- convert:
field: nginx.ingress_controller.upstream.port
type: long
ignore_missing: true
on_failure:
- remove:
field: nginx.ingress_controller.upstream.port
- remove:
field: nginx.ingress_controller.upstream.address
ignore_failure: true
- script:
if: ctx.nginx?.ingress_controller?.remote_ip_list != null && ctx.nginx.ingress_controller.remote_ip_list.length > 0
lang: painless
source: >-
boolean isPrivate(def dot, def ip) {
try {
StringTokenizer tok = new StringTokenizer(ip, dot);
int firstByte = Integer.parseInt(tok.nextToken());
int secondByte = Integer.parseInt(tok.nextToken());
if (firstByte == 10) {
return true;
}
if (firstByte == 192 && secondByte == 168) {
return true;
}
if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {
return true;
}
if (firstByte == 127) {
return true;
}
return false;
}
catch (Exception e) {
return false;
}
}
try {
ctx.source.address = null;
if (ctx.nginx.ingress_controller.remote_ip_list == null) {
return;
}
def found = false;
for (def item : ctx.nginx.ingress_controller.remote_ip_list) {
if (!isPrivate(params.dot, item)) {
ctx.source.address = item;
found = true;
break;
}
}
if (!found) {
ctx.source.address = ctx.nginx.ingress_controller.remote_ip_list[0];
}
}
catch (Exception e) {
ctx.source.address = null;
}
params:
dot: .
- remove:
field: source.address
if: ctx.source.address == null
- grok:
field: source.address
patterns:
- ^%{IP:source.ip}$
ignore_failure: true
- set:
copy_from: "@timestamp"
field: event.created
- date:
field: nginx.ingress_controller.time
target_field: "@timestamp"
formats:
- dd/MMM/yyyy:H:m:s Z
on_failure:
- append:
field: error.message
value: "{{ _ingest.on_failure_message }}"
- remove:
field: nginx.ingress_controller.time
- user_agent:
field: user_agent.original
ignore_missing: true
- geoip:
field: source.ip
target_field: source.geo
ignore_missing: true
- geoip:
database_file: GeoLite2-ASN.mmdb
field: source.ip
target_field: source.as
properties:
- asn
- organization_name
ignore_missing: true
- rename:
field: source.as.asn
target_field: source.as.number
ignore_missing: true
- rename:
field: source.as.organization_name
target_field: source.as.organization.name
ignore_missing: true
- set:
field: event.kind
value: event
- append:
field: event.category
value: web
- append:
field: event.type
value: info
- set:
field: event.outcome
value: success
if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code < 400"
- set:
field: event.outcome
value: failure
if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code >= 400"
- append:
field: related.ip
value: "{{source.ip}}"
if: "ctx?.source?.ip != null"
allow_duplicates: false
- append:
field: related.ip
value: "{{destination.ip}}"
if: "ctx?.destination?.ip != null"
allow_duplicates: false
- append:
field: related.ip
value: "{{nginx.ingress_controller.upstream.ip}}"
if: "ctx?.nginx?.ingress_controller?.upstream?.ip != null"
allow_duplicates: false
- append:
field: related.user
value: "{{user.name}}"
if: "ctx?.user?.name != null"
- script:
lang: painless
description: This script processor iterates over the whole document to remove fields with null values.
source: |
void handleMap(Map map) {
for (def x : map.values()) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
map.values().removeIf(v -> v == null);
}
void handleList(List list) {
for (def x : list) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
}
handleMap(ctx);
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"