Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not Equals (!=) Expressions #301

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 101 additions & 5 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from collections import ChainMap, defaultdict
from contextlib import contextmanager
import re

from sigma.correlations import (
Expand Down Expand Up @@ -139,6 +140,9 @@ class Backend(ABC):
# not exists: convert as "not exists-expression" or as dedicated expression
explicit_not_exists_expression: ClassVar[bool] = False

# use not_eq_token, not_eq_expression, etc. to implement != as a separate expression instead of not_token in ConditionNOT
convert_not_as_not_eq: ClassVar[bool] = False

def __init__(
self,
processing_pipeline: Optional[ProcessingPipeline] = None,
Expand Down Expand Up @@ -745,9 +749,15 @@ class variables. If this is not sufficient, the respective methods can be implem
eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator)
)
not_eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator) if using not_eq_expression over not_token
)
eq_expression: ClassVar[str] = (
"{field}{backend.eq_token}{value}" # Expression for field = value
)
not_eq_expression: ClassVar[str] = (
"{field}{backend.not_eq_token}{value}" # Expression for field != value
)

# Query structure
# The generated query can be embedded into further structures. One common example are data
Expand Down Expand Up @@ -812,12 +822,15 @@ class variables. If this is not sufficient, the respective methods can be implem
}
)

# String matching operators. if none is appropriate eq_token is used.
# String matching operators. if none is appropriate eq_token (or not_eq_token) is used.
startswith_expression: ClassVar[Optional[str]] = None
not_startswith_expression: ClassVar[Optional[str]] = None
startswith_expression_allow_special: ClassVar[bool] = False
endswith_expression: ClassVar[Optional[str]] = None
not_endswith_expression: ClassVar[Optional[str]] = None
endswith_expression_allow_special: ClassVar[bool] = False
contains_expression: ClassVar[Optional[str]] = None
not_contains_expression: ClassVar[Optional[str]] = None
contains_expression_allow_special: ClassVar[bool] = False
wildcard_match_expression: ClassVar[Optional[str]] = (
None # Special expression if wildcards can't be matched with the eq_token operator.
Expand All @@ -828,6 +841,7 @@ class variables. If this is not sufficient, the respective methods can be implem
# is one of the flags shortcuts supported by Sigma (currently i, m and s) and refers to the
# token stored in the class variable re_flags.
re_expression: ClassVar[Optional[str]] = None
not_re_expression: ClassVar[Optional[str]] = None
re_escape_char: ClassVar[Optional[str]] = (
None # Character used for escaping in regular expressions
)
Expand All @@ -849,17 +863,21 @@ class variables. If this is not sufficient, the respective methods can be implem
# Case sensitive string matching operators similar to standard string matching. If not provided,
# case_sensitive_match_expression is used.
case_sensitive_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_startswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_endswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_not_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_contains_expression_allow_special: ClassVar[bool] = False

# CIDR expressions: define CIDR matching if backend has native support. Else pySigma expands
# CIDR values into string wildcard matches.
cidr_expression: ClassVar[Optional[str]] = (
None # CIDR expression query as format string with placeholders {field}, {value} (the whole CIDR value), {network} (network part only), {prefixlen} (length of network mask prefix) and {netmask} (CIDR network mask only)
)
not_cidr_expression: ClassVar[Optional[str]] = None

# Numeric comparison operators
compare_op_expression: ClassVar[Optional[str]] = (
Expand Down Expand Up @@ -1087,6 +1105,58 @@ def __new__(cls, *args, **kwargs):
c.explicit_not_exists_expression = c.field_not_exists_expression is not None
return c

@contextmanager
def not_equals_context_manager(self, use_negated_expressions: bool = False):
"""Context manager to temporarily swap expressions with their negated versions."""
if not use_negated_expressions:
yield
return

# Store original expressions
original_expressions = {
"eq_expression": self.eq_expression,
"re_expression": self.re_expression,
"cidr_expression": self.cidr_expression,
"startswith_expression": self.startswith_expression,
"case_sensitive_startswith_expression": self.case_sensitive_startswith_expression,
"endswith_expression": self.endswith_expression,
"case_sensitive_endswith_expression": self.case_sensitive_endswith_expression,
"contains_expression": self.contains_expression,
"case_sensitive_contains_expression": self.case_sensitive_contains_expression,
}

# Swap to negated versions
try:
self.eq_expression = self.not_eq_expression
self.re_expression = self.not_re_expression
self.cidr_expression = self.not_cidr_expression
self.startswith_expression = self.not_startswith_expression
self.case_sensitive_startswith_expression = (
self.case_sensitive_not_startswith_expression
)
self.endswith_expression = self.not_endswith_expression
self.case_sensitive_endswith_expression = self.case_sensitive_not_endswith_expression
self.contains_expression = self.not_contains_expression
self.case_sensitive_contains_expression = self.case_sensitive_not_contains_expression
yield
finally:
# Restore original expressions
self.eq_expression = original_expressions["eq_expression"]
self.re_expression = original_expressions["re_expression"]
self.cidr_expression = original_expressions["cidr_expression"]
self.startswith_expression = original_expressions["startswith_expression"]
self.case_sensitive_startswith_expression = original_expressions[
"case_sensitive_startswith_expression"
]
self.endswith_expression = original_expressions["endswith_expression"]
self.case_sensitive_endswith_expression = original_expressions[
"case_sensitive_endswith_expression"
]
self.contains_expression = original_expressions["contains_expression"]
self.case_sensitive_contains_expression = original_expressions[
"case_sensitive_contains_expression"
]

def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool:
"""
Compare precedence of outer and inner condition items. Return True if precedence of
Expand Down Expand Up @@ -1209,17 +1279,22 @@ def convert_condition_not(
arg = cond.args[0]
try:
if arg.__class__ in self.precedence: # group if AND or OR condition is negated
return (
self.not_token + self.token_separator + self.convert_condition_group(arg, state)
)
converted_group = self.convert_condition_group(arg, state)
if self.convert_not_as_not_eq:
return converted_group
else:
return self.not_token + self.token_separator + converted_group
else:
expr = self.convert_condition(arg, state)
if isinstance(
expr, DeferredQueryExpression
): # negate deferred expression and pass it to parent
return expr.negate()
else: # convert negated expression to string
return self.not_token + self.token_separator + expr
if self.convert_not_as_not_eq:
return expr
else:
return self.not_token + self.token_separator + expr
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'not' not supported by the backend")

Expand Down Expand Up @@ -1313,6 +1388,27 @@ def convert_value_str(self, s: SigmaString, state: ConversionState) -> str:
else:
return converted

def convert_condition_field_eq_val(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
"""Uses context manager with parent class method to swap expressions with their negated versions
if convert_not_as_not_eq is set and the parent of the condition is a ConditionNOT."""

# Determine if negation is needed

def is_parent_not(cond):
if cond.parent is None:
return False
if isinstance(cond.parent, ConditionNOT):
return True
return is_parent_not(cond.parent)

negation = is_parent_not(cond) and self.convert_not_as_not_eq

# Use context manager to handle negation
with self.not_equals_context_manager(use_negated_expressions=negation):
return super().convert_condition_field_eq_val(cond, state)

def convert_condition_field_eq_val_str(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
Expand Down
175 changes: 175 additions & 0 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2803,3 +2803,178 @@ def test_convert_without_output(test_backend):

assert rule._conversion_result == ['mappedA="value" and \'field A\'="value"']
assert result == []


def test_convert_not_as_not_eq(test_backend, monkeypatch):
"""Test that NOT conditions are converted using not_eq expressions when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA: value1
condition: not sel
"""
)
)
== ['mappedA!="value1"']
)


def test_convert_not_startswith(test_backend, monkeypatch):
"""Test negated startswith expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_startswith_expression", "{field} not_startswith {value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|startswith: "val"
condition: not sel
"""
)
)
== ['mappedA not_startswith "val"']
)


def test_convert_not_contains(test_backend, monkeypatch):
"""Test negated contains expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_contains_expression", "{field} not_contains {value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|contains: "val"
condition: not sel
"""
)
)
== ['mappedA not_contains "val"']
)


def test_convert_not_re(test_backend, monkeypatch):
"""Test negated regular expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_re_expression", "{field}!=/{regex}/")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|re: "val.*"
condition: not sel
"""
)
)
== ["mappedA!=/val.*/"]
)


def test_convert_not_cidr(test_backend, monkeypatch):
"""Test negated CIDR expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_cidr_expression", "cidrnotmatch('{field}', \"{value}\")")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|cidr: "192.168.1.0/24"
condition: not sel
"""
)
)
== ["cidrnotmatch('mappedA', \"192.168.1.0/24\")"]
)


def test_convert_not_and_group(test_backend, monkeypatch):
"""Test that NOT with AND group is handled correctly when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: value1
sel2:
fieldB: value2
condition: not (sel1 and sel2)
"""
)
)
== ['(mappedA!="value1" and mappedB!="value2")']
)


def test_convert_not_or_group(test_backend, monkeypatch):
"""Test that NOT with OR group is handled correctly when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: value1
sel2:
fieldB: value2
condition: not (sel1 or sel2)
"""
)
)
== ['(mappedA!="value1" or mappedB!="value2")']
)