Field name conditions
* Include/ExcludeFieldCondition are now field name conditions.
* DetectionItemConditions additionally process field name conditions.
* Tracking of processing items applied to Sigma rule field list items.
* Added FieldNameProcessingItemAppliedCondition.
* Fix: all processing transformation classes unified to dataclasses.
* Release 0.8.0 because it's a breaking change.
* Documentation updates.
thomaspatzke committed Aug 10, 2022
1 parent b0e8dc5 commit 535c3a9
Showing 9 changed files with 221 additions and 45 deletions.
31 changes: 25 additions & 6 deletions docs/Processing_Pipelines.rst
@@ -107,8 +107,15 @@ Example:
Conditions
**********

There are two types of conditions: rule conditions which are evaluated to the whole rule and
detection item conditions that are evaluated for each detection item.
.. versionadded:: 0.8.0
Field name conditions.

There are three types of conditions:

* Rule conditions are evaluated against the whole rule.
* Detection item conditions are evaluated for each detection item.
* Field name conditions are evaluated on field names, which can appear in detection items or in
  the field list of a Sigma rule.
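
For example, a processing item that should only act on particular field names can carry a field
name condition. The following is a minimal sketch modeled on the test cases in this commit; the
chosen transformation is interchangeable:

.. code-block:: python

    from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline
    from sigma.processing.conditions import IncludeFieldCondition
    from sigma.processing.transformations import DropDetectionItemTransformation

    pipeline = ProcessingPipeline([
        ProcessingItem(
            DropDetectionItemTransformation(),
            field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
        ),
    ])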

Rule Conditions
===============
@@ -132,16 +139,27 @@ Detection Item Conditions
:header-rows: 1

"Identifier", "Class"
"include_fields", "IncludeFieldCondition"
"exclude_fields", "ExcludeFieldCondition"
"match_string", "MatchStringCondition"
"processing_item_applied", "DetectionItemProcessingItemAppliedCondition"

.. autoclass:: sigma.processing.conditions.IncludeFieldCondition
.. autoclass:: sigma.processing.conditions.ExcludeFieldCondition
.. autoclass:: sigma.processing.conditions.MatchStringCondition
.. autoclass:: sigma.processing.conditions.DetectionItemProcessingItemAppliedCondition

Field Name Conditions
=====================

.. csv-table:: Field Name Identifiers
:header-rows: 1

"Identifier", "Class"
"include_fields", "IncludeFieldCondition"
"exclude_fields", "ExcludeFieldCondition"
"processing_item_applied", "FieldNameProcessingItemAppliedCondition"

.. autoclass:: sigma.processing.conditions.IncludeFieldCondition
.. autoclass:: sigma.processing.conditions.ExcludeFieldCondition
.. autoclass:: sigma.processing.conditions.FieldNameProcessingItemAppliedCondition

Base Classes
============

@@ -154,6 +172,7 @@ and not be distributed via the main pySigma distribution.

.. autoclass:: sigma.processing.conditions.RuleProcessingCondition
.. autoclass:: sigma.processing.conditions.DetectionItemProcessingCondition
.. autoclass:: sigma.processing.conditions.FieldNameProcessingCondition
.. autoclass:: sigma.processing.conditions.ValueProcessingCondition
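
As an illustration of the new extension point, a custom field name condition could be derived from
FieldNameProcessingCondition roughly as follows (a hypothetical sketch, not part of pySigma):

.. code-block:: python

    from dataclasses import dataclass
    from sigma.processing.conditions import FieldNameProcessingCondition

    @dataclass
    class PrefixFieldNameCondition(FieldNameProcessingCondition):
        """Hypothetical condition that matches field names starting with a given prefix."""
        prefix : str

        def match(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", field : str) -> bool:
            return field is not None and field.startswith(self.prefix)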

.. _transformations:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pySigma"
version = "0.7.3"
version = "0.8.0"
license = "LGPL-2.1-only"
description = "Sigma rule processing and conversion tools"
authors = ["Thomas Patzke <[email protected]>"]
41 changes: 32 additions & 9 deletions sigma/processing/conditions.py
@@ -19,6 +19,15 @@ class RuleProcessingCondition(ABC):
def match(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", rule : SigmaRule) -> bool:
"""Match condition on Sigma rule."""

class FieldNameProcessingCondition(ABC):
"""
Base class for conditions on field names in detection items, Sigma rule field lists and other
use cases that require matching on field names without detection item context.
"""
@abstractmethod
def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> bool:
"The method match is called for each field name and must return a bool result."

@dataclass
class DetectionItemProcessingCondition(ABC):
"""
@@ -117,9 +126,9 @@ class RuleProcessingItemAppliedCondition(RuleProcessingCondition):
def match(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", rule : SigmaRule) -> bool:
return rule.was_processed_by(self.processing_item_id)

### Detection Item Condition Classes ###
### Field Name Condition Classes ###
@dataclass
class IncludeFieldCondition(DetectionItemProcessingCondition):
class IncludeFieldCondition(FieldNameProcessingCondition):
"""
Matches on field name if it is contained in fields list. The parameter 'type' determines if field names are matched as
plain string ("plain") or regular expressions ("re").
@@ -142,19 +151,19 @@ def __post_init__(self):
else:
raise SigmaConfigurationError(f"Invalid detection item field name condition type '{self.type}', supported types are 'plain' or 're'.")

def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool:
if detection_item.field is None:
def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: Optional[str]) -> bool:
if field is None:
return False
elif self.type == "plain":
return detection_item.field in self.fields
return field in self.fields
else: # regular expression matching
try:
return any((
pattern.match(detection_item.field)
pattern.match(field)
for pattern in self.patterns
))
except Exception as e:
msg = f" (while processing detection item: field={str(detection_item.field)} value={str(detection_item.value)})"
msg = f" (while processing field '{field}'"
if len (e.args) > 1:
e.args = (e.args[0] + msg,) + e.args[1:]
else:
@@ -167,6 +176,7 @@ class ExcludeFieldCondition(IncludeFieldCondition):
def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: Optional[str]) -> bool:
return not super().match(pipeline, field)

### Detection Item Condition Classes ###
@dataclass
class MatchStringCondition(ValueProcessingCondition):
"""
@@ -205,6 +215,16 @@ class DetectionItemProcessingItemAppliedCondition(DetectionItemProcessingCondition):
def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool:
return detection_item.was_processed_by(self.processing_item_id)

@dataclass
class FieldNameProcessingItemAppliedCondition(FieldNameProcessingCondition):
"""
Checks if processing item was applied to a field name.
"""
processing_item_id : str

def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> bool:
return pipeline.field_was_processed_by(field, self.processing_item_id)

### Condition mappings between rule identifier and class

rule_conditions : Dict[str, RuleProcessingCondition] = {
@@ -213,8 +233,11 @@ def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool:
"processing_item_applied": RuleProcessingItemAppliedCondition,
}
detection_item_conditions : Dict[str, DetectionItemProcessingCondition] = {
"include_fields": IncludeFieldCondition,
"exclude_fields": ExcludeFieldCondition,
"match_string": MatchStringCondition,
"processing_item_applied": DetectionItemProcessingItemAppliedCondition,
}
field_name_conditions : Dict[str, FieldNameProcessingCondition] = {
"include_fields": IncludeFieldCondition,
"exclude_fields": ExcludeFieldCondition,
"processing_item_applied": FieldNameProcessingItemAppliedCondition,
}
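With the new field_name_conditions mapping, field name conditions can also be referenced from
dict- or YAML-based pipeline definitions. A rough sketch of such a definition follows; the
transformation identifier "field_name_mapping" and its "mapping" parameter are assumed from the
existing transformation registry, and all identifiers and values are illustrative:

from sigma.processing.pipeline import ProcessingItem

item = ProcessingItem.from_dict({
    "id": "map_eventid",                 # illustrative processing item identifier
    "type": "field_name_mapping",        # assumed transformation identifier
    "mapping": {"EventID": "event_id"},  # assumed transformation parameter
    "field_name_conditions": [
        {"type": "include_fields", "fields": ["EventID"]},
    ],
    "field_name_cond_op": "and",
    "field_name_cond_not": False,
})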
69 changes: 61 additions & 8 deletions sigma/processing/pipeline.py
@@ -1,9 +1,11 @@
from collections import defaultdict
from dataclasses import dataclass, field
from functools import partial
from typing import List, Literal, Mapping, Set, Any, Callable, Iterable, Dict, Tuple, Optional
from sigma.processing.tracking import FieldMappingTracking
from sigma.rule import SigmaDetectionItem, SigmaRule
from sigma.processing.transformations import transformations, Transformation
from sigma.processing.conditions import rule_conditions, RuleProcessingCondition, detection_item_conditions, DetectionItemProcessingCondition
from sigma.processing.conditions import rule_conditions, RuleProcessingCondition, detection_item_conditions, DetectionItemProcessingCondition, field_name_conditions, FieldNameProcessingCondition
from sigma.exceptions import SigmaConfigurationError
import yaml

@@ -23,6 +25,9 @@ class ProcessingItem:
detection_item_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all
detection_item_condition_negation : bool = False
detection_item_conditions : List[DetectionItemProcessingCondition] = field(default_factory=list)
field_name_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all
field_name_condition_negation : bool = False
field_name_conditions : List[FieldNameProcessingCondition] = field(default_factory=list)
identifier : Optional[str] = None

@classmethod
@@ -44,6 +49,11 @@ def from_dict(cls, d : dict):
d.get("detection_item_conditions", list()),
detection_item_conds := list()
),
(
field_name_conditions,
d.get("field_name_conditions", list()),
field_name_conds := list()
),
):
for i, cond_def in enumerate(cond_defs):
try:
@@ -72,9 +82,11 @@ def from_dict(cls, d : dict):
}
rule_condition_linking = condition_linking[d.get("rule_cond_op", "and")] # default: conditions are linked with and operator
detection_item_condition_linking = condition_linking[d.get("detection_item_cond_op", "and")] # same for detection item conditions
field_name_condition_linking = condition_linking[d.get("field_name_cond_op", "and")] # same for field name conditions

rule_condition_negation = d.get("rule_cond_not", False)
detection_item_condition_negation = d.get("detection_item_cond_not", False)
field_name_condition_negation = d.get("field_name_cond_not", False)

# Transformation
try:
@@ -90,14 +102,14 @@ def from_dict(cls, d : dict):
params = {
k: v
for k, v in d.items()
if k not in {"rule_conditions", "rule_cond_op", "rule_cond_not", "detection_item_conditions", "detection_item_cond_op", "detection_item_cond_not", "type", "id"}
if k not in {"rule_conditions", "rule_cond_op", "rule_cond_not", "detection_item_conditions", "detection_item_cond_op", "detection_item_cond_not", "field_name_conditions", "field_name_cond_op", "field_name_cond_not", "type", "id"}
}
try:
transformation = transformation_class(**params)
except (SigmaConfigurationError, TypeError) as e:
raise SigmaConfigurationError("Error in transformation: " + str(e)) from e

return cls(transformation, rule_condition_linking, rule_condition_negation, rule_conds, detection_item_condition_linking, detection_item_condition_negation, detection_item_conds, identifier)
return cls(transformation, rule_condition_linking, rule_condition_negation, rule_conds, detection_item_condition_linking, detection_item_condition_negation, detection_item_conds, field_name_condition_linking, field_name_condition_negation, field_name_conds, identifier)

def __post_init__(self):
self.transformation.set_processing_item(self) # set processing item in transformation object after it is instantiated
@@ -120,15 +132,33 @@ def apply(self, pipeline : "ProcessingPipeline", rule : SigmaRule) -> Tuple[Sigm
return False

def match_detection_item(self, pipeline : "ProcessingPipeline", detection_item : SigmaDetectionItem) -> bool:
"""Evalutates detection item conditions from processing item to detection item and returns
result."""
cond_result = self.detection_item_condition_linking([
"""
Evaluates detection item and field name conditions of the processing item on the detection
item and returns the result.
"""
detection_item_cond_result = self.detection_item_condition_linking([
condition.match(pipeline, detection_item)
for condition in self.detection_item_conditions
])
if self.detection_item_condition_negation:
cond_result = not cond_result
return not self.detection_item_conditions or cond_result
detection_item_cond_result = not detection_item_cond_result

field_name_cond_result = self.match_field_name(pipeline, detection_item.field)

return detection_item_cond_result and field_name_cond_result

def match_field_name(self, pipeline : "ProcessingPipeline", field : Optional[str]) -> bool:
"""
Evaluate field name conditions on field names and return result.
"""
field_name_cond_result = self.field_name_condition_linking([
condition.match(pipeline, field)
for condition in self.field_name_conditions
])
if self.field_name_condition_negation:
field_name_cond_result = not field_name_cond_result

return field_name_cond_result

@dataclass
class ProcessingPipeline:
@@ -152,6 +182,7 @@ class ProcessingPipeline:
# TODO: move this to parameters or return values of apply().
applied : List[bool] = field(init=False, compare=False, default_factory=list) # list of applied items as booleans. If True, the corresponding item at the same position was applied
applied_ids : Set[str] = field(init=False, compare=False, default_factory=set) # set of identifiers of applied items, doesn't contain items without identifier
field_name_applied_ids : Dict[str, Set[str]] = field(init=False, compare=False, default_factory=partial(defaultdict, set)) # Mapping of field names from rule fields list to set of applied processing items
field_mappings : FieldMappingTracking = field(init=False, compare=False, default_factory=FieldMappingTracking) # Mapping between initial field names and finally mapped field name.
state : Mapping[str, Any] = field(init=False, compare=False, default_factory=dict) # pipeline state: allows to set variables that can be used in conversion (e.g. indices, data model names etc.)

@@ -185,6 +216,7 @@ def apply(self, rule : SigmaRule) -> SigmaRule:
"""Apply processing pipeline on Sigma rule."""
self.applied = list()
self.applied_ids = set()
self.field_name_applied_ids = defaultdict(set)
self.field_mappings = FieldMappingTracking()
self.state = dict()
for item in self.items:
@@ -194,6 +226,27 @@ def apply(self, rule : SigmaRule) -> SigmaRule:
self.applied_ids.add(itid)
return rule

def track_field_processing_items(self, src_field : str, dest_field : List[str], processing_item_id : Optional[str]) -> None:
"""
Track processing items that were applied to field names. This adds the processing_item_id to
the set of processing items applied to src_field and assigns a copy of this set as the
tracking set of all fields in dest_field.
"""
applied_identifiers : Set = self.field_name_applied_ids[src_field]
if processing_item_id is not None:
applied_identifiers.add(processing_item_id)
del self.field_name_applied_ids[src_field]
for field in dest_field:
self.field_name_applied_ids[field] = applied_identifiers.copy()

def field_was_processed_by(self, field : Optional[str], processing_item_id : str) -> bool:
"""
Check if field name was processed by a particular processing item.
"""
if field is None:
return False
return processing_item_id in self.field_name_applied_ids[field]

def __add__(self, other : Optional["ProcessingPipeline"]) -> "ProcessingPipeline":
"""Concatenate two processing pipelines and merge their variables."""
if other is None:
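To make the field name tracking semantics concrete, consider a hypothetical sequence (the field
names and the processing item identifier are made up for illustration):

# After a field mapping, the applied-items set of the source field is copied to every
# destination field name and the source entry is removed:
pipeline.track_field_processing_items("CommandLine", ["cmdline"], "map_cmdline")

pipeline.field_was_processed_by("cmdline", "map_cmdline")      # True: tracking set was copied
pipeline.field_was_processed_by("CommandLine", "map_cmdline")  # False: source entry was deleted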
22 changes: 20 additions & 2 deletions sigma/processing/transformations.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from functools import partial
from sigma.conditions import ConditionOR, SigmaCondition
from typing import Any, Iterable, List, Dict, Optional, Union, Pattern, Iterator
from typing import Any, Iterable, List, Dict, Optional, Set, Union, Pattern, Iterator
from dataclasses import dataclass, field
import dataclasses
import random
@@ -37,6 +38,7 @@ def processing_item_applied(self, d : Union[SigmaRule, SigmaDetection, SigmaDete
"""Mark detection item or detection as applied."""
d.add_applied_processing_item(self.processing_item)

@dataclass
class DetectionItemTransformation(Transformation):
"""
Iterates over all detection items of a Sigma rule and calls the apply_detection_item method
@@ -76,6 +78,7 @@ def apply(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", rule
for detection in rule.detection.detections.values():
self.apply_detection(detection)

@dataclass
class FieldMappingTransformationBase(DetectionItemTransformation):
"""
Transformation that is applied to detection items and additionally the field list of a Sigma
@@ -88,10 +91,24 @@ def apply_field_name(self, field : str) -> List[str]:
a list of strings that are expanded into a new field list.
"""

def _apply_field_name(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> List[str]:
"""
Evaluate field name conditions and perform transformation with apply_field_name() method if
condition matches, else return original value.
"""
if self.processing_item is None or self.processing_item.match_field_name(pipeline, field):
result = self.apply_field_name(field)
if self.processing_item is not None:
pipeline.track_field_processing_items(field, result, self.processing_item.identifier)
return result
else:
return [ field ]

def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule) -> None:
_apply_field_name = partial(self._apply_field_name, pipeline)
rule.fields = [
item
for mapping in map(self.apply_field_name, rule.fields)
for mapping in map(_apply_field_name, rule.fields)
for item in mapping
]
return super().apply(pipeline, rule)
@@ -149,6 +166,7 @@ def apply_value(self, field : str, val : SigmaType) -> Optional[Union[SigmaType,
The type annotation of the val argument is used to skip incompatible values.
"""

@dataclass
class ConditionTransformation(Transformation):
"""
Iterates over all rule conditions and calls the apply_condition method for each condition. Automatically
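A sketch of the resulting behavior on a rule's field list; FieldMappingTransformation and its
mapping argument are assumed from the existing transformations module, and the field names are
illustrative:

from sigma.processing.pipeline import ProcessingItem
from sigma.processing.conditions import IncludeFieldCondition
from sigma.processing.transformations import FieldMappingTransformation

item = ProcessingItem(
    FieldMappingTransformation({"EventID": "event_id", "Image": "process_path"}),
    field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
    identifier="map_eventid",
)
# For a rule with fields ["EventID", "Image"], only "EventID" matches the field name
# condition, so _apply_field_name() maps it and records the processing item via
# track_field_processing_items(), while "Image" is passed through unchanged.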
4 changes: 2 additions & 2 deletions tests/test_conversion_base.py
@@ -1079,7 +1079,7 @@ def test_convert_dropped_detection_item_and():
ProcessingPipeline([
ProcessingItem(
DropDetectionItemTransformation(),
detection_item_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
),
]),
)
@@ -1104,7 +1104,7 @@ def test_convert_dropped_detection_item_or():
ProcessingPipeline([
ProcessingItem(
DropDetectionItemTransformation(),
detection_item_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ],
),
]),
)