File: //usr/share/filebeat/module/threatintel/anomalithreatstream/ingest/pipeline.yml
---
description: Pipeline for parsing Anomali ThreatStream
processors:
#
# Populate the baseline ECS event.* and threat.feed.* fields.
#
# Stamp the document with the ingest-time timestamp.
- set:
    value: '{{{ _ingest.timestamp }}}'
    field: event.ingested
# Derive the document _id from a fingerprint of dataset + indicator id.
- fingerprint:
    target_field: '_id'
    ignore_missing: true
    fields:
    - event.dataset
    - json.id
# Constant ECS categorization applied to every document.
- set:
    value: enrichment
    field: event.kind
- set:
    value: threat
    field: event.category
- set:
    value: indicator
    field: event.type
# Constant feed metadata (display name and dashboard id).
- set:
    value: '[Filebeat] Anomali ThreatStream'
    field: threat.feed.name
- set:
    value: 'ad9c7430-72de-11eb-a3e3-b3cc7c78a70f'
    field: threat.feed.dashboard_id
#
# Map itype field to STIX 2.0 Cyber Observable values (threat.indicator.type).
#
# The script looks json.itype up in `params` and, when a mapping exists,
# stores it in the temporary top-level field threatintel_indicator_type.
# The rename that follows moves it to threat.indicator.type. Unknown itype
# values produce no indicator type at all.
#
- script:
    lang: painless
    # Null-safe access: a document without a `json` object must skip this
    # processor instead of failing while the condition is evaluated.
    if: 'ctx.json?.itype != null'
    description: >
      Map itype field to STIX 2.0 Cyber Observable values (threat.indicator.type).
    params:
      actor_ip: ipv4-addr
      adware_domain: domain-name
      anon_proxy: ipv4-addr
      anon_vpn: ipv4-addr
      apt_domain: domain-name
      apt_email: email-addr
      apt_ip: ipv4-addr
      apt_md5: file
      # NOTE(review): "email" does not look like a STIX 2.0 observable type
      # name (closest seems to be email-message) - confirm before changing.
      apt_subject: email
      apt_ua: url
      apt_url: url
      bot_ip: ipv4-addr
      brute_ip: ipv4-addr
      c2_domain: domain-name
      c2_ip: ipv4-addr
      c2_url: url
      comm_proxy_domain: domain-name
      comm_proxy_ip: ipv4-addr
      compromised_domain: domain-name
      compromised_ip: ipv4-addr
      compromised_url: url
      crypto_hash: file
      crypto_ip: ipv4-addr
      # Was "domain"; aligned with every other domain itype in this table.
      crypto_pool: domain-name
      crypto_url: url
      crypto_wallet: file
      ddos_ip: ipv4-addr
      disposable_email_domain: domain-name
      dyn_dns: domain-name
      exfil_domain: domain-name
      exfil_ip: ipv4-addr
      exfil_url: url
      exploit_domain: domain-name
      exploit_ip: ipv4-addr
      exploit_url: url
      free_email_domain: domain-name
      geolocation_url: url
      hack_tool: file
      i2p_ip: ipv4-addr
      ipcheck_url: url
      mal_domain: domain-name
      mal_email: email-addr
      mal_ip: ipv4-addr
      mal_md5: file
      mal_sslcert_sh1: x509-certificate
      mal_sslcert_sha1: x509-certificate
      mal_ua: url
      mal_url: url
      p2pcnc: ipv4-addr
      parked_domain: domain-name
      parked_ip: ipv4-addr
      parked_url: url
      pastesite_url: url
      phish_domain: domain-name
      phish_email: email-addr
      phish_ip: ipv4-addr
      phish_url: url
      proxy_ip: ipv4-addr
      scan_ip: ipv4-addr
      sinkhole_domain: domain-name
      sinkhole_ip: ipv4-addr
      spam_domain: domain-name
      spam_email: email-addr
      spam_ip: ipv4-addr
      spam_url: url
      speedtest_url: url
      ssh_ip: ipv4-addr
      # NOTE(review): "suppress" is passed through verbatim; it is not an
      # observable type - confirm this is intentional.
      suppress: suppress
      suspicious_domain: domain-name
      suspicious_email: email-addr
      suspicious_ip: ipv4-addr
      suspicious_reg_email: email-addr
      suspicious_url: url
      tor_ip: ipv4-addr
      torrent_tracker_url: url
      vpn_domain: domain-name
      vps_ip: ipv4-addr
      whois_bulk_reg_email: email-addr
      whois_privacy_domain: domain-name
      whois_privacy_email: email-addr
    source: >
      String mapping = params[ctx.json.itype];
      if (mapping != null) {
        ctx["threatintel_indicator_type"] = mapping;
      }
    # Record the failure on the document and let the pipeline continue.
    on_failure:
    - append:
        field: error.message
        value: 'Unable to determine indicator type from "{{{ json.itype }}}": {{{ _ingest.on_failure_message }}}'
# Move the temporary script output to its ECS location.
- rename:
    field: threatintel_indicator_type
    target_field: threat.indicator.type
    ignore_missing: true
#
# The itype table only yields ipv4-addr for IP indicators. When srcip
# contains a colon, re-label the indicator type as ipv6-addr.
#
- set:
    if: 'ctx.threat?.indicator?.type == "ipv4-addr" && ctx.json?.srcip != null && ctx.json.srcip.contains(":")'
    value: ipv6-addr
    field: threat.indicator.type
#
# Parse date_first / date_last (ISO8601) into the indicator's
# first_seen / last_seen fields. A parse failure is recorded in
# error.message by the processor-level handler.
#
- date:
    if: 'ctx.json?.date_first != null'
    formats:
    - ISO8601
    field: json.date_first
    target_field: threat.indicator.first_seen
    on_failure:
    - append:
        value: 'Error parsing date_first field value "{{{ json.date_first }}}": {{{ _ingest.on_failure_message }}}'
        field: error.message
- date:
    if: 'ctx.json?.date_last != null'
    formats:
    - ISO8601
    field: json.date_last
    target_field: threat.indicator.last_seen
    on_failure:
    - append:
        value: 'Error parsing date_last field value "{{{ json.date_last }}}": {{{ _ingest.on_failure_message }}}'
        field: error.message
#
# Copy lat / lon into threat.indicator.geo.location as doubles.
# Both conversions are attempted only when both coordinates are present.
#
- convert:
    if: 'ctx.json?.lat != null && ctx.json?.lon != null'
    type: double
    field: json.lat
    target_field: threat.indicator.geo.location.lat
    on_failure:
    - append:
        value: 'Cannot convert lat field "{{{ json.lat }}}" to double: {{{ _ingest.on_failure_message }}}'
        field: error.message
- convert:
    if: 'ctx.json?.lat != null && ctx.json?.lon != null'
    type: double
    field: json.lon
    target_field: threat.indicator.geo.location.lon
    on_failure:
    - append:
        value: 'Cannot convert lon field "{{{ json.lon }}}" to double: {{{ _ingest.on_failure_message }}}'
        field: error.message
#
# Translate the classification field into a Traffic Light Protocol (TLP)
# marking. Current mapping:
#   private => Amber ("Limited disclosure, restricted to participants'
#              organizations.")
#   public  => White ("Disclosure is not limited.")
# Any other classification value adds no marking.
#
- append:
    if: 'ctx.json?.classification == "private"'
    value: Amber
    field: threat.indicator.marking.tlp
- append:
    if: 'ctx.json?.classification == "public"'
    value: White
    field: threat.indicator.marking.tlp
#
# Normalize the confidence field (-1..100) to the None/Low/Med/High scale
# stored in threat.indicator.confidence:
#   <= 0 or > 100 => None
#   (0, 30)       => Low
#   [30, 70)      => Med
#   [70, 100]     => High
# The ranges are contiguous so fractional inputs (e.g. 29.5) always receive
# a level; integer inputs map exactly as before.
#
- script:
    lang: painless
    if: 'ctx.json?.confidence != null'
    description: >
      Normalize confidence level.
    source: >
      def value = ctx.json.confidence;
      if (value <= 0 || value > 100) {
        ctx["threatintel_indicator_confidence"] = "None";
      } else if (value < 30) {
        ctx["threatintel_indicator_confidence"] = "Low";
      } else if (value < 70) {
        ctx["threatintel_indicator_confidence"] = "Med";
      } else {
        ctx["threatintel_indicator_confidence"] = "High";
      }
    # A non-numeric confidence makes the comparisons throw; record the
    # failure on the document and let the pipeline continue.
    on_failure:
    - append:
        field: error.message
        value: "failed to normalize confidence value `{{{ json.confidence }}}`: {{{ _ingest.on_failure_message }}}"
# Move the temporary script output to its ECS location.
- rename:
    field: threatintel_indicator_confidence
    target_field: threat.indicator.confidence
    ignore_missing: true
#
# Autonomous-system number: copy json.asn into the ECS field as a long.
#
- convert:
    type: long
    ignore_missing: true
    field: json.asn
    target_field: threat.indicator.as.number
    on_failure:
    - append:
        value: 'Cannot convert asn field `{{{ json.asn }}}` to long: {{{ _ingest.on_failure_message }}}'
        field: error.message
# Straight one-to-one moves into ECS; absent source fields are skipped.
- rename:
    ignore_missing: true
    field: json.org
    target_field: threat.indicator.as.organization.name
- rename:
    ignore_missing: true
    field: json.email
    target_field: threat.indicator.email.address
- rename:
    ignore_missing: true
    field: json.srcip
    target_field: threat.indicator.ip
# Break json.url into its components under threat.indicator.url, keeping
# the original string and dropping json.url once parsing succeeds.
- uri_parts:
    if: 'ctx.json?.url != null'
    field: json.url
    target_field: threat.indicator.url
    remove_if_successful: true
    keep_original: true
    on_failure:
    - append:
        value: 'Cannot parse url field `{{{ json.url }}}`: {{{ _ingest.on_failure_message }}}'
        field: error.message
# Mirror url.original into url.full; skipped when the value renders empty.
- set:
    ignore_empty_value: true
    field: threat.indicator.url.full
    value: '{{{threat.indicator.url.original}}}'
# Use json.domain as the URL domain only when URL parsing did not set one.
- rename:
    if: 'ctx.threat?.indicator?.url?.domain == null'
    ignore_missing: true
    field: json.domain
    target_field: threat.indicator.url.domain
- rename:
    ignore_missing: true
    field: json.country
    target_field: threat.indicator.geo.country_iso_code
#
# Despite its name, the md5 field may hold other hash kinds. The value's
# length selects the target under threat.indicator.file.hash.*:
#   32 => md5, 40 => sha1, 64 => sha256, 128 => sha512
# Values of any other length are left in json.md5.
#
- rename:
    if: 'ctx.json?.md5 != null && ctx.json.md5.length() == 32'
    target_field: threat.indicator.file.hash.md5
    field: json.md5
- rename:
    if: 'ctx.json?.md5 != null && ctx.json.md5.length() == 40'
    target_field: threat.indicator.file.hash.sha1
    field: json.md5
- rename:
    if: 'ctx.json?.md5 != null && ctx.json.md5.length() == 64'
    target_field: threat.indicator.file.hash.sha256
    field: json.md5
- rename:
    if: 'ctx.json?.md5 != null && ctx.json.md5.length() == 128'
    target_field: threat.indicator.file.hash.sha512
    field: json.md5
# The feed's source field becomes the indicator provider.
- rename:
    ignore_missing: true
    target_field: threat.indicator.provider
    field: json.source
#
# Translate the textual severity field into a numeric event.severity:
#   low       => 3
#   medium    => 5
#   high      => 7
#   very-high => 9
# Any other severity value leaves event.severity unset.
#
- set:
    if: 'ctx.json?.severity == "low"'
    value: 3
    field: event.severity
- set:
    if: 'ctx.json?.severity == "medium"'
    value: 5
    field: event.severity
- set:
    if: 'ctx.json?.severity == "high"'
    value: 7
    field: event.severity
- set:
    if: 'ctx.json?.severity == "very-high"'
    value: 9
    field: event.severity
#
# Field trusted_circle_ids is a comma-separated string that can contain
# leading and trailing separators (i.e. ",123,").
# A script is used instead of the split processor so that the empty
# entries produced by such separators are dropped. When nothing remains
# the field is removed altogether.
#
- script:
    description: >
      Convert trusted_circles_ids from CSV to an array.
    if: 'ctx.json?.trusted_circle_ids != null && ctx.json?.trusted_circle_ids instanceof String'
    lang: painless
    source: >
      def lst = Stream.of(ctx.json.trusted_circle_ids.splitOnToken(',')).filter(s -> !s.isEmpty()).collect(Collectors.toList());
      if (lst.size() > 0) {
        ctx.json.trusted_circle_ids = lst;
      } else {
        ctx.json.remove('trusted_circle_ids');
      }
#
# Break the detail field apart on commas that are not preceded by a
# backslash, then add every resulting piece to the ECS tags field.
#
- split:
    ignore_missing: true
    field: json.detail
    separator: '(?<!\\),'  # unescaped comma.
    on_failure:
    - append:
        value: "failed to split detail field '{{{json.detail}}}': {{{_ingest.on_failure_message}}}"
        field: error.message
- foreach:
    ignore_missing: true
    field: json.detail
    processor:
      append:
        value: '{{{ _ingest._value }}}'
        field: tags
#
# Cast identifier fields to strings, in place, when they are present.
#
- convert:
    if: 'ctx.json?.id != null'
    type: string
    field: json.id
- convert:
    if: 'ctx.json?.source_feed_id != null'
    type: string
    field: json.source_feed_id
- convert:
    if: 'ctx.json?.update_id != null'
    type: string
    field: json.update_id
- convert:
    if: 'ctx.json?.import_session_id != null'
    type: string
    field: json.import_session_id
#
# Clean-up. Drop event.original unless the document is tagged
# preserve_original_event, then drop the json.* source fields whose
# values were copied (not renamed) into ECS fields earlier on.
#
- remove:
    if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
    ignore_missing: true
    ignore_failure: true
    field: event.original
- remove:
    ignore_missing: true
    field:
    - json.asn
    - json.date_first
    - json.date_last
    - json.detail
    - json.lat
    - json.lon
#
# Save fields without an ECS mapping under `anomali.threatstream`.
#
- rename:
    field: json
    target_field: anomali.threatstream
# Pipeline-level handler: runs when a processor without its own on_failure
# block fails, and records the reason on the document.
# Triple braces keep the message unescaped, matching every other
# error.message template in this pipeline.
on_failure:
- append:
    field: error.message
    value: '{{{ _ingest.on_failure_message }}}'