File: //usr/share/filebeat/module/threatintel/threatq/ingest/pipeline.yml
---
description: Pipeline for parsing ThreatQ Threat Intel
processors:
####################
# Event ECS fields #
####################
- set:
field: event.ingested
value: "{{_ingest.timestamp}}"
- set:
field: event.kind
value: enrichment
- set:
field: event.category
value: threat
- set:
field: event.type
value: indicator
###############
# Parse dates #
###############
- rename:
field: message
target_field: event.original
ignore_missing: true
- json:
field: event.original
target_field: json
- fingerprint:
fields:
- json.id
- json.indicator_id
target_field: "_id"
ignore_missing: true
- date:
target_field: "@timestamp"
field: "json.updated_at"
formats:
- "yyyy-MM-dd HH:mm:ss"
if: "ctx.json.updated_at != null"
ignore_failure: true
- date:
target_field: "threatq.created_at"
field: "json.created_at"
formats:
- "yyyy-MM-dd HH:mm:ss"
if: "ctx.json.created_at != null"
ignore_failure: true
- date:
target_field: "threatq.expires_at"
field: "json.expires_at"
formats:
- "yyyy-MM-dd HH:mm:ss"
if: "ctx.json.expires_at != null"
ignore_failure: true
- date:
target_field: "threatq.expires_calculated_at"
field: "json.expires_calculated_at"
formats:
- "yyyy-MM-dd HH:mm:ss"
if: "ctx.json.expires_calculated_at != null"
ignore_failure: true
- date:
target_field: "threatq.published_at"
field: "json.published_at"
formats:
- "yyyy-MM-dd HH:mm:ss"
if: "ctx.json.published_at != null"
ignore_failure: true
#####################
# Threat ECS Fields #
#####################
- set:
field: threat.feed.name
value: "[Filebeat] ThreatQuotient"
- set:
field: threat.feed.dashboard_id
value: "ad9c7430-72de-11eb-a3e3-b3cc7c78a70f"
- rename:
field: json.type.name
target_field: threat.indicator.type
ignore_missing: true
- rename:
field: json.description
target_field: threat.indicator.description
ignore_missing: true
- script:
lang: painless
if: ctx.json?.score != null
description: >
Normalize confidence level.
source: >
def value = ctx.json.score;
if (value <= 0.0 || value > 100.0) {
ctx.threat.indicator.confidence = "None";
return;
}
if (value >= 1.0 && value <= 29.0) {
ctx.threat.indicator.confidence = "Low";
return;
}
if (value >= 30.0 && value <= 69.0) {
ctx.threat.indicator.confidence = "Med";
return;
}
if (value >= 70 && value <= 100) {
ctx.threat.indicator.confidence = "High";
return;
}
- rename:
field: json.status.name
target_field: threatq.status
ignore_missing: true
- rename:
field: json.value
target_field: threatq.indicator_value
ignore_missing: true
#########################################
# Map indicator types and values to ECS #
#########################################
# Indicator type: Email Address
- set:
field: threat.indicator.email.address
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'Email Address'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: email-addr
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'Email Address'"
# Indicator type: FQDN
- set:
field: threat.indicator.domain
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'FQDN'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: domain-name
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'FQDN'"
# Indicator type: IP Address
- set:
field: threat.indicator.ip
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'IP Address'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: ipv4-addr
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'IP Address'"
# Indicator type: IPv6 Address
- set:
field: threat.indicator.domain
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'IPv6 Address'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: ipv6-addr
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'IPv6 Address'"
# Indicator type: MD5
- set:
field: threat.indicator.file.hash.md5
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'MD5'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: file
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'MD5'"
# Indicator type: SHA-1
- set:
field: threat.indicator.file.hash.sha1
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-1'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: file
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-1'"
# Indicator type: SHA-256
- set:
field: threat.indicator.file.hash.sha256
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-256'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: file
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-256'"
# Indicator type: SHA-512
- set:
field: threat.indicator.file.hash.sha512
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-512'"
ignore_empty_value: true
- set:
field: threat.indicator.type
value: file
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'SHA-512'"
# Indicator type: URL
- uri_parts:
field: threatq.indicator_value
target_field: threat.indicator.url
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'URL' && ctx.threatq?.indicator_value != null"
remove_if_successful: true
- set:
field: threat.indicator.type
value: url
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'URL'"
# Indicator type: x509 Serial
- set:
field: threat.indicator.x509.serial_number
copy_from: threatq.indicator_value
if: "ctx.threat?.indicator?.type != null && ctx.threat?.indicator?.type == 'x509 Serial'"
ignore_empty_value: true
###################################
# Map indicator providers and TLP #
###################################
- script:
if: "ctx.json?.sources != null && ctx.json?.sources instanceof List && ctx.json?.sources.size() > 0"
lang: painless
description: "Extract TLP and providers from source"
source: |-
def providers = new ArrayList();
def tlps = new ArrayList();
for (source in ctx.json.sources) {
if (source == null) {
return;
}
if (source.containsKey("provider") && source["provider"] != null) {
providers.add(source["provider"]);
}
if (source.containsKey("tlp_name") && source["tlp_name"] != null) {
tlps.add(source["tlp_name"]);
}
}
if (tlps.size() > 0) {
if (ctx.threat.indicator.marking == null) {
ctx.threat.indicator.marking = new HashMap();
}
ctx.threat.indicator.marking.tlp = tlps;
}
if (providers.size() > 0) {
if (ctx.threat.indicator.provider == null) {
ctx.threat.indicator.provider = new HashMap();
}
ctx.threat.indicator.provider = providers;
}
############################
# Map indicator attributes #
############################
- foreach:
description: Change attribute names to lowercase
field: json.attributes
ignore_missing: true
processor:
lowercase:
field: "_ingest._value.name"
- foreach:
description: Replaces spaces with underscore in attribute names
field: json.attributes
ignore_missing: true
processor:
gsub:
field: "_ingest._value.name"
pattern: " "
replacement: "_"
- foreach:
description: Append attributes
field: json.attributes
ignore_missing: true
processor:
append:
field: threatq.attributes.{{{ _ingest._value.name }}}
value: "{{{ _ingest._value.value }}}"
#############################
# Map indicator adversaries #
#############################
- foreach:
field: json.adversaries
ignore_missing: true
processor:
append:
field: threatq.adversaries
value: "{{{ _ingest._value.name }}}"
######################
# Cleanup processors #
######################
# Setting indicator type to unknown if it does not match anything
- remove:
field: event.original
if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
ignore_failure: true
ignore_missing: true
- set:
field: threat.indicator.type
value: unknown
if: ctx.threat?.indicator?.type == null
- script:
lang: painless
if: ctx.threat != null
source: |
void handleMap(Map map) {
for (def x : map.values()) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
map.values().removeIf(v -> v == null);
}
void handleList(List list) {
for (def x : list) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
}
handleMap(ctx);
# Removing fields not needed anymore, either because its copied somewhere else, or is not relevant to this event
- remove:
field:
- json
- message
ignore_missing: true
on_failure:
- set:
field: error.message
value: "{{ _ingest.on_failure_message }}"