A parser consists of a sequence of stages organized in a pipeline that modifies the event on the fly.
The following pipeline is made of three stages (stage1, stage2 and stage3), with the execution of stage2 and stage3 conditioned on a filter that evaluates the value of the event field message.log_type produced by stage1.
pipeline:
  - name: stage1
  - name: stage2
    filter: '{{stage1.message.log_type == "network"}}'
  - name: stage3
    filter: '{{stage1.message.log_type == "process"}}'
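The conditional execution of stages can be pictured with a small Python sketch. It is not the Ingest implementation: stages are modelled as plain functions, filters as predicates over the layers produced so far, and `run_pipeline` as well as the `run` and `filter` keys are hypothetical names chosen for illustration.

```python
from typing import Callable, Optional

Layers = dict[str, dict]

def run_pipeline(pipeline: list[dict], event: dict) -> Layers:
    """Run each stage in order, skipping a stage whose filter evaluates to False."""
    layers: Layers = {"original": event}
    for step in pipeline:
        flt: Optional[Callable[[Layers], bool]] = step.get("filter")
        if flt is not None and not flt(layers):
            continue  # filter not satisfied: the stage is not executed
        layers[step["name"]] = step["run"](layers)
    return layers

pipeline = [
    {"name": "stage1",
     "run": lambda layers: {"message": {"log_type": "network"}}},
    {"name": "stage2",
     "filter": lambda layers: layers["stage1"]["message"]["log_type"] == "network",
     "run": lambda layers: {"kind": "network"}},
    {"name": "stage3",
     "filter": lambda layers: layers["stage1"]["message"]["log_type"] == "process",
     "run": lambda layers: {"kind": "process"}},
]

layers = run_pipeline(pipeline, {"message": "raw log line"})
print(sorted(layers))
```

With a `log_type` of `network`, only stage1 and stage2 produce a layer; stage3 is skipped.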
A stage is a parsing step that groups changes to the event that belong to the same semantic definition. A stage can create, update and delete fields by chaining the execution of actions.
For example, the following snippet shows a stage named my_stage, which consists of two actions.
my_stage:
  actions:
    - set:
        destination.ip: 127.0.0.1
        source.ip: 127.0.0.2
    - delete:
        - event.data
Common stages are provided to ease the development of new intakes.
The external attribute must be used to reference a common stage, along with its optional properties.
pipeline:
  - name: parsed_event
    external:
      name: example.external-stage
      properties:
        prop1: val1
        prop2: val2
The json.parse-json stage can be used to deserialize JSON from a string.
By default, the message field is parsed, but this property can be overridden to specify any field.
Example
In this example, the parser produces an event with fields that take their value from a deserialized JSON field. Below is an example input event.
{
"message": "{'protocol':'tcp','traffic':{'source':'127.0.0.1','target':'8.8.8.8'}}"
}
Below is the parsing pipeline that deserializes the message field and sets the source.ip and destination.ip fields.
pipeline:
  - name: parsed_event
    external:
      name: json.parse-json
  - name: network
stages:
  network:
    actions:
      - set:
          destination.ip: "{{parsed_event.message.traffic.target}}"
          source.ip: "{{parsed_event.message.traffic.source}}"
The following shows the produced event.
{
"message": "{'protocol':'tcp','traffic':{'source':'127.0.0.1','target':'8.8.8.8'}}",
"source": {
"ip": "127.0.0.1"
},
"destination": {
"ip": "8.8.8.8"
}
}
The kv.parse-kv stage can be used to deserialize a key-value string.
By default, the message field of the original event is parsed, but this property can be overridden to specify any field. The default item separator is any whitespace (\s) and the default value separator is =; both can be overridden with the item_sep and value_sep properties, respectively.
Example
In this example, the parser produces an event with fields and values parsed from a key-value string. Below is an example input event, a typical postfix event.
{
"message": "[email protected], relay=mail.isp.com, delay=xxx, delays=xxx, dsn=2.0.0, status=sent (250 2.0.0 OK)"
}
Below is the parsing pipeline that deserializes the message field and sets the destination.user.email field.
pipeline:
  - name: parsed_event
    external:
      name: kv.parse-kv
      properties:
        item_sep: ',\s'
  - name: email
stages:
  email:
    actions:
      - set:
          destination.user.email: "{{parsed_event.message.to}}"
The following shows the produced event.
{
"message": "[email protected], relay=mail.isp.com, delay=xxx, delays=xxx, dsn=2.0.0, status=sent (250 2.0.0 OK)",
"destination": {
"user": {
"email": "[email protected]"
}
}
}
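The effect of item_sep and value_sep can be approximated in a few lines of Python. This is a minimal sketch, not the stage's actual code: `parse_kv` is a hypothetical helper, item_sep is treated as a regular expression (as the default \s suggests) and value_sep as a literal string, which is an assumption.

```python
import re

def parse_kv(text: str, item_sep: str = r"\s", value_sep: str = "=") -> dict[str, str]:
    """Split text into items on item_sep, then split each item on the first value_sep."""
    result: dict[str, str] = {}
    for item in re.split(item_sep, text):
        if value_sep not in item:
            continue  # ignore fragments that are not key-value pairs
        key, value = item.split(value_sep, 1)
        result[key] = value
    return result

parsed = parse_kv(
    "relay=mail.isp.com, delay=xxx, dsn=2.0.0, status=sent (250 2.0.0 OK)",
    item_sep=r",\s",
)
print(parsed["status"])
```

Using `,\s` as the item separator keeps the whitespace inside the status value intact, which the default separator would split apart.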
The grok.match stage can be used to match a field against a Grok pattern.
The Grok pattern must be specified with the pattern property.
By default, the message field is parsed, but this property can be overridden to specify any field.
The parsing result replaces the parsed content.
Custom patterns can be specified with the custom_patterns property.
Example
In this example, the parser produces an event with fields and values extracted from a string with a Grok pattern. Below is an example input event.
{
"message": "64.3.89.2 took 300 ms"
}
Below is the parsing pipeline that matches the message field against a Grok pattern and sets the destination.ip field.
pipeline:
  - name: parsed_event
    external:
      name: grok.match
      properties:
        input_field: original.message
        output_field: message
        pattern: '%{IP:client} took %{NUMBER:duration} ms'
  - name: set_ip
stages:
  set_ip:
    actions:
      - set:
          destination.ip: '{{parsed_event.message.client}}'
The following shows the produced event.
{
"message": "64.3.89.2 took 300 ms",
"destination": {
"ip": "64.3.89.2"
}
}
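A Grok pattern is essentially a regular expression built from named sub-patterns. The sketch below illustrates the idea on the pattern used in this example; `GROK_PATTERNS` is a hypothetical and heavily simplified pattern library, and `grok_to_regex` is an illustrative helper rather than the stage's implementation.

```python
import re

# Hypothetical, simplified definitions: real Grok libraries are much richer.
GROK_PATTERNS = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "NUMBER": r"-?\d+(?:\.\d+)?",
}

def grok_to_regex(pattern: str) -> re.Pattern:
    """Replace each %{NAME:field} token with a named capture group."""
    def expand(token: re.Match) -> str:
        name, field = token.group(1), token.group(2)
        return f"(?P<{field}>{GROK_PATTERNS[name]})"
    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", expand, pattern))

regex = grok_to_regex("%{IP:client} took %{NUMBER:duration} ms")
match = regex.fullmatch("64.3.89.2 took 300 ms")
fields = match.groupdict() if match else {}
print(fields)
```

The captured groups play the role of the fields exposed under parsed_event.message in the pipeline above.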
The date.parse stage can be used to parse a date field.
This stage accepts two optional properties: the format used to parse the date (by default, the stage tries to autodetect the format) and the IANA timezone of the parsed date (by default, "UTC").
By default, the @timestamp field of the original message is parsed, but this property can be overridden to specify any field. The resulting date is inserted in the message field of the current stage.
Example
In this example, the parser produces an event with a parsed date. Below is an example input event.
{
"date": "May 21, 2021 at 11:04:35"
}
Below is the parsing pipeline that parses the date field and sets the @timestamp field.
pipeline:
  - name: parsed_date
    external:
      name: date.parse
      properties:
        input_field: original.date
        output_field: date
        format: "%B %d, %Y at %H:%M:%S"
        timezone: "America/New_York"
  - name: set_date
stages:
  set_date:
    actions:
      - set:
          "@timestamp": '{{parsed_date.date}}'
The following shows the produced event.
{
"@timestamp": "2021-05-21T11:04:35Z",
"date": "May 21, 2021 at 11:04:35"
}
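The format property uses strptime-style directives, so its behaviour can be explored with Python's datetime module. This is a sketch under assumptions: `parse_date` is a hypothetical helper, a fixed UTC-4 offset stands in for a named IANA zone to keep the example dependency-free, and how the stage itself normalises the timezone is not specified here.

```python
from datetime import datetime, timedelta, timezone

def parse_date(value: str, fmt: str, tz: timezone = timezone.utc) -> str:
    """Parse value with fmt, interpret it in tz, and return a UTC timestamp string."""
    local = datetime.strptime(value, fmt).replace(tzinfo=tz)
    return local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

fmt = "%B %d, %Y at %H:%M:%S"
as_utc = parse_date("May 21, 2021 at 11:04:35", fmt)

# Same wall-clock time interpreted four hours behind UTC (illustrative offset).
minus_four = timezone(timedelta(hours=-4))
shifted = parse_date("May 21, 2021 at 11:04:35", fmt, minus_four)
print(as_utc, shifted)
```

The %B directive matches English month names only under the default C locale, so localized dates would need a different approach.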
The dsv.parse-dsv stage can be used to extract values from a delimiter-separated values string.
This stage accepts the list of column names and, optionally, the delimiter (by default, the comma ',').
By default, the message field of the original message is parsed, but this property can be overridden to specify any field.
Example
In this example, the parser produces an event from a delimiter-separated values string. Below is an example input event.
{
"message": "2020/12/04 16:47:48;LOGIN;jenkins;2305"
}
Below is the parsing pipeline that deserializes the message field and sets the user.name and user.id fields.
pipeline:
  - name: parsed_dsv
    external:
      name: dsv.parse-dsv
      properties:
        columnnames:
          - date
          - action
          - username
          - user_id
        delimiter: ";"
  - name: set_user_id
stages:
  set_user_id:
    actions:
      - set:
          user.name: "{{parsed_dsv.message.username}}"
          user.id: "{{parsed_dsv.message.user_id}}"
The following shows the produced event.
{
"message": "2020/12/04 16:47:48;LOGIN;jenkins;2305",
"user": {
"id": "2305",
"name": "jenkins"
}
}
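The pairing of column names with values can be sketched with Python's csv module. `parse_dsv` is a hypothetical helper that mirrors the columnnames and delimiter properties; how the stage handles quoting or rows with missing columns is not covered here.

```python
import csv

def parse_dsv(text: str, columnnames: list[str], delimiter: str = ",") -> dict[str, str]:
    """Split one delimiter-separated line and pair each value with its column name."""
    row = next(csv.reader([text], delimiter=delimiter))
    return dict(zip(columnnames, row))

parsed = parse_dsv(
    "2020/12/04 16:47:48;LOGIN;jenkins;2305",
    ["date", "action", "username", "user_id"],
    delimiter=";",
)
print(parsed["username"], parsed["user_id"])
```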
The xml.parse-xml stage can be used to transform an XML string into a dictionary.
By default, the message field is parsed, but this property can be overridden to specify any field.
Example
In this example, the parser produces an event with fields that take their value from a deserialized XML field. Below is an example input event.
{
"message": "<Event><Id>1234</Id><Provider><Name>system</Name><Id>6</Id></Provider></Event>"
}
Below is the parsing pipeline that deserializes the message field and sets the agent.name and agent.id fields.
pipeline:
  - name: parsed_event
    external:
      name: xml.parse-xml
  - name: agent
stages:
  agent:
    actions:
      - set:
          agent.id: "{{parsed_event.message.Event.Provider.Id}}"
          agent.name: "{{parsed_event.message.Event.Provider.Name}}"
The following shows the produced event.
{
"message": "<Event><Id>1234</Id><Provider><Name>system</Name><Id>6</Id></Provider></Event>",
"agent": {
"id": 6,
"name": "system"
}
}
The xml.parse-windows-event stage can be used to transform a Windows event in XML format into a dictionary.
By default, the message field is parsed, but this property can be overridden to specify any field.
The parser produces an event in two parts:
- The system part, holding data extracted from the System tag
- The data part, flattening the data tags as key-value structure
The following event:
<Event><System><EventID>1234</EventID><Execution ProcessID="592" ThreadID="6452"/></System><EventData><Data Name="Key">Value</Data></EventData></Event>
will be transformed into the following structure:
{
"System": {
"EventID": "1234",
"Execution": {
"ProcessID": "592",
"ThreadID": "6452"
}
},
"EventData": {
"Key": "Value"
}
}
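The two-part transformation can be reproduced with Python's standard XML parser. This is a minimal sketch, not the stage's code: `parse_windows_event` and `element_to_dict` are hypothetical helpers, and XML namespaces, which real Windows events usually carry, are not handled.

```python
import xml.etree.ElementTree as ET

def element_to_dict(element: ET.Element):
    """Attributes become keys, children are nested, a bare leaf yields its text."""
    if len(element) == 0 and not element.attrib:
        return element.text
    node = dict(element.attrib)
    for child in element:
        node[child.tag] = element_to_dict(child)
    return node

def parse_windows_event(xml_text: str) -> dict:
    root = ET.fromstring(xml_text)
    system = root.find("System")
    event_data = root.find("EventData")
    return {
        # System part: data extracted from the System tag
        "System": element_to_dict(system) if system is not None else {},
        # Data part: Data tags flattened into a key-value structure
        "EventData": {d.get("Name"): d.text for d in event_data.findall("Data")}
        if event_data is not None else {},
    }

event = parse_windows_event(
    '<Event><System><EventID>1234</EventID>'
    '<Execution ProcessID="592" ThreadID="6452"/></System>'
    '<EventData><Data Name="Key">Value</Data></EventData></Event>'
)
print(event)
```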
Example
The previous event is given as the input of the parser.
{
"message": "<Event><System><EventID>1234</EventID><Execution ProcessID=\"592\" ThreadID=\"6452\"/></System><EventData><Data Name=\"Key\">Value</Data></EventData></Event>"
}
Below is the parsing pipeline that deserializes the message field and sets the fields.
pipeline:
  - name: parsed_event
    external:
      name: xml.parse-windows-event
  - name: set-fields
stages:
  set-fields:
    actions:
      - set:
          process.pid: "{{parsed_event.message.System.Execution.ProcessID}}"
          process.thread.id: "{{parsed_event.message.System.Execution.ThreadID}}"
          event.id: "{{parsed_event.message.System.EventID}}"
          custom.key: "{{parsed_event.message.EventData.Key}}"
The following shows the produced event.
{
"message": "<Event><System><EventID>1234</EventID><Execution ProcessID=\"592\" ThreadID=\"6452\"/></System><EventData><Data Name=\"Key\">Value</Data></EventData></Event>",
"process": {
"pid": 592,
"thread": {
"id": 6452
}
},
"event": { "id": 1234 },
"custom": { "key": "Value" }
}
The cef.parse-cef stage can be used to parse CEF messages.
By default, the message field is parsed, but this property can be overridden to specify any field.
This stage will extract the following keys from the header definition:
- CEFVersion: The CEF version
- DeviceVendor: The vendor of the product that generated the log
- DeviceProduct: The product that generated the log
- DeviceVersion: The version of the product
- DeviceEventClassID: A unique identifier
- Name: A human-readable description of the event
- Severity: The severity of the event
and will extract each key-value pair from the extension.
Example
In this example, the parser produces an event with fields that take their value from a CEF message. Below is an example of the input event.
{
"message": "CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232"
}
Below is the parsing pipeline that deserializes the message
field and sets some fields.
pipeline:
  - name: parsed_event
    external:
      name: cef.parse-cef
  - name: agent
stages:
  agent:
    actions:
      - set:
          event.id: "{{parsed_event.message.DeviceEventClassID}}"
          event.severity: "{{parsed_event.message.Severity}}"
          source.ip: "{{parsed_event.message.src}}"
          source.port: "{{parsed_event.message.spt}}"
          destination.ip: "{{parsed_event.message.dst}}"
          observer.vendor: "{{parsed_event.message.DeviceVendor}}"
          observer.product: "{{parsed_event.message.DeviceProduct}}"
          observer.version: "{{parsed_event.message.DeviceVersion}}"
The following shows the produced event.
{
"message": "CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
"source": {
"ip": "10.0.0.1",
"port": 1232
},
"destination": {
"ip": "2.1.2.2"
},
"event": {"id": "100", "severity": 10},
"observer": {
"vendor": "Security",
"product": "threatmanager",
"version": "1.0"
}
}
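The split between the CEF header and its extension can be sketched as follows. `parse_cef` is a hypothetical helper using the header key names listed above; escaped pipes in the header and escaped characters in extension values are deliberately ignored, so this is an illustration rather than a complete CEF parser.

```python
import re

CEF_HEADER = [
    "CEFVersion", "DeviceVendor", "DeviceProduct", "DeviceVersion",
    "DeviceEventClassID", "Name", "Severity",
]

def parse_cef(message: str) -> dict[str, str]:
    """Extract the seven header keys, then each key-value pair of the extension."""
    if not message.startswith("CEF:"):
        raise ValueError("not a CEF message")
    parts = message[len("CEF:"):].split("|", 7)
    if len(parts) < 7:
        raise ValueError("incomplete CEF header")
    fields = dict(zip(CEF_HEADER, parts[:7]))
    extension = parts[7] if len(parts) > 7 else ""
    # A value runs until the next " key=" or the end of the string.
    for key, value in re.findall(r"(\w+)=(.*?)(?=\s+\w+=|$)", extension):
        fields[key] = value
    return fields

cef = parse_cef(
    "CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|"
    "src=10.0.0.1 dst=2.1.2.2 spt=1232"
)
print(cef["Name"], cef["src"], cef["spt"])
```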
The leef.parse-leef stage can be used to parse LEEF messages (the supported versions of LEEF are 1 and 2).
By default, the message field is parsed, but this property can be overridden to specify any field.
This stage will extract the following keys from the header definition:
- LEEFVersion: The LEEF version
- DeviceVendor: The vendor of the product that generated the log
- DeviceProduct: The product that generated the log
- DeviceVersion: The version of the product
- DeviceEventClassID: A unique identifier
and will extract each key-value pair from the extension.
Example
In this example, the parser produces an event with fields that take their value from a LEEF message. Below is an example input event.
{
"message": "LEEF:2|Security|threatmanager|1.0|100|^|src=10.0.0.1^dst=2.1.2.2^spt=1232"
}
Below is the parsing pipeline that deserializes the message
field and sets some fields.
pipeline:
  - name: parsed_event
    external:
      name: leef.parse-leef
  - name: agent
stages:
  agent:
    actions:
      - set:
          event.id: "{{parsed_event.message.DeviceEventClassID}}"
          source.ip: "{{parsed_event.message.src}}"
          source.port: "{{parsed_event.message.spt}}"
          destination.ip: "{{parsed_event.message.dst}}"
          observer.vendor: "{{parsed_event.message.DeviceVendor}}"
          observer.product: "{{parsed_event.message.DeviceProduct}}"
          observer.version: "{{parsed_event.message.DeviceVersion}}"
The following shows the produced event.
{
"message": "LEEF:2|Security|threatmanager|1.0|100|^|src=10.0.0.1^dst=2.1.2.2^spt=1232",
"source": {
"ip": "10.0.0.1",
"port": 1232
},
"destination": {
"ip": "2.1.2.2"
},
"event": {"id": "100"},
"observer": {
"vendor": "Security",
"product": "threatmanager",
"version": "1.0"
}
}
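The difference between the two LEEF versions lies mostly in how the extension is delimited. The sketch below assumes that a version 2 header carries the delimiter character as its sixth field, as in the example above, and that version 1 extensions are tab-separated; `parse_leef` is a hypothetical helper and does not handle hexadecimal delimiter notations.

```python
LEEF_HEADER = [
    "LEEFVersion", "DeviceVendor", "DeviceProduct",
    "DeviceVersion", "DeviceEventClassID",
]

def parse_leef(message: str) -> dict[str, str]:
    """Extract the five header keys, then each key-value pair of the extension."""
    if not message.startswith("LEEF:"):
        raise ValueError("not a LEEF message")
    body = message[len("LEEF:"):]
    if body.startswith("2"):
        # LEEF 2: header fields, then the delimiter character, then the extension
        *header, delimiter, extension = body.split("|", 6)
    else:
        # LEEF 1: header fields, then a tab-separated extension
        *header, extension = body.split("|", 5)
        delimiter = "\t"
    fields = dict(zip(LEEF_HEADER, header))
    for item in extension.split(delimiter or "\t"):
        if "=" in item:
            key, value = item.split("=", 1)
            fields[key] = value
    return fields

leef2 = parse_leef("LEEF:2|Security|threatmanager|1.0|100|^|src=10.0.0.1^dst=2.1.2.2^spt=1232")
leef1 = parse_leef("LEEF:1.0|Security|threatmanager|1.0|100|src=10.0.0.1\tdst=2.1.2.2")
print(leef2["spt"], leef1["dst"])
```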
An action is an elementary operation that can create, update and delete fields. The execution of an action can be conditioned on a filter.
Sets the value of one or more fields in the final version of the event.
The field in the final version of the event can be specified with a dotted path (e.g. field1, field1.sub-field1, …).
The value can either be a constant (e.g. 'my-constant', 42, …) or a reference to the value of another field in a stage (e.g. {{stage1.my-field1.attribute2}}).
If the value cannot be computed or is empty, the field is not modified.
Example:
- set:
    source.ip: 127.0.0.1
    destination.ip: "{{stage1.target.ip}}"
  filter: '{{stage1.log_type == "network"}}'
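The dotted-path semantics of the set action can be illustrated on a nested dictionary. `set_field` is a hypothetical helper; treating None and the empty string as "empty" is an assumption made for this sketch.

```python
def set_field(event: dict, path: str, value) -> None:
    """Set a dotted path in a nested dict; leave the event untouched if value is empty."""
    if value is None or value == "":
        return  # the field is not modified
    *parents, leaf = path.split(".")
    node = event
    for key in parents:
        node = node.setdefault(key, {})  # create intermediate objects on demand
    node[leaf] = value

event = {"source": {"ip": "10.0.0.1"}}
set_field(event, "source.ip", "127.0.0.1")   # updates an existing field
set_field(event, "destination.ip", "")       # empty value: nothing happens
set_field(event, "destination.port", 443)    # creates destination.port
print(event)
```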
Sets the value of one or more fields according to the value of a source field and a dictionary that maps values.
The field in the final version of the event can be specified with a dotted path (e.g. field1, field1.sub-field1, …).
An optional fallback value can be defined. If the value of the source field doesn't match any entry of the mapping dictionary, this fallback value will be used to set the target field. If no fallback value is defined and the value of the source field doesn't match any entry, the target field will not be created in the final event.
Example:
- translate:
    dictionary:
      200: "OK"
      201: "Created"
      204: "No Content"
      400: "Bad Request"
      401: "Unauthorized"
      403: "Forbidden"
      404: "Not Found"
      500: "Internal Server Error"
      501: "Not Implemented"
      502: "Bad Gateway"
      503: "Service Unavailable"
      504: "Gateway Timeout"
    mapping:
      http.response.status_code: http.response.status_message
      api.status_code: api.status_message
    fallback: "Request Processed"
  filter: '{{stage1.log_type == "network"}}'
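The lookup and fallback rules can be summarised in a short Python sketch. `translate` is a hypothetical function operating on a flat dictionary of dotted field names; skipping a mapping entry whose source field is absent is an assumption, since that case is not described here.

```python
def translate(fields: dict, dictionary: dict, mapping: dict, fallback=None) -> dict:
    """Return the target fields obtained by looking up each source value in dictionary."""
    produced = {}
    for source, target in mapping.items():
        if source not in fields:
            continue  # assumption: nothing to translate for an absent source field
        value = dictionary.get(fields[source], fallback)
        if value is not None:
            produced[target] = value  # without a match or a fallback, no field is created
    return produced

dictionary = {200: "OK", 404: "Not Found"}
mapping = {
    "http.response.status_code": "http.response.status_message",
    "api.status_code": "api.status_message",
}
fields = {"http.response.status_code": 404, "api.status_code": 302}

with_fallback = translate(fields, dictionary, mapping, fallback="Request Processed")
without_fallback = translate(fields, dictionary, mapping)
print(with_fallback, without_fallback)
```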
Each stage generates a layer that holds the set of fields produced by the stage.
You can refer to one of these fields from another field, a filter or the input of a subsequent common stage.
To refer to a field, use a jinja placeholder (i.e. {{xxxx}}) with the name of the referred layer (i.e. the name of the stage) as the prefix, followed by the path to the field in dot notation (e.g. to refer to the value of the field date from the parsed_date stage, use {{parsed_date.date}}).
For incoming events, Ingest creates a first layer named original that holds the event and its ECS envelope as received.
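How a placeholder maps to a layer and a path can be sketched without a template engine. `resolve` is a hypothetical helper that only handles a single bare placeholder with no filters; Ingest relies on jinja, which supports far more.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w@.-]+)\s*\}\}")

def resolve(template: str, layers: dict):
    """Return the value referenced by a single {{layer.path}} placeholder."""
    match = PLACEHOLDER.fullmatch(template.strip())
    if match is None:
        return template  # a constant, not a reference
    layer_name, *path = match.group(1).split(".")
    node = layers.get(layer_name)
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None  # the reference cannot be computed
        node = node[key]
    return node

layers = {
    "original": {"date": "May 21, 2021 at 11:04:35"},
    "parsed_date": {"date": "2021-05-21T11:04:35Z"},
}
print(resolve("{{parsed_date.date}}", layers))
print(resolve("{{original.date}}", layers))
```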
A reference to another field can be extended with filters. Filters are separated from the field path by a pipe symbol (|). Multiple filters can be chained: the output of a filter is passed to the next one.
For example, {{stage1.username |strip |upper}} removes the surrounding whitespace and returns the uppercase value of the username variable computed in stage1.
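Filter chaining amounts to function composition from left to right. The following sketch mimics it with plain Python string methods; `FILTERS` and `apply_filters` are hypothetical and cover only three filters, whereas the real evaluation is performed by jinja.

```python
# Hypothetical registry mapping filter names to Python callables.
FILTERS = {
    "strip": str.strip,
    "upper": str.upper,
    "lower": str.lower,
}

def apply_filters(value, expression: str):
    """Apply the filters named in 'a | b | c' to value, from left to right."""
    for name in (part.strip() for part in expression.split("|")):
        if name:
            value = FILTERS[name](value)  # output of one filter feeds the next
    return value

username = apply_filters("  alice ", "strip | upper")
print(username)
```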
Ingest makes the jinja built-in filters available. The most popular filters are:
filter | description |
---|---|
abs | returns the absolute value of the variable |
capitalize | returns the value with the first character uppercase and all others lowercase |
float | converts the variable to a float |
int | converts the variable to an int |
length | returns the number of items |
lower | returns the value all lowercase |
max | returns the largest item from the variable |
min | returns the smallest item from the variable |
strip | returns the variable stripped of leading and trailing whitespace |
upper | returns the value all uppercase |
Ingest extends these built-in filters with a set of custom filters:
filter | description |
---|---|
basename | returns the base name of a path (supports Unix and Windows paths) |
dirname | returns the directory name of a path (supports Unix and Windows paths) |
to_rfc3339(value: Any, format=None) | converts and formats any date as an RFC 3339 string |
to_iso8601(value: Any, format=None) | converts and formats any date as an ISO 8601 string |
re_match | tests the whole value against a regular expression |
re_search | tests whether a subset of the value matches the regular expression |
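The path and regular-expression filters have close equivalents in the Python standard library, which helps when predicting their results. The functions below are hypothetical stand-ins, not the Ingest implementations; in particular, detecting a Windows path by a backslash or a drive letter is an assumption.

```python
import ntpath
import posixpath
import re

def _path_module(path: str):
    # Assumption: a backslash or a drive letter marks a Windows path.
    return ntpath if "\\" in path or re.match(r"^[A-Za-z]:", path) else posixpath

def basename(path: str) -> str:
    return _path_module(path).basename(path)

def dirname(path: str) -> str:
    return _path_module(path).dirname(path)

def re_match(value: str, pattern: str) -> bool:
    """True only if the whole value matches the pattern."""
    return re.fullmatch(pattern, value) is not None

def re_search(value: str, pattern: str) -> bool:
    """True if any part of the value matches the pattern."""
    return re.search(pattern, value) is not None

print(basename("/var/log/auth.log"), basename("C:\\Windows\\System32\\cmd.exe"))
print(re_match("error 42", r"error"), re_search("error 42", r"error"))
```

The last line highlights the difference between the two tests: the same pattern fails as a whole-value match but succeeds as a search.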
Deletes fields in the final version of the event.
Example:
- delete:
    - source.ip
    - destination.ip
  filter: '{{stage1.log_type != "network"}}'
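Deleting a field by dotted path can be pictured on a nested dictionary as follows. `delete_field` is a hypothetical helper; whether Ingest also removes a parent object once it becomes empty is not stated here, so the sketch leaves it in place.

```python
def delete_field(event: dict, path: str) -> None:
    """Remove a dotted path from a nested dict; missing paths are ignored."""
    *parents, leaf = path.split(".")
    node = event
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return  # the path does not exist: nothing to delete
    node.pop(leaf, None)

event = {
    "source": {"ip": "10.0.0.1", "port": 1232},
    "destination": {"ip": "2.1.2.2"},
}
for field in ("source.ip", "destination.ip", "missing.field"):
    delete_field(event, field)
print(event)
```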
- See Azure Front Door parser as an introduction.
- See AWS Flow logs parser for textual extraction.
- See Wallix Bastion parser for filters usage.
- See ProofPoint TAP parser and Azure Windows parser for complex parsers.