From 535c3a9a4df208706689b3a8bbf22a8c001f900e Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Thu, 11 Aug 2022 00:14:20 +0200 Subject: [PATCH] Field name conditions * Include/ExcludeFieldCondition is now a field name condition. * DetectionItemConditions additionally process field name conditions. * Tracking of processing items applied to Sigma rule field list items. * Added FieldNameProcessingItemAppliedCondition. * Fix: all processing transformation classes unified to dataclasses. * Release 0.8.0 because it's a breaking change. * Documentation --- docs/Processing_Pipelines.rst | 31 ++++++++--- pyproject.toml | 2 +- sigma/processing/conditions.py | 41 ++++++++++---- sigma/processing/pipeline.py | 69 +++++++++++++++++++++--- sigma/processing/transformations.py | 22 +++++++- tests/test_conversion_base.py | 4 +- tests/test_processing_conditions.py | 38 ++++++++----- tests/test_processing_pipeline.py | 17 ++++++ tests/test_processing_transformations.py | 42 +++++++++++++-- 9 files changed, 221 insertions(+), 45 deletions(-) diff --git a/docs/Processing_Pipelines.rst b/docs/Processing_Pipelines.rst index 852a0392..f48e46ea 100644 --- a/docs/Processing_Pipelines.rst +++ b/docs/Processing_Pipelines.rst @@ -107,8 +107,15 @@ Example: Conditions ********** -There are two types of conditions: rule conditions which are evaluated to the whole rule and -detection item conditions that are evaluated for each detection item. +.. versionadded:: 0.8.0 + Field name conditions. + +There are three types of conditions: + +* Rule conditions are evaluated to the whole rule. +* Detection item conditions are evaluated for each detection item. +* Field name conditions are evaluated for field names that can be located in detection items or in + the field name list of a Sigma rule. 
Rule Conditions =============== @@ -132,16 +139,27 @@ Detection Item Conditions :header-rows: 1 "Identifier", "Class" - "include_fields", "IncludeFieldCondition" - "exclude_fields", "ExcludeFieldCondition" "match_string", "MatchStringCondition" "processing_item_applied", "DetectionItemProcessingItemAppliedCondition" -.. autoclass:: sigma.processing.conditions.IncludeFieldCondition -.. autoclass:: sigma.processing.conditions.ExcludeFieldCondition .. autoclass:: sigma.processing.conditions.MatchStringCondition .. autoclass:: sigma.processing.conditions.DetectionItemProcessingItemAppliedCondition +Field Name Conditions +===================== + +.. csv-table:: Field Name Identifiers + :header-rows: 1 + + "Identifier", "Class" + "include_fields", "IncludeFieldCondition" + "exclude_fields", "ExcludeFieldCondition" + "processing_item_applied", "FieldNameProcessingItemAppliedCondition" + +.. autoclass:: sigma.processing.conditions.IncludeFieldCondition +.. autoclass:: sigma.processing.conditions.ExcludeFieldCondition +.. autoclass:: sigma.processing.conditions.FieldNameProcessingItemAppliedCondition + Base Classes ============ @@ -154,6 +172,7 @@ and not be distributed via the main pySigma distribution. .. autoclass:: sigma.processing.conditions.RuleProcessingCondition .. autoclass:: sigma.processing.conditions.DetectionItemProcessingCondition +.. autoclass:: sigma.processing.conditions.FieldNameProcessingCondition .. autoclass:: sigma.processing.conditions.ValueProcessingCondition .. 
_transformations: diff --git a/pyproject.toml b/pyproject.toml index 5352d4f6..5d762967 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pySigma" -version = "0.7.3" +version = "0.8.0" license = "LGPL-2.1-only" description = "Sigma rule processing and conversion tools" authors = ["Thomas Patzke "] diff --git a/sigma/processing/conditions.py b/sigma/processing/conditions.py index febce43a..a12fd2fc 100644 --- a/sigma/processing/conditions.py +++ b/sigma/processing/conditions.py @@ -19,6 +19,15 @@ class RuleProcessingCondition(ABC): def match(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", rule : SigmaRule) -> bool: """Match condition on Sigma rule.""" +class FieldNameProcessingCondition(ABC): + """ + Base class for conditions on field names in detection items, Sigma rule field lists and other + use cases that require matching on field names without detection item context. + """ + @abstractmethod + def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> bool: + "The method match is called for each field name and must return a bool result." + @dataclass class DetectionItemProcessingCondition(ABC): """ @@ -117,9 +126,9 @@ class RuleProcessingItemAppliedCondition(RuleProcessingCondition): def match(self, pipeline : "sigma.processing.pipeline.ProcessingPipeline", rule : SigmaRule) -> bool: return rule.was_processed_by(self.processing_item_id) -### Detection Item Condition Classes ### +### Field Name Condition Classes ### @dataclass -class IncludeFieldCondition(DetectionItemProcessingCondition): +class IncludeFieldCondition(FieldNameProcessingCondition): """ Matches on field name if it is contained in fields list. The parameter 'type' determines if field names are matched as plain string ("plain") or regular expressions ("re"). 
@@ -142,19 +151,19 @@ def __post_init__(self): else: raise SigmaConfigurationError(f"Invalid detection item field name condition type '{self.type}', supported types are 'plain' or 're'.") - def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool: - if detection_item.field is None: + def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: Optional[str]) -> bool: + if field is None: return False elif self.type == "plain": - return detection_item.field in self.fields + return field in self.fields else: # regular expression matching try: return any(( - pattern.match(detection_item.field) + pattern.match(field) for pattern in self.patterns )) except Exception as e: - msg = f" (while processing detection item: field={str(detection_item.field)} value={str(detection_item.value)})" + msg = f" (while processing field '{field}')" if len (e.args) > 1: e.args = (e.args[0] + msg,) + e.args[1:] else: @@ -167,6 +176,7 @@ class ExcludeFieldCondition(IncludeFieldCondition): def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool: return not super().match(pipeline, detection_item) +### Detection Item Condition Classes ### @dataclass class MatchStringCondition(ValueProcessingCondition): """ @@ -205,6 +215,16 @@ class DetectionItemProcessingItemAppliedCondition(DetectionItemProcessingConditi def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detection_item: SigmaDetectionItem) -> bool: return detection_item.was_processed_by(self.processing_item_id) +@dataclass +class FieldNameProcessingItemAppliedCondition(FieldNameProcessingCondition): + """ + Checks if processing item was applied to a field name. 
+ """ + processing_item_id : str + + def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> bool: + return pipeline.field_was_processed_by(field, self.processing_item_id) + ### Condition mappings between rule identifier and class rule_conditions : Dict[str, RuleProcessingCondition] = { @@ -213,8 +233,11 @@ def match(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", detect "processing_item_applied": RuleProcessingItemAppliedCondition, } detection_item_conditions : Dict[str, DetectionItemProcessingCondition] = { - "include_fields": IncludeFieldCondition, - "exclude_fields": ExcludeFieldCondition, "match_string": MatchStringCondition, "processing_item_applied": DetectionItemProcessingItemAppliedCondition, +} +field_name_conditions : Dict[str, FieldNameProcessingCondition] = { + "include_fields": IncludeFieldCondition, + "exclude_fields": ExcludeFieldCondition, + "processing_item_applied": FieldNameProcessingItemAppliedCondition, } \ No newline at end of file diff --git a/sigma/processing/pipeline.py b/sigma/processing/pipeline.py index d618d0f9..374bab59 100644 --- a/sigma/processing/pipeline.py +++ b/sigma/processing/pipeline.py @@ -1,9 +1,11 @@ +from collections import defaultdict from dataclasses import dataclass, field +from functools import partial from typing import List, Literal, Mapping, Set, Any, Callable, Iterable, Dict, Tuple, Optional from sigma.processing.tracking import FieldMappingTracking from sigma.rule import SigmaDetectionItem, SigmaRule from sigma.processing.transformations import transformations, Transformation -from sigma.processing.conditions import rule_conditions, RuleProcessingCondition, detection_item_conditions, DetectionItemProcessingCondition, field_name_conditions, FieldNameProcessingCondition from sigma.exceptions import SigmaConfigurationError 
import yaml @@ -23,6 +25,9 @@ class ProcessingItem: detection_item_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all detection_item_condition_negation : bool = False detection_item_conditions : List[DetectionItemProcessingCondition] = field(default_factory=list) + field_name_condition_linking : Callable[[ Iterable[bool] ], bool] = all # any or all + field_name_condition_negation : bool = False + field_name_conditions : List[FieldNameProcessingCondition] = field(default_factory=list) identifier : Optional[str] = None @classmethod @@ -44,6 +49,11 @@ def from_dict(cls, d : dict): d.get("detection_item_conditions", list()), detection_item_conds := list() ), + ( + field_name_conditions, + d.get("field_name_conditions", list()), + field_name_conds := list() + ), ): for i, cond_def in enumerate(cond_defs): try: @@ -72,9 +82,11 @@ def from_dict(cls, d : dict): } rule_condition_linking = condition_linking[d.get("rule_cond_op", "and")] # default: conditions are linked with and operator detection_item_condition_linking = condition_linking[d.get("detection_item_cond_op", "and")] # same for detection item conditions + field_name_condition_linking = condition_linking[d.get("field_name_cond_op", "and")] # same for field name conditions rule_condition_negation = d.get("rule_cond_not", False) detection_item_condition_negation = d.get("detection_item_cond_not", False) + field_name_condition_negation = d.get("field_name_cond_not", False) # Transformation try: @@ -90,14 +102,14 @@ def from_dict(cls, d : dict): params = { k: v for k, v in d.items() - if k not in {"rule_conditions", "rule_cond_op", "rule_cond_not", "detection_item_conditions", "detection_item_cond_op", "detection_item_cond_not", "type", "id"} + if k not in {"rule_conditions", "rule_cond_op", "rule_cond_not", "detection_item_conditions", "detection_item_cond_op", "detection_item_cond_not", "field_name_conditions", "field_name_cond_op", "field_name_cond_not", "type", "id"} } try: transformation = 
transformation_class(**params) except (SigmaConfigurationError, TypeError) as e: raise SigmaConfigurationError("Error in transformation: " + str(e)) from e - return cls(transformation, rule_condition_linking, rule_condition_negation, rule_conds, detection_item_condition_linking, detection_item_condition_negation, detection_item_conds, identifier) + return cls(transformation, rule_condition_linking, rule_condition_negation, rule_conds, detection_item_condition_linking, detection_item_condition_negation, detection_item_conds, field_name_condition_linking, field_name_condition_negation, field_name_conds, identifier) def __post_init__(self): self.transformation.set_processing_item(self) # set processing item in transformation object after it is instantiated @@ -120,15 +132,33 @@ def apply(self, pipeline : "ProcessingPipeline", rule : SigmaRule) -> Tuple[Sigm return False def match_detection_item(self, pipeline : "ProcessingPipeline", detection_item : SigmaDetectionItem) -> bool: - """Evalutates detection item conditions from processing item to detection item and returns - result.""" - cond_result = self.detection_item_condition_linking([ + """ + Evaluates detection item and field name conditions from processing item to detection item + and returns result. + """ + detection_item_cond_result = self.detection_item_condition_linking([ condition.match(pipeline, detection_item) for condition in self.detection_item_conditions ]) if self.detection_item_condition_negation: - cond_result = not cond_result - return not self.detection_item_conditions or cond_result + detection_item_cond_result = not detection_item_cond_result + + field_name_cond_result = self.match_field_name(pipeline, detection_item.field) + + return detection_item_cond_result and field_name_cond_result + + def match_field_name(self, pipeline : "ProcessingPipeline", field : Optional[str]) -> bool: + """ + Evaluate field name conditions on field names and return result. 
+ """ + field_name_cond_result = self.field_name_condition_linking([ + condition.match(pipeline, field) + for condition in self.field_name_conditions + ]) + if self.field_name_condition_negation: + field_name_cond_result = not field_name_cond_result + + return field_name_cond_result @dataclass class ProcessingPipeline: @@ -152,6 +182,7 @@ class ProcessingPipeline: # TODO: move this to parameters or return values of apply(). applied : List[bool] = field(init=False, compare=False, default_factory=list) # list of applied items as booleans. If True, the corresponding item at the same position was applied applied_ids : Set[str] = field(init=False, compare=False, default_factory=set) # set of identifiers of applied items, doesn't contains items without identifier + field_name_applied_ids : Dict[str, Set[str]] = field(init=False, compare=False, default_factory=partial(defaultdict, set)) # Mapping of field names from rule fields list to set of applied processing items field_mappings : FieldMappingTracking = field(init=False, compare=False, default_factory=FieldMappingTracking) # Mapping between initial field names and finally mapped field name. state : Mapping[str, Any] = field(init=False, compare=False, default_factory=dict) # pipeline state: allows to set variables that can be used in conversion (e.g. indices, data model names etc.) @@ -185,6 +216,7 @@ def apply(self, rule : SigmaRule) -> SigmaRule: """Apply processing pipeline on Sigma rule.""" self.applied = list() self.applied_ids = set() + self.field_name_applied_ids = defaultdict(set) self.field_mappings = FieldMappingTracking() self.state = dict() for item in self.items: @@ -194,6 +226,27 @@ def apply(self, rule : SigmaRule) -> SigmaRule: self.applied_ids.add(itid) return rule + def track_field_processing_items(self, src_field : str, dest_field : List[str], processing_item_id : Optional[str]) -> None: + """ + Track processing items that were applied to field names. 
This adds the processing_item_id to + the set of applied processing items from src_field and assigns a copy of this set as + tracking set to all fields in dest_field. + """ + applied_identifiers : Set = self.field_name_applied_ids[src_field] + if processing_item_id is not None: + applied_identifiers.add(processing_item_id) + del self.field_name_applied_ids[src_field] + for field in dest_field: + self.field_name_applied_ids[field] = applied_identifiers.copy() + + def field_was_processed_by(self, field : Optional[str], processing_item_id : str) -> bool: + """ + Check if field name was processed by a particular processing item. + """ + if field is None: + return False + return processing_item_id in self.field_name_applied_ids[field] + def __add__(self, other : Optional["ProcessingPipeline"]) -> "ProcessingPipeline": """Concatenate two processing pipelines and merge their variables.""" if other is None: diff --git a/sigma/processing/transformations.py b/sigma/processing/transformations.py index b54503dc..ba6df67a 100644 --- a/sigma/processing/transformations.py +++ b/sigma/processing/transformations.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod +from functools import partial from sigma.conditions import ConditionOR, SigmaCondition -from typing import Any, Iterable, List, Dict, Optional, Union, Pattern, Iterator +from typing import Any, Iterable, List, Dict, Optional, Set, Union, Pattern, Iterator from dataclasses import dataclass, field import dataclasses import random @@ -37,6 +38,7 @@ def processing_item_applied(self, d : Union[SigmaRule, SigmaDetection, SigmaDete """Mark detection item or detection as applied.""" d.add_applied_processing_item(self.processing_item) +@dataclass class DetectionItemTransformation(Transformation): """ Iterates over all detection items of a Sigma rule and calls the apply_detection_item method 
rule.detection.detections.values(): self.apply_detection(detection) +@dataclass class FieldMappingTransformationBase(DetectionItemTransformation): """ Transformation that is applied to detection items and additionally the field list of a Sigma @@ -88,10 +91,24 @@ def apply_field_name(self, field : str) -> List[str]: a list of strings that are expanded into a new field list. """ + def _apply_field_name(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field : str) -> List[str]: + """ + Evaluate field name conditions and perform transformation with apply_field_name() method if + condition matches, else return original value. + """ + if self.processing_item is None or self.processing_item.match_field_name(pipeline, field): + result = self.apply_field_name(field) + if self.processing_item is not None: + pipeline.track_field_processing_items(field, result, self.processing_item.identifier) + return result + else: + return [ field ] + def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule) -> None: + _apply_field_name = partial(self._apply_field_name, pipeline) rule.fields = [ item - for mapping in map(self.apply_field_name, rule.fields) + for mapping in map(_apply_field_name, rule.fields) for item in mapping ] return super().apply(pipeline, rule) @@ -149,6 +166,7 @@ def apply_value(self, field : str, val : SigmaType) -> Optional[Union[SigmaType, The type annotation of the val argument is used to skip incompatible values. """ +@dataclass class ConditionTransformation(Transformation): """ Iterates over all rule conditions and calls the apply_condition method for each condition. 
Automatically diff --git a/tests/test_conversion_base.py b/tests/test_conversion_base.py index 023f8aba..59e3541c 100644 --- a/tests/test_conversion_base.py +++ b/tests/test_conversion_base.py @@ -1079,7 +1079,7 @@ def test_convert_dropped_detection_item_and(): ProcessingPipeline([ ProcessingItem( DropDetectionItemTransformation(), - detection_item_conditions=[ IncludeFieldCondition(fields=["EventID"]) ], + field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ], ), ]), ) @@ -1104,7 +1104,7 @@ def test_convert_dropped_detection_item_or(): ProcessingPipeline([ ProcessingItem( DropDetectionItemTransformation(), - detection_item_conditions=[ IncludeFieldCondition(fields=["EventID"]) ], + field_name_conditions=[ IncludeFieldCondition(fields=["EventID"]) ], ), ]), ) diff --git a/tests/test_processing_conditions.py b/tests/test_processing_conditions.py index f963e3de..ebed8dca 100644 --- a/tests/test_processing_conditions.py +++ b/tests/test_processing_conditions.py @@ -3,7 +3,7 @@ from sigma.exceptions import SigmaConfigurationError, SigmaRegularExpressionError import pytest from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline -from sigma.processing.conditions import DetectionItemProcessingItemAppliedCondition, LogsourceCondition, IncludeFieldCondition, ExcludeFieldCondition, MatchStringCondition, RuleContainsDetectionItemCondition, RuleProcessingItemAppliedCondition +from sigma.processing.conditions import DetectionItemProcessingItemAppliedCondition, FieldNameProcessingItemAppliedCondition, LogsourceCondition, IncludeFieldCondition, ExcludeFieldCondition, MatchStringCondition, RuleContainsDetectionItemCondition, RuleProcessingItemAppliedCondition from sigma.rule import SigmaDetectionItem, SigmaLogSource, SigmaRule from tests.test_processing_pipeline import processing_item @@ -89,38 +89,38 @@ def test_rule_contains_detection_item_nomatch_value(sigma_rule): ).match(dummy_processing_pipeline, sigma_rule) def 
test_include_field_condition_match(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, detection_item) == True + assert IncludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, "field") == True def test_include_field_condition_match_nofield(dummy_processing_pipeline, detection_item_nofield): - assert IncludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, detection_item_nofield) == False + assert IncludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, None) == False def test_include_field_condition_nomatch(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["testfield", "otherfield"]).match(dummy_processing_pipeline, detection_item) == False + assert IncludeFieldCondition(["testfield", "otherfield"]).match(dummy_processing_pipeline, "field") == False def test_include_field_condition_re_match(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, detection_item) == True + assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, "field") == True def test_include_field_condition_re_match_nofield(dummy_processing_pipeline, detection_item_nofield): - assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, detection_item_nofield) == False + assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, None) == False def test_include_field_condition_re_nomatch(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["o[0-9]+", "x.*"], "re").match(dummy_processing_pipeline, detection_item) == False + assert IncludeFieldCondition(["o[0-9]+", "x.*"], "re").match(dummy_processing_pipeline, "field") == False def test_include_field_condition_wrong_type(dummy_processing_pipeline, detection_item): with 
pytest.raises(SigmaConfigurationError, match="Invalid.*type"): IncludeFieldCondition(["field", "otherfield"], "invalid") def test_exclude_field_condition_match(dummy_processing_pipeline, detection_item): - assert ExcludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, detection_item) == False + assert ExcludeFieldCondition(["field", "otherfield"]).match(dummy_processing_pipeline, "field") == False def test_exclude_field_condition_nomatch(dummy_processing_pipeline, detection_item): - assert ExcludeFieldCondition(["testfield", "otherfield"]).match(dummy_processing_pipeline, detection_item) == True + assert ExcludeFieldCondition(["testfield", "otherfield"]).match(dummy_processing_pipeline, "field") == True def test_exclude_field_condition_re_match(dummy_processing_pipeline, detection_item): - assert ExcludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, detection_item) == False + assert ExcludeFieldCondition(["o[0-9]+", "f.*"], "re").match(dummy_processing_pipeline, "field") == False def test_exclude_field_condition_re_nomatch(dummy_processing_pipeline, detection_item): - assert ExcludeFieldCondition(["o[0-9]+", "x.*"], "re").match(dummy_processing_pipeline, detection_item) == True + assert ExcludeFieldCondition(["o[0-9]+", "x.*"], "re").match(dummy_processing_pipeline, "field") == True @pytest.fixture def multivalued_detection_item(): @@ -165,4 +165,18 @@ def test_detection_item_processing_item_not_applied(dummy_processing_pipeline, p ).match( dummy_processing_pipeline, detection_item, - ) \ No newline at end of file + ) + +@pytest.fixture +def pipeline_field_tracking(): + pipeline = ProcessingPipeline() + pipeline.track_field_processing_items("field1", ["fieldA", "fieldB"], "processing_item") + return pipeline + +def test_field_name_processing_item_applied(pipeline_field_tracking): + assert FieldNameProcessingItemAppliedCondition(processing_item_id="processing_item") \ + .match(pipeline_field_tracking, "fieldA") + 
+def test_field_name_processing_item_not_applied(pipeline_field_tracking): + assert not FieldNameProcessingItemAppliedCondition(processing_item_id="processing_item") \ + .match(pipeline_field_tracking, "fieldC") \ No newline at end of file diff --git a/tests/test_processing_pipeline.py b/tests/test_processing_pipeline.py index 92e26961..302ee398 100644 --- a/tests/test_processing_pipeline.py +++ b/tests/test_processing_pipeline.py @@ -472,6 +472,23 @@ def test_processingpipeline_apply_partial(sigma_rule): and pipeline.applied == [False, True] \ and pipeline.applied_ids == { "append" } +def test_processingpipeline_field_processing_item_tracking(): + pipeline = ProcessingPipeline() + pipeline.track_field_processing_items("field1", ["fieldA", "fieldB"], "processing_item_1") + pipeline.track_field_processing_items("fieldA", ["fieldA", "fieldC", "fieldD"], "processing_item_2") + pipeline.track_field_processing_items("fieldB", ["fieldD", "fieldE"], "processing_item_3") + pipeline.track_field_processing_items("fieldE", ["fieldF"], None) + assert pipeline.field_name_applied_ids == { + "fieldA": {"processing_item_1", "processing_item_2"}, + "fieldC": {"processing_item_1", "processing_item_2"}, + "fieldD": {"processing_item_1", "processing_item_3"}, + "fieldF": {"processing_item_1", "processing_item_3"}, + } + assert pipeline.field_was_processed_by("fieldF", "processing_item_3") == True + assert pipeline.field_was_processed_by("fieldF", "processing_item_2") == False + assert pipeline.field_was_processed_by("nonexistingfield", "processing_item_2") == False + assert pipeline.field_was_processed_by(None, "processing_item_3") == False + def test_processingpipeline_concatenation(): p1 = ProcessingPipeline( items=[ diff --git a/tests/test_processing_transformations.py b/tests/test_processing_transformations.py index c6913f34..aa2a44cc 100644 --- a/tests/test_processing_transformations.py +++ b/tests/test_processing_transformations.py @@ -6,7 +6,7 @@ from sigma.processing import 
transformations from sigma.processing.transformations import AddConditionTransformation, ChangeLogsourceTransformation, ConditionTransformation, DetectionItemFailureTransformation, DropDetectionItemTransformation, RuleFailureTransformation, FieldMappingTransformation, AddFieldnameSuffixTransformation, AddFieldnamePrefixTransformation, SetStateTransformation, Transformation, WildcardPlaceholderTransformation, ValueListPlaceholderTransformation, QueryExpressionPlaceholderTransformation, ReplaceStringTransformation from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem -from sigma.processing.conditions import IncludeFieldCondition +from sigma.processing.conditions import DetectionItemProcessingItemAppliedCondition, FieldNameProcessingItemAppliedCondition, IncludeFieldCondition from sigma.rule import SigmaLogSource, SigmaRule, SigmaDetection, SigmaDetectionItem from sigma.types import Placeholder, SigmaNumber, SigmaQueryExpression, SigmaString, SpecialChars from sigma.modifiers import SigmaExpandModifier @@ -184,7 +184,7 @@ def test_drop_detection_item_transformation(sigma_rule : SigmaRule, dummy_pipeli transformation = DropDetectionItemTransformation() processing_item = ProcessingItem( transformation, - detection_item_conditions=[ IncludeFieldCondition(fields=["field2"]) ], + field_name_conditions=[ IncludeFieldCondition(fields=["field2"]) ], ) transformation.apply(dummy_pipeline, sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection([ @@ -198,7 +198,7 @@ def test_drop_detection_item_transformation_all(sigma_rule : SigmaRule, dummy_pi transformation = DropDetectionItemTransformation() processing_item = ProcessingItem( transformation, - detection_item_conditions=[ IncludeFieldCondition(fields=["field1", "field2", "field3"]) ], + field_name_conditions=[ IncludeFieldCondition(fields=["field1", "field2", "field3"]) ], ) transformation.apply(dummy_pipeline, sigma_rule) assert 
sigma_rule.detection.detections["test"].detection_items[0].detection_items == [] @@ -239,7 +239,7 @@ def test_add_fieldname_suffix_keyword(dummy_pipeline, keyword_sigma_rule, add_fi def test_add_fieldname_suffix_tracking(dummy_pipeline, sigma_rule, add_fieldname_suffix_transformation): processing_item = ProcessingItem( add_fieldname_suffix_transformation, - detection_item_conditions=[ + field_name_conditions=[ IncludeFieldCondition("field1") ], identifier="test", @@ -295,7 +295,7 @@ def test_add_fieldname_prefix_keyword(dummy_pipeline, keyword_sigma_rule, add_fi def test_add_fieldname_prefix_tracking(dummy_pipeline, sigma_rule, add_fieldname_prefix_transformation): processing_item = ProcessingItem( add_fieldname_prefix_transformation, - detection_item_conditions=[ + field_name_conditions=[ IncludeFieldCondition("field1") ], identifier="test", @@ -314,6 +314,38 @@ def test_add_fieldname_prefix_tracking(dummy_pipeline, sigma_rule, add_fieldname assert sigma_rule.was_processed_by("test") assert processing_item.transformation.pipeline.field_mappings == { "field1": { "test.field1" } } +def test_fields_list_mapping_with_detection_item_condition(sigma_rule : SigmaRule): + processing_pipeline = ProcessingPipeline( + [ + ProcessingItem( + identifier="suffix_some", + transformation=AddFieldnameSuffixTransformation(".test"), + field_name_conditions=[ + IncludeFieldCondition( + fields=["^field\\d+"], + type="re", + ), + ], + ), + ProcessingItem( + identifier="prefix_others", + transformation=AddFieldnamePrefixTransformation("test."), + field_name_conditions=[ + FieldNameProcessingItemAppliedCondition("suffix_some"), + ], + field_name_condition_negation=True, + ), + ] + ) + processing_pipeline.apply(sigma_rule) + assert sigma_rule.fields == [ + "test.otherfield1", + "field1.test", + "field2.test", + "field3.test", + "test.otherfield2", + ] + def test_wildcard_placeholders(dummy_pipeline, sigma_rule_placeholders : SigmaRule): transformation = WildcardPlaceholderTransformation() 
transformation.apply(dummy_pipeline, sigma_rule_placeholders)