From 4dab51534a12efe6ce40aba54e73f5a65d1d69d7 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 10:00:09 -0600 Subject: [PATCH 01/14] add suuport for esql type --- detection_rules/misc.py | 40 +++++++++++++++++++ detection_rules/rule.py | 26 +++++++++--- detection_rules/rule_validators.py | 32 ++++++++++++--- detection_rules/schemas/definitions.py | 4 +- ...rare_microsoft_office_child_processes.toml | 38 ++++++++++++++++++ tests/test_all_rules.py | 27 +++++++++++-- 6 files changed, 150 insertions(+), 17 deletions(-) create mode 100644 rules/windows/command_and_control_rare_microsoft_office_child_processes.toml diff --git a/detection_rules/misc.py b/detection_rules/misc.py index e940f920316..2f9f6fe8da2 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -11,6 +11,7 @@ from pathlib import Path from functools import wraps +from threading import Lock from typing import NoReturn import click @@ -446,3 +447,42 @@ def _wrapped(*args, **kwargs): return _wrapped return _wrapper + + +class ElasticsearchClientSingleton: + """Singleton for Elasticsearch client.""" + _instance = None + _lock = Lock() + + def __new__(cls, *args, **kwargs): + """Create a new instance of the Elasticsearch client.""" + with cls._lock: # ensures thread-safe singleton initialization + if cls._instance is None: + cls._instance = super(ElasticsearchClientSingleton, cls).__new__(cls) + # Initialize the client here + # export DR_VALIDATE_ESQL=true + # export DR_CLOUD_ID="" or export DR_ELASTICSEARCH_URL="" + # export DR_ES_PASSWORD="" + # export DR_ES_USER="" + es_client_args = { + "es_password": getdefault("es_password")(), + "es_user": getdefault("es_user")(), + "ignore_ssl_errors": True + } + + cloud_id = getdefault('cloud_id')() + elasticsearch_url = getdefault('elasticsearch_url')() + + if cloud_id: + es_client_args["cloud_id"] = cloud_id + elif not cloud_id and elasticsearch_url: + es_client_args["elasticsearch_url"] = elasticsearch_url + else: + raise ClientError("Either the cloud_id or elasticsearch_url must be set.") + + cls._instance.client = get_elasticsearch_client(**es_client_args) + return cls._instance + + @classmethod + def get_client(cls): + return cls()._instance.client diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 1eaf485b4af..4cb3d9d8df7 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -569,6 +569,8 @@ def validator(self) -> Optional[QueryValidator]: return KQLValidator(self.query) elif self.language == "eql": return EQLValidator(self.query) + elif self.language == "esql": + return ESQLValidator(self.query) def validate_query(self, meta: RuleMeta) -> None: validator = self.validator @@ -714,6 +716,13 @@ def interval_ratio(self) -> Optional[float]: return interval / self.max_span +@dataclass(frozen=True) +class ESQLRuleData(QueryRuleData): + """ESQL rules are a special case of query rules.""" + type: Literal["esql"] + language: Literal["esql"] + + @dataclass(frozen=True) class ThreatMatchRuleData(QueryRuleData): """Specific fields for indicator (threat) match rule.""" @@ -760,8 +769,9 @@ def validate_query(self, meta: RuleMeta) -> None: # All of the possible rule types # Sort inverse of any inheritance - see comment in TOMLRuleContents.to_dict -AnyRuleData = Union[EQLRuleData, ThresholdQueryRuleData, ThreatMatchRuleData, - MachineLearningRuleData, QueryRuleData, NewTermsRuleData] +AnyRuleData = Union[EQLRuleData, ESQLRuleData, ThresholdQueryRuleData, + ThreatMatchRuleData, MachineLearningRuleData, QueryRuleData, + NewTermsRuleData] class BaseRuleContents(ABC): @@ -945,9 +955,10 @@ def _post_dict_conversion(self, obj: dict) -> dict: super()._post_dict_conversion(obj) # build time fields - self._convert_add_related_integrations(obj) - self._convert_add_required_fields(obj) - self._convert_add_setup(obj) + if not isinstance(self.data, ESQLRuleData): + self._convert_add_related_integrations(obj) + self._convert_add_required_fields(obj) + self._convert_add_setup(obj) # validate new fields against the schema rule_type = obj['type'] @@ -1084,6 +1095,9 @@ def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta, packaged_integrations = [] datasets = set() + if data.type == "esql": + return packaged_integrations + for node in data.get('ast', []): if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset': datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal))) @@ -1333,4 +1347,4 @@ def get_unique_query_fields(rule: TOMLRule) -> List[str]: # avoid a circular import -from .rule_validators import EQLValidator, KQLValidator # noqa: E402 +from .rule_validators import EQLValidator, ESQLValidator, KQLValidator # noqa: E402 diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index f54d7345043..10fd37823c0 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -4,19 +4,23 @@ # 2.0. """Validation logic for rules containing queries.""" +import os from functools import cached_property -from typing import List, Optional, Union, Tuple -from semver import Version +from typing import List, Optional, Tuple, Union import eql +from marshmallow import ValidationError +from semver import Version import kql from . import ecs, endgame -from .integrations import get_integration_schema_data, load_integrations_manifests -from .misc import load_current_package_version +from .integrations import (get_integration_schema_data, + load_integrations_manifests) +from .misc import ElasticsearchClientSingleton, load_current_package_version +from .rule import (EQLRuleData, QueryRuleData, QueryValidator, RuleMeta, + TOMLRuleContents, set_eql_config) from .schemas import get_stack_schemas -from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, EQLRuleData, set_eql_config EQL_ERROR_TYPES = Union[eql.EqlCompileError, eql.EqlError, @@ -346,6 +350,24 @@ def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) - return [], False +class ESQLValidator(QueryValidator): + """Specific fields for ESQL query event types.""" + + def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: + """Validate an ESQL query while checking TOMLRule.""" + if not os.environ.get("DR_VALIDATE_ESQL"): + return + + if Version.parse(meta.min_stack_version) < Version.parse("8.11.0"): + raise ValidationError(f"Rule minstack must be greater than 8.10.0 {data.rule_id}") + + client = ElasticsearchClientSingleton.get_client() + client.info() + client.perform_request("POST", "/_query", params={"pretty": True}, + headers={"accept": "application/json", "content-type": "application/json"}, + body={"query": f"{self.query} | LIMIT 0"}) + + def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]: """Extract the field name from an EQL or KQL parse error.""" lines = exc.source.splitlines() diff --git a/detection_rules/schemas/definitions.py b/detection_rules/schemas/definitions.py index e4bf09efd57..1926ede4ac2 100644 --- a/detection_rules/schemas/definitions.py +++ b/detection_rules/schemas/definitions.py @@ -137,7 +137,7 @@ CodeString = NewType("CodeString", str) ConditionSemVer = NewType('ConditionSemVer', str, validate=validate.Regexp(CONDITION_VERSION_PATTERN)) Date = NewType('Date', str, validate=validate.Regexp(DATE_PATTERN)) -FilterLanguages = Literal["kuery", "lucene"] +FilterLanguages = Literal["eql", "esql", "kuery", "lucene"] Interval = NewType('Interval', str, validate=validate.Regexp(INTERVAL_PATTERN)) Markdown = NewType("MarkdownField", CodeString) Maturity = Literal['development', 'experimental', 'beta', 'production', 'deprecated'] @@ -148,7 +148,7 @@ PositiveInteger = NewType('PositiveInteger', int, validate=validate.Range(min=1)) RiskScore = NewType("MaxSignals", int, validate=validate.Range(min=1, max=100)) RuleName = NewType('RuleName', str, validate=validate.Regexp(NAME_PATTERN)) -RuleType = Literal['query', 'machine_learning', 'eql', 'threshold', 'threat_match', 'new_terms'] +RuleType = Literal['query', 'machine_learning', 'eql', 'esql', 'threshold', 'threat_match', 'new_terms'] SemVer = NewType('SemVer', str, validate=validate.Regexp(VERSION_PATTERN)) SemVerMinorOnly = NewType('SemVerFullStrict', str, validate=validate.Regexp(MINOR_SEMVER)) Severity = Literal['low', 'medium', 'high', 'critical'] diff --git a/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml b/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml new file mode 100644 index 00000000000..a965a6521ed --- /dev/null +++ b/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml @@ -0,0 +1,38 @@ +[metadata] +creation_date = "2023/02/27" +integration = ["endpoint"] +maturity = "production" +min_stack_comments = "ES|QL Rule" +min_stack_version = "8.11.0" +updated_date = "2023/06/22" + +[rule] +author = ["Elastic"] +description = """ +Detects rare child processes of Microsoft Office applications, which may indicate an attempt to execute malicious payloads or scripts. This detection focuses on instances where Microsoft Office processes spawn child processes that occur infrequently, potentially as part of a post-exploitation stage of an attack. +""" +from = "now-9m" +index = ["logs-endpoint.events.*"] +language = "esql" +license = "Elastic License v2" +name = "Rare Microsoft Office Child Processes" +risk_score = 47 +rule_id = "24220495-cffe-45a1-996c-37b599ba0e43" +severity = "medium" +tags = ["Data Source: Elastic Endpoint", "Domain: Endpoint", "OS: Windows", "Use Case: Threat Detection", "Tactic: Command and Control", "Data Source: Elastic Defend"] +timestamp_override = "event.ingested" +type = "esql" +query = ''' +from .ds-logs-endpoint.events.process-default-* + | where event.action == "start" and process.code_signature.subject_name like "Microsoft*" and process.parent.name in ("winword.exe", "WINWORD.EXE", "EXCEL.EXE", "excel.exe") + | eval process_path = replace(process.executable, """[cC]:\\[uU][sS][eE][rR][sS]\\[a-zA-Z0-9\.\-\_\$]+\\""", "C:\\\\users\\\\user\\\\") + | stats cc = count(*) by process_path, process.parent.name | where cc <= 5 +''' + +[[rule.threat]] +framework = "MITRE ATT&CK" + +[rule.threat.tactic] +id = "TA0011" +name = "Command and Control" +reference = "https://attack.mitre.org/tactics/TA0011/" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index d6f28d68e1b..7121070dad6 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -27,7 +27,7 @@ from detection_rules.rule import (QueryRuleData, QueryValidator, TOMLRuleContents) from detection_rules.rule_loader import FILE_PATTERN -from detection_rules.rule_validators import EQLValidator, KQLValidator +from detection_rules.rule_validators import EQLValidator, ESQLValidator, KQLValidator from detection_rules.schemas import definitions, get_stack_schemas from detection_rules.utils import (INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump) @@ -812,12 +812,14 @@ def build_rule(query: str, query_language: str): def test_event_dataset(self): for rule in self.all_rules: - if(isinstance(rule.contents.data, QueryRuleData)): + if (isinstance(rule.contents.data, QueryRuleData)): # Need to pick validator based on language if rule.contents.data.language == "kuery": test_validator = KQLValidator(rule.contents.data.query) - if rule.contents.data.language == "eql": + elif rule.contents.data.language == "eql": test_validator = EQLValidator(rule.contents.data.query) + elif rule.contents.data.language == "esql": + test_validator = ESQLValidator(rule.contents.data.query) data = rule.contents.data meta = rule.contents.metadata if meta.query_schema_validation is not False or meta.maturity != "deprecated": @@ -833,7 +835,7 @@ def test_event_dataset(self): meta, pkg_integrations) - if(validation_integrations_check and "event.dataset" in rule.contents.data.query): + if (validation_integrations_check and "event.dataset" in rule.contents.data.query): raise validation_integrations_check @@ -1417,3 +1419,20 @@ def test_new_terms_fields_unique(self): if rule.contents.data.type == "new_terms": assert len(set(rule.contents.data.new_terms.value)) == len(rule.contents.data.new_terms.value), \ f"new terms fields values are not unique - {rule.contents.data.new_terms.value}" + + +class TestESQLRules(BaseRuleTest): + """Test ESQL Rules.""" + + @unittest.skipIf(not os.environ.get("DR_VALIDATE_ESQL"), + "Test only run when DR_VALIDATE_ESQL environment variable set.") + def test_environment_variables_set(self): + assert os.environ.get("DR_ES_USER") is not None + assert (os.environ.get("DR_CLOUD_ID") is not None or os.environ.get("DR_ELASTICSEARCH_URL") is not None) + + for rule in self.production_rules: + if rule.contents.data.type == "esql": + + # Stub test to validate esql rules + assert rule.contents.data.language == "esql", \ + f"{self.rule_str(rule)} is not an ESQL rule type" From b40d23ad2c7e9a7816a07ec8ae3774ccbff4102e Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 11:30:44 -0600 Subject: [PATCH 02/14] skip esql rules without ast --- detection_rules/rule.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 4cb3d9d8df7..6cbf010de8d 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -1095,14 +1095,14 @@ def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta, packaged_integrations = [] datasets = set() - if data.type == "esql": - return packaged_integrations - - for node in data.get('ast', []): - if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset': - datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal))) - elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset': - datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value))) + if data.type != "esql": + # skip ES|QL rules until ast is available + + for node in data.get('ast', []): + if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset': + datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal))) + elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset': + datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value))) # integration is None to remove duplicate references upstream in Kibana # chronologically, event.dataset is checked for package:integration, then rule tags From c910144eecb8a0794e05e5c963ad4ebdf55abfb6 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 11:39:31 -0600 Subject: [PATCH 03/14] skip unique_fields for esql rules --- detection_rules/rule.py | 7 +++---- detection_rules/rule_validators.py | 6 ++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 6cbf010de8d..1697a6698e7 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -955,10 +955,9 @@ def _post_dict_conversion(self, obj: dict) -> dict: super()._post_dict_conversion(obj) # build time fields - if not isinstance(self.data, ESQLRuleData): - self._convert_add_related_integrations(obj) - self._convert_add_required_fields(obj) - self._convert_add_setup(obj) + self._convert_add_related_integrations(obj) + self._convert_add_required_fields(obj) + self._convert_add_setup(obj) # validate new fields against the schema rule_type = obj['type'] diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 10fd37823c0..eb544f43188 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -353,6 +353,12 @@ def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) - class ESQLValidator(QueryValidator): """Specific fields for ESQL query event types.""" + @cached_property + def unique_fields(self) -> List[str]: + """Return a list of unique fields in the query.""" + # return empty list for ES|QL rules until ast is available + return [] + def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: """Validate an ESQL query while checking TOMLRule.""" if not os.environ.get("DR_VALIDATE_ESQL"): From 800c2647b3c73c9b9dd6de8d31970a3b53026316 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 12:20:13 -0600 Subject: [PATCH 04/14] validate integrations with base validator --- detection_rules/rule_validators.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index eb544f43188..1ca5f214adf 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -8,6 +8,7 @@ from functools import cached_property from typing import List, Optional, Tuple, Union +import elasticsearch import eql from marshmallow import ValidationError from semver import Version @@ -369,9 +370,17 @@ def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: client = ElasticsearchClientSingleton.get_client() client.info() - client.perform_request("POST", "/_query", params={"pretty": True}, - headers={"accept": "application/json", "content-type": "application/json"}, - body={"query": f"{self.query} | LIMIT 0"}) + headers = {"accept": "application/json", "content-type": "application/json"} + try: + client.perform_request("POST", "/_query", params={"pretty": True}, + headers=headers, + body={"query": f"{self.query} | LIMIT 0"}) + except elasticsearch.BadRequestError as exc: + raise ValidationError(f"ESQL query failed: {exc}") + + def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[ + ValidationError, None, ValueError]: + return self.validate(data, meta) def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]: From 3f0a83a74ebb76e000d3a029ecba86fda21c3f3c Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 13:44:34 -0600 Subject: [PATCH 05/14] fail on queries with enrich fields for now --- detection_rules/misc.py | 1 + detection_rules/rule_validators.py | 1 + tests/test_all_rules.py | 7 ++++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 2f9f6fe8da2..96db58b0ecb 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -485,4 +485,5 @@ def __new__(cls, *args, **kwargs): @classmethod def get_client(cls): + """Get the Elasticsearch client instance.""" return cls()._instance.client diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 1ca5f214adf..b2883905055 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -363,6 +363,7 @@ def unique_fields(self) -> List[str]: def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: """Validate an ESQL query while checking TOMLRule.""" if not os.environ.get("DR_VALIDATE_ESQL"): + print(f"Skipping ES|QL validation for {data.name} - {data.rule_id}") return if Version.parse(meta.min_stack_version) < Version.parse("8.11.0"): diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 7121070dad6..bdd111f4322 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -1435,4 +1435,9 @@ def test_environment_variables_set(self): # Stub test to validate esql rules assert rule.contents.data.language == "esql", \ - f"{self.rule_str(rule)} is not an ESQL rule type" + f"{self.rule_str(rule)} is not an ES|QL rule type" + + # Validate that the rule does not contain enrich + # until we support it + assert "enrich" not in rule.contents.data.query.lower(), \ + f"{self.rule_str(rule)} is an ES|QL rule type and contains enrich" From bcac745a89f51d22f5b288668b4bc04455e4b231 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 20:04:39 -0600 Subject: [PATCH 06/14] add unit tests --- detection_rules/rule.py | 12 +++- ...rare_microsoft_office_child_processes.toml | 1 - tests/data/__init__.py | 0 ...command_control_dummy_production_rule.toml | 37 ++++++++++++ tests/test_all_rules.py | 60 ++++++++++++++++--- 5 files changed, 99 insertions(+), 11 deletions(-) create mode 100644 tests/data/__init__.py create mode 100644 tests/data/command_control_dummy_production_rule.toml diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 1697a6698e7..4e0243aa429 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -569,8 +569,6 @@ def validator(self) -> Optional[QueryValidator]: return KQLValidator(self.query) elif self.language == "eql": return EQLValidator(self.query) - elif self.language == "esql": - return ESQLValidator(self.query) def validate_query(self, meta: RuleMeta) -> None: validator = self.validator @@ -717,10 +715,18 @@ def interval_ratio(self) -> Optional[float]: @dataclass(frozen=True) -class ESQLRuleData(QueryRuleData): +class ESQLRuleData(BaseRuleData): """ESQL rules are a special case of query rules.""" type: Literal["esql"] language: Literal["esql"] + query: str + + @cached_property + def validator(self) -> Optional[QueryValidator]: + return ESQLValidator(self.query) + + def validate_query(self, meta: RuleMeta) -> None: + return self.validator.validate(self, meta) @dataclass(frozen=True) diff --git a/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml b/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml index a965a6521ed..e3a08913284 100644 --- a/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml +++ b/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml @@ -12,7 +12,6 @@ description = """ Detects rare child processes of Microsoft Office applications, which may indicate an attempt to execute malicious payloads or scripts. This detection focuses on instances where Microsoft Office processes spawn child processes that occur infrequently, potentially as part of a post-exploitation stage of an attack. """ from = "now-9m" -index = ["logs-endpoint.events.*"] language = "esql" license = "Elastic License v2" name = "Rare Microsoft Office Child Processes" diff --git a/tests/data/__init__.py b/tests/data/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/data/command_control_dummy_production_rule.toml b/tests/data/command_control_dummy_production_rule.toml new file mode 100644 index 00000000000..caa798ff22a --- /dev/null +++ b/tests/data/command_control_dummy_production_rule.toml @@ -0,0 +1,37 @@ +[metadata] +creation_date = "2023/11/20" +integration = ["endpoint"] +maturity = "production" +min_stack_comments = "ES|QL Rule" +min_stack_version = "8.11.0" +updated_date = "2023/11/20" + +[rule] +author = ["Elastic"] +description = """ +Sample ES|QL rule for unit tests. +""" +from = "now-9m" +language = "esql" +license = "Elastic License v2" +name = "Sample ES|QL rule for unit tests" +risk_score = 47 +rule_id = "24220495-cffe-45a1-996c-37b599ba0e43" +severity = "medium" +tags = ["Data Source: Elastic Endpoint", "Domain: Endpoint", "OS: Windows", "Use Case: Threat Detection", "Tactic: Command and Control", "Data Source: Elastic Defend"] +timestamp_override = "event.ingested" +type = "esql" +query = ''' +from .ds-logs-endpoint.events.process-default-* + | where event.action == "start" and process.code_signature.subject_name like "Microsoft*" and process.parent.name in ("winword.exe", "WINWORD.EXE", "EXCEL.EXE", "excel.exe") + | eval process_path = replace(process.executable, """[cC]:\\[uU][sS][eE][rR][sS]\\[a-zA-Z0-9\.\-\_\$]+\\""", "C:\\\\users\\\\user\\\\") + | stats cc = count(*) by process_path, process.parent.name | where cc <= 5 +''' + +[[rule.threat]] +framework = "MITRE ATT&CK" + +[rule.threat.tactic] +id = "TA0011" +name = "Command and Control" +reference = "https://attack.mitre.org/tactics/TA0011/" diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index bdd111f4322..c646ca19881 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -10,9 +10,13 @@ import uuid import warnings from collections import defaultdict +from copy import deepcopy from pathlib import Path import eql.ast +import marshmallow +import pytest +from contextlib import nullcontext as does_not_raise from marshmallow import ValidationError from semver import Version @@ -26,11 +30,11 @@ from detection_rules.packaging import current_stack_version from detection_rules.rule import (QueryRuleData, QueryValidator, TOMLRuleContents) -from detection_rules.rule_loader import FILE_PATTERN +from detection_rules.rule_loader import FILE_PATTERN, RuleCollection from detection_rules.rule_validators import EQLValidator, ESQLValidator, KQLValidator from detection_rules.schemas import definitions, get_stack_schemas from detection_rules.utils import (INTEGRATION_RULE_DIR, PatchedTemplate, - get_path, load_etc_dump) + get_path, load_etc_dump, load_rule_contents) from detection_rules.version_lock import default_version_lock from rta import get_available_tests @@ -1424,12 +1428,54 @@ def test_new_terms_fields_unique(self): class TestESQLRules(BaseRuleTest): """Test ESQL Rules.""" - @unittest.skipIf(not os.environ.get("DR_VALIDATE_ESQL"), - "Test only run when DR_VALIDATE_ESQL environment variable set.") - def test_environment_variables_set(self): - assert os.environ.get("DR_ES_USER") is not None - assert (os.environ.get("DR_CLOUD_ID") is not None or os.environ.get("DR_ELASTICSEARCH_URL") is not None) + @classmethod + def setUpClass(cls): + """Set up test environment.""" + + cls.dr_es_user = os.environ.get("DR_ES_USER") + cls.dr_cloud_id = os.environ.get("DR_CLOUD_ID") + cls.dr_elasticsearch_url = os.environ.get("DR_ELASTICSEARCH_URL") + cls.dr_validate_esql = os.environ.get("DR_VALIDATE_ESQL") + + if cls.dr_validate_esql is None: + raise unittest.SkipTest("Test only run when DR_VALIDATE_ESQL environment variable set.") + assert cls.dr_es_user is not None, "DR_ES_USER environment variable is not set." + assert cls.dr_cloud_id is not None or cls.dr_elasticsearch_url is not None, \ + "Either DR_CLOUD_ID or DR_ELASTICSEARCH_URL must be set." + + super().setUpClass() + + def test_esql_queries(self): + test_cases = [ + # invalid queries + ('from .ds-logs-endpoint.events.process-default-* | wheres process.name like "Microsoft*"', + pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), + ('from .ds-logs-endpoint.events.process-default-* | where process.names like "Microsoft*"', + pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), + + # valid queries + ('from .ds-logs-endpoint.events.process-default-* | where process.name like "Microsoft*"', + does_not_raise(), None), + ] + for esql_query, expectation, message in test_cases: + self.run_esql_test(esql_query, expectation, message) + + def run_esql_test(self, esql_query, expectation, message): + """Test that the endpoint schema query validators are working correctly.""" + rc = RuleCollection() + file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) + original_production_rule = load_rule_contents(file_path) + + # Test that a ValidationError is raised if the query doesn't match the schema + production_rule = deepcopy(original_production_rule)[0] + production_rule["rule"]["query"] = esql_query + + expectation.match_expr = message + with expectation: + rc.load_dict(production_rule) + def test_esql_rules(self): + """Test ESQL rules.""" for rule in self.production_rules: if rule.contents.data.type == "esql": From 87f6ec5e3115563ad47a543e17d87cca1d088676 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 20:09:25 -0600 Subject: [PATCH 07/14] small reorganization --- tests/test_all_rules.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index c646ca19881..6c0fd3b52af 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -1445,6 +1445,20 @@ def setUpClass(cls): super().setUpClass() + def run_esql_test(self, esql_query, expectation, message): + """Test that the endpoint schema query validators are working correctly.""" + rc = RuleCollection() + file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) + original_production_rule = load_rule_contents(file_path) + + # Test that a ValidationError is raised if the query doesn't match the schema + production_rule = deepcopy(original_production_rule)[0] + production_rule["rule"]["query"] = esql_query + + expectation.match_expr = message + with expectation: + rc.load_dict(production_rule) + def test_esql_queries(self): test_cases = [ # invalid queries @@ -1460,20 +1474,6 @@ def test_esql_queries(self): for esql_query, expectation, message in test_cases: self.run_esql_test(esql_query, expectation, message) - def run_esql_test(self, esql_query, expectation, message): - """Test that the endpoint schema query validators are working correctly.""" - rc = RuleCollection() - file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) - original_production_rule = load_rule_contents(file_path) - - # Test that a ValidationError is raised if the query doesn't match the schema - production_rule = deepcopy(original_production_rule)[0] - production_rule["rule"]["query"] = esql_query - - expectation.match_expr = message - with expectation: - rc.load_dict(production_rule) - def test_esql_rules(self): """Test ESQL rules.""" for rule in self.production_rules: From 4e048a8d9f676c2d914084ffe253c40dc83f42ee Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Mon, 20 Nov 2023 20:10:26 -0600 Subject: [PATCH 08/14] docstrings --- tests/test_all_rules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 6c0fd3b52af..4106d023615 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -1431,7 +1431,6 @@ class TestESQLRules(BaseRuleTest): @classmethod def setUpClass(cls): """Set up test environment.""" - cls.dr_es_user = os.environ.get("DR_ES_USER") cls.dr_cloud_id = os.environ.get("DR_CLOUD_ID") cls.dr_elasticsearch_url = os.environ.get("DR_ELASTICSEARCH_URL") @@ -1446,7 +1445,7 @@ def setUpClass(cls): super().setUpClass() def run_esql_test(self, esql_query, expectation, message): - """Test that the endpoint schema query validators are working correctly.""" + """Test that the query validation is working correctly.""" rc = RuleCollection() file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) original_production_rule = load_rule_contents(file_path) @@ -1460,6 +1459,7 @@ def run_esql_test(self, esql_query, expectation, message): rc.load_dict(production_rule) def test_esql_queries(self): + """Test ESQL queries.""" test_cases = [ # invalid queries ('from .ds-logs-endpoint.events.process-default-* | wheres process.name like "Microsoft*"', From 5c3755740237740d85895ed0d2e8bbd469497c3f Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Mon, 27 Nov 2023 23:59:10 -0700 Subject: [PATCH 09/14] removed remote validation from the marshmallow load flow --- detection_rules/misc.py | 41 ------ detection_rules/packaging.py | 3 +- detection_rules/remote_validation.py | 140 ++++++++++++++++++ detection_rules/rule.py | 36 ++--- detection_rules/rule_validators.py | 28 +--- tests/data/__init__.py | 4 + tests/test_all_rules.py | 210 ++------------------------- tests/test_rules_remote.py | 18 +++ tests/test_specific_rules.py | 186 ++++++++++++++++++++++++ 9 files changed, 386 insertions(+), 280 deletions(-) create mode 100644 detection_rules/remote_validation.py create mode 100644 tests/test_rules_remote.py create mode 100644 tests/test_specific_rules.py diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 96db58b0ecb..e940f920316 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -11,7 +11,6 @@ from pathlib import Path from functools import wraps -from threading import Lock from typing import NoReturn import click @@ -447,43 +446,3 @@ def _wrapped(*args, **kwargs): return _wrapped return _wrapper - - -class ElasticsearchClientSingleton: - """Singleton for Elasticsearch client.""" - _instance = None - _lock = Lock() - - def __new__(cls, *args, **kwargs): - """Create a new instance of the Elasticsearch client.""" - with cls._lock: # ensures thread-safe singleton initialization - if cls._instance is None: - cls._instance = super(ElasticsearchClientSingleton, cls).__new__(cls) - # Initialize the client here - # export DR_VALIDATE_ESQL=true - # export DR_CLOUD_ID="" or export DR_ELASTICSEARCH_URL="" - # export DR_ES_PASSWORD="" - # export DR_ES_USER="" - es_client_args = { - "es_password": getdefault("es_password")(), - "es_user": getdefault("es_user")(), - "ignore_ssl_errors": True - } - - cloud_id = getdefault('cloud_id')() - elasticsearch_url = getdefault('elasticsearch_url')() - - if cloud_id: - es_client_args["cloud_id"] = cloud_id - elif not cloud_id and elasticsearch_url: - es_client_args["elasticsearch_url"] = elasticsearch_url - else: - raise ClientError("Either the cloud_id or elasticsearch_url must be set.") - - cls._instance.client = get_elasticsearch_client(**es_client_args) - return cls._instance - - @classmethod - def get_client(cls): - """Get the Elasticsearch client instance.""" - return cls()._instance.client diff --git a/detection_rules/packaging.py b/detection_rules/packaging.py index 6251c68b183..d09bf16fe77 100644 --- a/detection_rules/packaging.py +++ b/detection_rules/packaging.py @@ -277,8 +277,9 @@ def get_summary_rule_info(r: TOMLRule): r = r.contents rule_str = f'{r.name:<{longest_name}} (v:{r.autobumped_version} t:{r.data.type}' if isinstance(rule.contents.data, QueryRuleData): + index = rule.contents.data.get("index") or [] rule_str += f'-{r.data.language}' - rule_str += f'(indexes:{"".join(index_map[idx] for idx in rule.contents.data.index) or "none"}' + rule_str += f'(indexes:{"".join(index_map[idx] for idx in index) or "none"}' return rule_str diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py new file mode 100644 index 00000000000..1e5979cc042 --- /dev/null +++ b/detection_rules/remote_validation.py @@ -0,0 +1,140 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +from functools import cached_property +from multiprocessing.pool import ThreadPool +from typing import List, Optional + +import elasticsearch +from elasticsearch import Elasticsearch +from marshmallow import ValidationError +from requests import HTTPError + +from kibana import Kibana + +from .misc import ClientError, getdefault, get_elasticsearch_client, get_kibana_client +from .rule import ( + AnyRuleData, ESQLRuleData, QueryRuleData, ThresholdQueryRuleData, ThreatMatchRuleData, MachineLearningRuleData, + EQLRuleData, NewTermsRuleData, TOMLRule +) + + +class RemoteConnector: + """Base client class for remote validation and testing.""" + + def __init__(self, parse_config: bool = False): + es_args = ['cloud_id', 'ignore_ssl_errors', 'elasticsearch_url', 'es_user', 'es_password', 'timeout'] + kibana_args = [ + 'cloud_id', 'ignore_ssl_errors', 'kibana_url', 'kibana_user', 'kibana_password', 'space', 'kibana_cookie', + 'provider_type', 'provider_name' + ] + + if parse_config: + es_kwargs = {arg: getdefault(arg)() for arg in es_args} + kibana_kwargs = {arg: getdefault(arg)() for arg in kibana_args} + + try: + self.es_client = get_elasticsearch_client(**es_kwargs) + except ClientError: + self.es_client = None + + try: + self.kibana_client = get_kibana_client(**kibana_kwargs) + except HTTPError: + self.kibana_client = None + + @staticmethod + def auth_es(*, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, + elasticsearch_url: Optional[str] = None, es_user: Optional[str] = None, + es_password: Optional[str] = None, timeout: Optional[int] = None) -> Elasticsearch: + """Return an authenticated Elasticsearch client.""" + client = get_elasticsearch_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, + elasticsearch_url=elasticsearch_url, es_user=es_user, es_password=es_password, + timeout=timeout) + return client + + @staticmethod + def auth_kibana(*, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, + kibana_url: Optional[str] = None, kibana_user: Optional[str] = None, + kibana_password: Optional[str] = None, space: Optional[str] = None, + kibana_cookie: Optional[str] = None, provider_type: Optional[str] = None, + provider_name: Optional[str] = None) -> Kibana: + """Return an authenticated Kibana client.""" + client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, kibana_url=kibana_url, + kibana_user=kibana_user, kibana_password=kibana_password, space=space, + kibana_cookie=kibana_cookie, provider_type=provider_type, + provider_name=provider_name) + return client + + +class RemoteValidator(RemoteConnector): + """Client class for remote validation.""" + + def __init__(self, parse_config: bool = False): + super(RemoteValidator, self).__init__(parse_config=parse_config) + + @cached_property + def get_validate_methods(self) -> List[str]: + """Return all validate methods.""" + exempt = ('validate_rule', 'validate_rules') + methods = [m for m in self.__dir__() if m.startswith('validate_') and m not in exempt] + return methods + + def get_validate_method(self, name: str) -> callable: + """Return validate method by name.""" + assert name in self.get_validate_methods, f'validate method {name} not found' + return getattr(self, name) + + def validate_rule(self, data: AnyRuleData): + """Validate a single rule query.""" + method = self.get_validate_method(f'validate_{data.type}') + return method(data) + + def validate_rules(self, rules: List[TOMLRule], threads: int = 50): + """Validate a collection of rules via threads.""" + responses = {} + + def request(d: AnyRuleData): + try: + responses[d.rule_id] = self.validate_rule(d) + except ValidationError as e: + responses[d.rule_id] = e.messages + + pool = ThreadPool(processes=threads) + pool.map(request, [r.contents.data for r in rules]) + pool.close() + pool.join() + + return responses + + def validate_esql(self, data: ESQLRuleData): + headers = {"accept": "application/json", "content-type": "application/json"} + body = {'query': f'{data.query} | LIMIT 0'} + try: + response = self.es_client.perform_request('POST', '/_query', headers=headers, params={'pretty': True}, + body=body) + except elasticsearch.BadRequestError as exc: + raise ValidationError(f'ES|QL query failed: {exc}') + + return response.body + + def validate_query(self, data: QueryRuleData): + """Validate query for "query" rule types.""" + + def validate_threshold(self, data: ThresholdQueryRuleData): + """Validate query for "threshold" rule types.""" + + def validate_eql(self, data: EQLRuleData): + """Validate query for "eql" rule types.""" + + def validate_new_terms(self, data: NewTermsRuleData): + """Validate query for "new_terms" rule types.""" + + def validate_threat_match(self, data: ThreatMatchRuleData): + """Validate query for "threat_match" rule types.""" + + def validate_machine_learning(self, data: MachineLearningRuleData): + """Validate query for "machine_learning" rule types.""" + # TODO ??? diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 4e0243aa429..dff463c25df 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -505,6 +505,10 @@ def unique_fields(self) -> Any: def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: raise NotImplementedError() + @staticmethod + def validate_remote(remote_validator: 'RemoteValidator', data: 'QueryRuleData'): + remote_validator.validate_rule(data) + @cached def get_required_fields(self, index: str) -> List[dict]: """Retrieves fields needed for the query along with type information from the schema.""" @@ -569,6 +573,8 @@ def validator(self) -> Optional[QueryValidator]: return KQLValidator(self.query) elif self.language == "eql": return EQLValidator(self.query) + elif self.language == 'esql': + return ESQLValidator(self.query) def validate_query(self, meta: RuleMeta) -> None: validator = self.validator @@ -594,9 +600,8 @@ def get_required_fields(self, index: str) -> List[dict]: return validator.get_required_fields(index or []) @validates_schema - def validate_exceptions(self, data, **kwargs): + def validates_query_data(self, data, **kwargs): """Custom validation for query rule type and subclasses.""" - # alert suppression is only valid for query rule type and not any of its subclasses if data.get('alert_suppression') and data['type'] != 'query': raise ValidationError("Alert suppression is only valid for query rule type.") @@ -715,18 +720,17 @@ def interval_ratio(self) -> Optional[float]: @dataclass(frozen=True) -class ESQLRuleData(BaseRuleData): +class ESQLRuleData(QueryRuleData): """ESQL rules are a special case of query rules.""" type: Literal["esql"] language: Literal["esql"] query: str - @cached_property - def validator(self) -> Optional[QueryValidator]: - return ESQLValidator(self.query) - - def validate_query(self, meta: RuleMeta) -> None: - return self.validator.validate(self, meta) + @validates_schema + def validates_esql_data(self, data, **kwargs): + """Custom validation for query rule type and subclasses.""" + if data.get('index'): + raise ValidationError("Index is not a valid field for ES|QL rule type.") @dataclass(frozen=True) @@ -1100,14 +1104,11 @@ def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta, packaged_integrations = [] datasets = set() - if data.type != "esql": - # skip ES|QL rules until ast is available - - for node in data.get('ast', []): - if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset': - datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal))) - elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset': - datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value))) + for node in data.get('ast') or []: + if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset': + datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal))) + elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset': + datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value))) # integration is None to remove duplicate references upstream in Kibana # chronologically, event.dataset is checked for package:integration, then rule tags @@ -1353,3 +1354,4 @@ def get_unique_query_fields(rule: TOMLRule) -> List[str]: # avoid a circular import from .rule_validators import EQLValidator, ESQLValidator, KQLValidator # noqa: E402 +from .remote_validation import RemoteValidator # noqa: E402 diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index b2883905055..0b2c165d5bf 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -4,11 +4,9 @@ # 2.0. """Validation logic for rules containing queries.""" -import os from functools import cached_property from typing import List, Optional, Tuple, Union -import elasticsearch import eql from marshmallow import ValidationError from semver import Version @@ -18,7 +16,7 @@ from . import ecs, endgame from .integrations import (get_integration_schema_data, load_integrations_manifests) -from .misc import ElasticsearchClientSingleton, load_current_package_version +from .misc import load_current_package_version from .rule import (EQLRuleData, QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents, set_eql_config) from .schemas import get_stack_schemas @@ -354,30 +352,20 @@ def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) - class ESQLValidator(QueryValidator): """Specific fields for ESQL query event types.""" + @cached_property + def ast(self): + return None + @cached_property def unique_fields(self) -> List[str]: """Return a list of unique fields in the query.""" - # return empty list for ES|QL rules until ast is available + # return empty list for ES|QL rules until ast is available (friendlier than raising error) + # raise NotImplementedError('ES|QL query parsing not yet supported') return [] def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: """Validate an ESQL query while checking TOMLRule.""" - if not os.environ.get("DR_VALIDATE_ESQL"): - print(f"Skipping ES|QL validation for {data.name} - {data.rule_id}") - return - - if Version.parse(meta.min_stack_version) < Version.parse("8.11.0"): - raise ValidationError(f"Rule minstack must be greater than 8.10.0 {data.rule_id}") - - client = ElasticsearchClientSingleton.get_client() - client.info() - headers = {"accept": "application/json", "content-type": "application/json"} - try: - client.perform_request("POST", "/_query", params={"pretty": True}, - headers=headers, - body={"query": f"{self.query} | LIMIT 0"}) - except elasticsearch.BadRequestError as exc: - raise ValidationError(f"ESQL query failed: {exc}") + # temporarily override to NOP until ES|QL query parsing is supported def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[ ValidationError, None, ValueError]: diff --git a/tests/data/__init__.py b/tests/data/__init__.py index e69de29bb2d..72ea1f6e244 100644 --- a/tests/data/__init__.py +++ b/tests/data/__init__.py @@ -0,0 +1,4 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 4106d023615..74f7aee965f 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -10,13 +10,10 @@ import uuid import warnings from collections import defaultdict -from copy import deepcopy from pathlib import Path import eql.ast -import marshmallow -import pytest -from contextlib import nullcontext as does_not_raise + from marshmallow import ValidationError from semver import Version @@ -30,11 +27,10 @@ from detection_rules.packaging import current_stack_version from detection_rules.rule import (QueryRuleData, QueryValidator, TOMLRuleContents) -from detection_rules.rule_loader import FILE_PATTERN, RuleCollection +from detection_rules.rule_loader import FILE_PATTERN from detection_rules.rule_validators import EQLValidator, ESQLValidator, KQLValidator from detection_rules.schemas import definitions, get_stack_schemas -from detection_rules.utils import (INTEGRATION_RULE_DIR, PatchedTemplate, - get_path, load_etc_dump, load_rule_contents) +from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump from detection_rules.version_lock import default_version_lock from rta import get_available_tests @@ -300,7 +296,7 @@ def test_required_tags(self): missing_required_tags = set() if isinstance(rule.contents.data, QueryRuleData): - for index in rule.contents.data.index: + for index in rule.contents.data.get('index') or []: expected_tags = required_tags_map.get(index, {}) expected_all = expected_tags.get('all', []) expected_any = expected_tags.get('any', []) @@ -615,6 +611,9 @@ def test_integration_tag(self): valid_integration_folders = [p.name for p in list(Path(INTEGRATION_RULE_DIR).glob("*")) if p.name != 'endpoint'] for rule in self.production_rules: + # TODO: temp bypass for esql rules; once parsed, we should be able to look for indexes via `FROM` + if not rule.contents.data.get('index'): + continue if isinstance(rule.contents.data, QueryRuleData) and rule.contents.data.language != 'lucene': rule_integrations = rule.contents.metadata.get('integration') or [] rule_integrations = [rule_integrations] if isinstance(rule_integrations, str) else rule_integrations @@ -623,7 +622,7 @@ def test_integration_tag(self): meta = rule.contents.metadata package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) package_integrations_list = list(set([integration["package"] for integration in package_integrations])) - indices = data.get('index') + indices = data.get('index') or [] for rule_integration in rule_integrations: if ("even.dataset" in rule.contents.data.query and not package_integrations and # noqa: W504 not rule_promotion and rule_integration not in definitions.NON_DATASET_PACKAGES): # noqa: W504 @@ -667,7 +666,7 @@ def test_integration_tag(self): "f3e22c8b-ea47-45d1-b502-b57b6de950b3" ] if any([re.search("|".join(non_dataset_packages), i, re.IGNORECASE) - for i in rule.contents.data.index]): + for i in rule.contents.data.get('index') or []]): if not rule.contents.metadata.integration and rule.id not in ignore_ids and \ rule.contents.data.type not in definitions.MACHINE_LEARNING: err_msg = f'substrings {non_dataset_packages} found in '\ @@ -1183,35 +1182,6 @@ def test_rule_risk_score_severity_mismatch(self): self.fail(err_msg) -class TestEndpointQuery(BaseRuleTest): - """Test endpoint-specific rules.""" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.3.0"), - "Test only applicable to 8.3+ stacks since query updates are min_stacked at 8.3.0") - def test_os_and_platform_in_query(self): - """Test that all endpoint rules have an os defined and linux includes platform.""" - for rule in self.production_rules: - if not rule.contents.data.get('language') in ('eql', 'kuery'): - continue - if rule.path.parent.name not in ('windows', 'macos', 'linux'): - # skip cross-platform for now - continue - - ast = rule.contents.data.ast - fields = [str(f) for f in ast if isinstance(f, (kql.ast.Field, eql.ast.Field))] - - err_msg = f'{self.rule_str(rule)} missing required field for endpoint rule' - if 'host.os.type' not in fields: - # Exception for Forwarded Events which contain Windows-only fields. - if rule.path.parent.name == 'windows' and not any(field.startswith('winlog.') for field in fields): - self.assertIn('host.os.type', fields, err_msg) - - # going to bypass this for now - # if rule.path.parent.name == 'linux': - # err_msg = f'{self.rule_str(rule)} missing required field for linux endpoint rule' - # self.assertIn('host.os.platform', fields, err_msg) - - class TestNoteMarkdownPlugins(BaseRuleTest): """Test if a guide containing Osquery Plugin syntax contains the version note.""" @@ -1325,165 +1295,3 @@ def test_group_field_in_schemas(self): if fld not in schema.keys(): self.fail(f"{self.rule_str(rule)} alert suppression field {fld} not \ found in ECS, Beats, or non-ecs schemas") - - -class TestNewTerms(BaseRuleTest): - """Test new term rules.""" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), - "Test only applicable to 8.4+ stacks for new terms feature.") - def test_history_window_start(self): - """Test new terms history window start field.""" - - for rule in self.production_rules: - if rule.contents.data.type == "new_terms": - - # validate history window start field exists and is correct - assert rule.contents.data.new_terms.history_window_start, \ - "new terms field found with no history_window_start field defined" - assert rule.contents.data.new_terms.history_window_start[0].field == "history_window_start", \ - f"{rule.contents.data.new_terms.history_window_start} should be 'history_window_start'" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), - "Test only applicable to 8.4+ stacks for new terms feature.") - def test_new_terms_field_exists(self): - # validate new terms and history window start fields are correct - for rule in self.production_rules: - if rule.contents.data.type == "new_terms": - assert rule.contents.data.new_terms.field == "new_terms_fields", \ - f"{rule.contents.data.new_terms.field} should be 'new_terms_fields' for new_terms rule type" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), - "Test only applicable to 8.4+ stacks for new terms feature.") - def test_new_terms_fields(self): - """Test new terms fields are schema validated.""" - # ecs validation - for rule in self.production_rules: - if rule.contents.data.type == "new_terms": - meta = rule.contents.metadata - feature_min_stack = Version.parse('8.4.0') - current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) - min_stack_version = Version.parse(meta.get("min_stack_version")) if \ - meta.get("min_stack_version") else None - min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ - current_package_version else min_stack_version - - assert min_stack_version >= feature_min_stack, \ - f"New Terms rule types only compatible with {feature_min_stack}+" - ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] - beats_version = get_stack_schemas()[str(min_stack_version)]['beats'] - - # checks if new terms field(s) are in ecs, beats non-ecs or integration schemas - queryvalidator = QueryValidator(rule.contents.data.query) - _, _, schema = queryvalidator.get_beats_schema([], beats_version, ecs_version) - integration_manifests = load_integrations_manifests() - integration_schemas = load_integrations_schemas() - integration_tags = meta.get("integration") - if integration_tags: - for tag in integration_tags: - latest_tag_compat_ver, _ = find_latest_compatible_version( - package=tag, - integration="", - rule_stack_version=min_stack_version, - packages_manifest=integration_manifests) - if latest_tag_compat_ver: - integration_schema = integration_schemas[tag][latest_tag_compat_ver] - for policy_template in integration_schema.keys(): - schema.update(**integration_schemas[tag][latest_tag_compat_ver][policy_template]) - for new_terms_field in rule.contents.data.new_terms.value: - assert new_terms_field in schema.keys(), \ - f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), - "Test only applicable to 8.4+ stacks for new terms feature.") - def test_new_terms_max_limit(self): - """Test new terms max limit.""" - # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862 - for rule in self.production_rules: - if rule.contents.data.type == "new_terms": - meta = rule.contents.metadata - feature_min_stack = Version.parse('8.4.0') - feature_min_stack_extended_fields = Version.parse('8.6.0') - current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) - min_stack_version = Version.parse(meta.get("min_stack_version")) if \ - meta.get("min_stack_version") else None - min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ - current_package_version else min_stack_version - if min_stack_version >= feature_min_stack and \ - min_stack_version < feature_min_stack_extended_fields: - assert len(rule.contents.data.new_terms.value) == 1, \ - f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}" - - @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"), - "Test only applicable to 8.4+ stacks for new terms feature.") - def test_new_terms_fields_unique(self): - """Test new terms fields are unique.""" - # validate fields are unique - for rule in self.production_rules: - if rule.contents.data.type == "new_terms": - assert len(set(rule.contents.data.new_terms.value)) == len(rule.contents.data.new_terms.value), \ - f"new terms fields values are not unique - {rule.contents.data.new_terms.value}" - - -class TestESQLRules(BaseRuleTest): - """Test ESQL Rules.""" - - @classmethod - def setUpClass(cls): - """Set up test environment.""" - cls.dr_es_user = os.environ.get("DR_ES_USER") - cls.dr_cloud_id = os.environ.get("DR_CLOUD_ID") - cls.dr_elasticsearch_url = os.environ.get("DR_ELASTICSEARCH_URL") - cls.dr_validate_esql = os.environ.get("DR_VALIDATE_ESQL") - - if cls.dr_validate_esql is None: - raise unittest.SkipTest("Test only run when DR_VALIDATE_ESQL environment variable set.") - assert cls.dr_es_user is not None, "DR_ES_USER environment variable is not set." - assert cls.dr_cloud_id is not None or cls.dr_elasticsearch_url is not None, \ - "Either DR_CLOUD_ID or DR_ELASTICSEARCH_URL must be set." - - super().setUpClass() - - def run_esql_test(self, esql_query, expectation, message): - """Test that the query validation is working correctly.""" - rc = RuleCollection() - file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) - original_production_rule = load_rule_contents(file_path) - - # Test that a ValidationError is raised if the query doesn't match the schema - production_rule = deepcopy(original_production_rule)[0] - production_rule["rule"]["query"] = esql_query - - expectation.match_expr = message - with expectation: - rc.load_dict(production_rule) - - def test_esql_queries(self): - """Test ESQL queries.""" - test_cases = [ - # invalid queries - ('from .ds-logs-endpoint.events.process-default-* | wheres process.name like "Microsoft*"', - pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), - ('from .ds-logs-endpoint.events.process-default-* | where process.names like "Microsoft*"', - pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), - - # valid queries - ('from .ds-logs-endpoint.events.process-default-* | where process.name like "Microsoft*"', - does_not_raise(), None), - ] - for esql_query, expectation, message in test_cases: - self.run_esql_test(esql_query, expectation, message) - - def test_esql_rules(self): - """Test ESQL rules.""" - for rule in self.production_rules: - if rule.contents.data.type == "esql": - - # Stub test to validate esql rules - assert rule.contents.data.language == "esql", \ - f"{self.rule_str(rule)} is not an ES|QL rule type" - - # Validate that the rule does not contain enrich - # until we support it - assert "enrich" not in rule.contents.data.query.lower(), \ - f"{self.rule_str(rule)} is an ES|QL rule type and contains enrich" diff --git a/tests/test_rules_remote.py b/tests/test_rules_remote.py new file mode 100644 index 00000000000..9b9b6841d89 --- /dev/null +++ b/tests/test_rules_remote.py @@ -0,0 +1,18 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +from .base import BaseRuleTest +from detection_rules.remote_validation import RemoteValidator + + +class TestRemoteRules(BaseRuleTest): + """Test rules against a remote Elastic stack instance.""" + + def test_esql_rules(self): + """Temporarily explicitly test all ES|QL rules remotely pending parsing lib.""" + esql_rules = [r for r in self.all_rules if r.contents.data.type == 'esql'] + # TODO: assert config is present + rv = RemoteValidator(parse_config=True) + rv.validate_rules(esql_rules) diff --git a/tests/test_specific_rules.py b/tests/test_specific_rules.py new file mode 100644 index 00000000000..f844f89f4aa --- /dev/null +++ b/tests/test_specific_rules.py @@ -0,0 +1,186 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. + +import unittest +from copy import deepcopy +from pathlib import Path + +import eql.ast + +from semver import Version + +import kql +from detection_rules.integrations import ( + find_latest_compatible_version, load_integrations_manifests, load_integrations_schemas +) +from detection_rules.misc import load_current_package_version +from detection_rules.packaging import current_stack_version +from detection_rules.rule import QueryValidator +from detection_rules.rule_loader import RuleCollection +from detection_rules.schemas import get_stack_schemas +from detection_rules.utils import get_path, load_rule_contents + +from .base import BaseRuleTest +PACKAGE_STACK_VERSION = Version.parse(current_stack_version(), optional_minor_and_patch=True) + + +class TestEndpointQuery(BaseRuleTest): + """Test endpoint-specific rules.""" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.3.0"), + "Test only applicable to 8.3+ stacks since query updates are min_stacked at 8.3.0") + def test_os_and_platform_in_query(self): + """Test that all endpoint rules have an os defined and linux includes platform.""" + for rule in self.production_rules: + if not rule.contents.data.get('language') in ('eql', 'kuery'): + continue + if rule.path.parent.name not in ('windows', 'macos', 'linux'): + # skip cross-platform for now + continue + + ast = rule.contents.data.ast + fields = [str(f) for f in ast if isinstance(f, (kql.ast.Field, eql.ast.Field))] + + err_msg = f'{self.rule_str(rule)} missing required field for endpoint rule' + if 'host.os.type' not in fields: + # Exception for Forwarded Events which contain Windows-only fields. + if rule.path.parent.name == 'windows' and not any(field.startswith('winlog.') for field in fields): + self.assertIn('host.os.type', fields, err_msg) + + # going to bypass this for now + # if rule.path.parent.name == 'linux': + # err_msg = f'{self.rule_str(rule)} missing required field for linux endpoint rule' + # self.assertIn('host.os.platform', fields, err_msg) + + +class TestNewTerms(BaseRuleTest): + """Test new term rules.""" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_history_window_start(self): + """Test new terms history window start field.""" + + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + + # validate history window start field exists and is correct + assert rule.contents.data.new_terms.history_window_start, \ + "new terms field found with no history_window_start field defined" + assert rule.contents.data.new_terms.history_window_start[0].field == "history_window_start", \ + f"{rule.contents.data.new_terms.history_window_start} should be 'history_window_start'" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_field_exists(self): + # validate new terms and history window start fields are correct + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + assert rule.contents.data.new_terms.field == "new_terms_fields", \ + f"{rule.contents.data.new_terms.field} should be 'new_terms_fields' for new_terms rule type" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_fields(self): + """Test new terms fields are schema validated.""" + # ecs validation + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + meta = rule.contents.metadata + feature_min_stack = Version.parse('8.4.0') + current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) + min_stack_version = Version.parse(meta.get("min_stack_version")) if \ + meta.get("min_stack_version") else None + min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ + current_package_version else min_stack_version + + assert min_stack_version >= feature_min_stack, \ + f"New Terms rule types only compatible with {feature_min_stack}+" + ecs_version = get_stack_schemas()[str(min_stack_version)]['ecs'] + beats_version = get_stack_schemas()[str(min_stack_version)]['beats'] + + # checks if new terms field(s) are in ecs, beats non-ecs or integration schemas + queryvalidator = QueryValidator(rule.contents.data.query) + _, _, schema = queryvalidator.get_beats_schema([], beats_version, ecs_version) + integration_manifests = load_integrations_manifests() + integration_schemas = load_integrations_schemas() + integration_tags = meta.get("integration") + if integration_tags: + for tag in integration_tags: + latest_tag_compat_ver, _ = find_latest_compatible_version( + package=tag, + integration="", + rule_stack_version=min_stack_version, + packages_manifest=integration_manifests) + if latest_tag_compat_ver: + integration_schema = integration_schemas[tag][latest_tag_compat_ver] + for policy_template in integration_schema.keys(): + schema.update(**integration_schemas[tag][latest_tag_compat_ver][policy_template]) + for new_terms_field in rule.contents.data.new_terms.value: + assert new_terms_field in schema.keys(), \ + f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.4.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_max_limit(self): + """Test new terms max limit.""" + # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862 + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + meta = rule.contents.metadata + feature_min_stack = Version.parse('8.4.0') + feature_min_stack_extended_fields = Version.parse('8.6.0') + current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True) + min_stack_version = Version.parse(meta.get("min_stack_version")) if \ + meta.get("min_stack_version") else None + min_stack_version = current_package_version if min_stack_version is None or min_stack_version < \ + current_package_version else min_stack_version + if feature_min_stack <= min_stack_version < feature_min_stack_extended_fields: + assert len(rule.contents.data.new_terms.value) == 1, \ + f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}" + + @unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"), + "Test only applicable to 8.4+ stacks for new terms feature.") + def test_new_terms_fields_unique(self): + """Test new terms fields are unique.""" + # validate fields are unique + for rule in self.production_rules: + if rule.contents.data.type == "new_terms": + assert len(set(rule.contents.data.new_terms.value)) == len(rule.contents.data.new_terms.value), \ + f"new terms fields values are not unique - {rule.contents.data.new_terms.value}" + + +class TestESQLRules(BaseRuleTest): + """Test ESQL Rules.""" + + def run_esql_test(self, esql_query, expectation, message): + """Test that the query validation is working correctly.""" + rc = RuleCollection() + file_path = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml")) + original_production_rule = load_rule_contents(file_path) + + # Test that a ValidationError is raised if the query doesn't match the schema + production_rule = deepcopy(original_production_rule)[0] + production_rule["rule"]["query"] = esql_query + + expectation.match_expr = message + with expectation: + rc.load_dict(production_rule) + + def test_esql_queries(self): + """Test ESQL queries.""" + # test_cases = [ + # # invalid queries + # ('from .ds-logs-endpoint.events.process-default-* | wheres process.name like "Microsoft*"', + # pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), + # ('from .ds-logs-endpoint.events.process-default-* | where process.names like "Microsoft*"', + # pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"), + # + # # valid queries + # ('from .ds-logs-endpoint.events.process-default-* | where process.name like "Microsoft*"', + # does_not_raise(), None), + # ] + # for esql_query, expectation, message in test_cases: + # self.run_esql_test(esql_query, expectation, message) From 25f122ca53f4400dcd99aefad9dfa39cc8fd89cb Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Thu, 30 Nov 2023 09:15:18 -0700 Subject: [PATCH 10/14] resolve conflicts from upstream --- tests/test_all_rules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 5b3b6a0ef2a..7efc08e7b6c 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -28,7 +28,7 @@ from detection_rules.rule import (QueryRuleData, QueryValidator, TOMLRuleContents) from detection_rules.rule_loader import FILE_PATTERN -from detection_rules.rule_validators import EQLValidator, ESQLValidator, KQLValidator +from detection_rules.rule_validators import EQLValidator, KQLValidator from detection_rules.schemas import definitions, get_stack_schemas from detection_rules.utils import INTEGRATION_RULE_DIR, PatchedTemplate, get_path, load_etc_dump from detection_rules.version_lock import default_version_lock From 63401ea60f5a1d98dc7b8f733e938a45ff182fb3 Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Thu, 30 Nov 2023 10:41:59 -0700 Subject: [PATCH 11/14] set clients in RemoteConnector from auth methods --- detection_rules/remote_validation.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py index 1e5979cc042..1a695691455 100644 --- a/detection_rules/remote_validation.py +++ b/detection_rules/remote_validation.py @@ -45,28 +45,27 @@ def __init__(self, parse_config: bool = False): except HTTPError: self.kibana_client = None - @staticmethod - def auth_es(*, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, + def auth_es(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, elasticsearch_url: Optional[str] = None, es_user: Optional[str] = None, es_password: Optional[str] = None, timeout: Optional[int] = None) -> Elasticsearch: """Return an authenticated Elasticsearch client.""" - client = get_elasticsearch_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, - elasticsearch_url=elasticsearch_url, es_user=es_user, es_password=es_password, - timeout=timeout) - return client + self.es_client = get_elasticsearch_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, + elasticsearch_url=elasticsearch_url, es_user=es_user, + es_password=es_password, timeout=timeout) + return self.es_client - @staticmethod - def auth_kibana(*, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, + def auth_kibana(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, kibana_url: Optional[str] = None, kibana_user: Optional[str] = None, kibana_password: Optional[str] = None, space: Optional[str] = None, kibana_cookie: Optional[str] = None, provider_type: Optional[str] = None, provider_name: Optional[str] = None) -> Kibana: """Return an authenticated Kibana client.""" - client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, kibana_url=kibana_url, - kibana_user=kibana_user, kibana_password=kibana_password, space=space, - kibana_cookie=kibana_cookie, provider_type=provider_type, - provider_name=provider_name) - return client + self.kibana_client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, + kibana_url=kibana_url, kibana_user=kibana_user, + kibana_password=kibana_password, space=space, + kibana_cookie=kibana_cookie, provider_type=provider_type, + provider_name=provider_name) + return self.kibana_client class RemoteValidator(RemoteConnector): From a43d6bfefefa951630dfa56294b70c1de9f31834 Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Wed, 6 Dec 2023 23:40:40 -0700 Subject: [PATCH 12/14] thread remote rules; add engine test --- detection_rules/misc.py | 8 +- detection_rules/remote_validation.py | 126 ++++++++++++++++++++------- detection_rules/rule.py | 8 +- tests/test_rules_remote.py | 4 + 4 files changed, 107 insertions(+), 39 deletions(-) diff --git a/detection_rules/misc.py b/detection_rules/misc.py index e940f920316..82d8e4893af 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -11,7 +11,7 @@ from pathlib import Path from functools import wraps -from typing import NoReturn +from typing import NoReturn, Optional import click import requests @@ -270,12 +270,16 @@ def load_current_package_version() -> str: return load_etc_dump('packages.yml')['package']['name'] +def get_default_config() -> Optional[Path]: + return next(Path(get_path()).glob('.detection-rules-cfg.*'), None) + + @cached def parse_config(): """Parse a default config file.""" import eql - config_file = next(Path(get_path()).glob('.detection-rules-cfg.*'), None) + config_file = get_default_config() config = {} if config_file and config_file.exists(): diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py index 1a695691455..207405f14c4 100644 --- a/detection_rules/remote_validation.py +++ b/detection_rules/remote_validation.py @@ -3,9 +3,11 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. +from dataclasses import dataclass +from datetime import datetime from functools import cached_property from multiprocessing.pool import ThreadPool -from typing import List, Optional +from typing import Dict, List, Optional import elasticsearch from elasticsearch import Elasticsearch @@ -15,16 +17,26 @@ from kibana import Kibana from .misc import ClientError, getdefault, get_elasticsearch_client, get_kibana_client -from .rule import ( - AnyRuleData, ESQLRuleData, QueryRuleData, ThresholdQueryRuleData, ThreatMatchRuleData, MachineLearningRuleData, - EQLRuleData, NewTermsRuleData, TOMLRule -) +from .rule import TOMLRule, TOMLRuleContents +from .schemas import definitions + + +@dataclass +class RemoteValidationResult: + """Dataclass for remote validation results.""" + rule_id: definitions.UUIDString + rule_name: str + contents: dict + query_results: Optional[dict] + engine_results: Optional[dict] class RemoteConnector: """Base client class for remote validation and testing.""" - def __init__(self, parse_config: bool = False): + MAX_RETRIES = 5 + + def __init__(self, parse_config: bool = False, **kwargs): es_args = ['cloud_id', 'ignore_ssl_errors', 'elasticsearch_url', 'es_user', 'es_password', 'timeout'] kibana_args = [ 'cloud_id', 'ignore_ssl_errors', 'kibana_url', 'kibana_user', 'kibana_password', 'space', 'kibana_cookie', @@ -36,35 +48,39 @@ def __init__(self, parse_config: bool = False): kibana_kwargs = {arg: getdefault(arg)() for arg in kibana_args} try: - self.es_client = get_elasticsearch_client(**es_kwargs) + if 'max_retries' not in es_kwargs: + es_kwargs['max_retries'] = self.MAX_RETRIES + self.es_client = get_elasticsearch_client(**es_kwargs, **kwargs) except ClientError: self.es_client = None try: - self.kibana_client = get_kibana_client(**kibana_kwargs) + self.kibana_client = get_kibana_client(**kibana_kwargs, **kwargs) except HTTPError: self.kibana_client = None def auth_es(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, elasticsearch_url: Optional[str] = None, es_user: Optional[str] = None, - es_password: Optional[str] = None, timeout: Optional[int] = None) -> Elasticsearch: + es_password: Optional[str] = None, timeout: Optional[int] = None, **kwargs) -> Elasticsearch: """Return an authenticated Elasticsearch client.""" + if 'max_retries' not in kwargs: + kwargs['max_retries'] = self.MAX_RETRIES self.es_client = get_elasticsearch_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, elasticsearch_url=elasticsearch_url, es_user=es_user, - es_password=es_password, timeout=timeout) + es_password=es_password, timeout=timeout, **kwargs) return self.es_client def auth_kibana(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, kibana_url: Optional[str] = None, kibana_user: Optional[str] = None, kibana_password: Optional[str] = None, space: Optional[str] = None, kibana_cookie: Optional[str] = None, provider_type: Optional[str] = None, - provider_name: Optional[str] = None) -> Kibana: + provider_name: Optional[str] = None, **kwargs) -> Kibana: """Return an authenticated Kibana client.""" self.kibana_client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, kibana_url=kibana_url, kibana_user=kibana_user, kibana_password=kibana_password, space=space, kibana_cookie=kibana_cookie, provider_type=provider_type, - provider_name=provider_name) + provider_name=provider_name, **kwargs) return self.kibana_client @@ -86,54 +102,98 @@ def get_validate_method(self, name: str) -> callable: assert name in self.get_validate_methods, f'validate method {name} not found' return getattr(self, name) - def validate_rule(self, data: AnyRuleData): + @staticmethod + def prep_for_preview(contents: TOMLRuleContents) -> dict: + """Prepare rule for preview.""" + end_time = datetime.utcnow().isoformat() + dumped = contents.to_api_format().copy() + dumped.update(timeframeEnd=end_time, invocationCount=1) + return dumped + + def engine_preview(self, contents: TOMLRuleContents) -> dict: + """Get results from detection engine preview API.""" + dumped = self.prep_for_preview(contents) + return self.kibana_client.post('/api/detection_engine/rules/preview', json=dumped) + + def validate_rule(self, contents: TOMLRuleContents) -> RemoteValidationResult: """Validate a single rule query.""" - method = self.get_validate_method(f'validate_{data.type}') - return method(data) + method = self.get_validate_method(f'validate_{contents.data.type}') + query_results = method(contents) + engine_results = self.engine_preview(contents) + return RemoteValidationResult(contents.data.rule_id, contents.data.name, contents.to_api_format(), + query_results, engine_results) - def validate_rules(self, rules: List[TOMLRule], threads: int = 50): + def validate_rules(self, rules: List[TOMLRule], threads: int = 5) -> Dict[str, RemoteValidationResult]: """Validate a collection of rules via threads.""" responses = {} - def request(d: AnyRuleData): + def request(c: TOMLRuleContents): try: - responses[d.rule_id] = self.validate_rule(d) + responses[c.data.rule_id] = self.validate_rule(c) except ValidationError as e: - responses[d.rule_id] = e.messages + responses[c.data.rule_id] = e.messages pool = ThreadPool(processes=threads) - pool.map(request, [r.contents.data for r in rules]) + pool.map(request, [r.contents for r in rules]) pool.close() pool.join() return responses - def validate_esql(self, data: ESQLRuleData): + def validate_esql(self, contents: TOMLRuleContents) -> dict: + query = contents.data.query + rule_id = contents.data.rule_id headers = {"accept": "application/json", "content-type": "application/json"} - body = {'query': f'{data.query} | LIMIT 0'} + body = {'query': f'{query} | LIMIT 0'} try: response = self.es_client.perform_request('POST', '/_query', headers=headers, params={'pretty': True}, body=body) - except elasticsearch.BadRequestError as exc: - raise ValidationError(f'ES|QL query failed: {exc}') + except Exception as exc: + if isinstance(exc, elasticsearch.BadRequestError): + raise ValidationError(f'ES|QL query failed: {exc} for rule: {rule_id}, query: \n{query}') + else: + raise Exception(f'ES|QL query failed for rule: {rule_id}, query: \n{query}') from exc return response.body - def validate_query(self, data: QueryRuleData): + def validate_eql(self, contents: TOMLRuleContents) -> dict: + """Validate query for "eql" rule types.""" + query = contents.data.query + rule_id = contents.data.rule_id + index = contents.data.index + time_range = {"range": {"@timestamp": {"gt": 'now-1h/h', "lte": 'now', "format": "strict_date_optional_time"}}} + body = {'query': query} + try: + response = self.es_client.eql.search(index=index, body=body, ignore_unavailable=True, filter=time_range) + except Exception as exc: + if isinstance(exc, elasticsearch.BadRequestError): + raise ValidationError(f'EQL query failed: {exc} for rule: {rule_id}, query: \n{query}') + else: + raise Exception(f'EQL query failed for rule: {rule_id}, query: \n{query}') from exc + + return response.body + + @staticmethod + def validate_query(self, contents: TOMLRuleContents) -> dict: """Validate query for "query" rule types.""" + return {'results': 'Unable to remote validate query rules'} - def validate_threshold(self, data: ThresholdQueryRuleData): + @staticmethod + def validate_threshold(self, contents: TOMLRuleContents) -> dict: """Validate query for "threshold" rule types.""" + return {'results': 'Unable to remote validate threshold rules'} - def validate_eql(self, data: EQLRuleData): - """Validate query for "eql" rule types.""" - - def validate_new_terms(self, data: NewTermsRuleData): + @staticmethod + def validate_new_terms(self, contents: TOMLRuleContents) -> dict: """Validate query for "new_terms" rule types.""" + return {'results': 'Unable to remote validate new_terms rules'} - def validate_threat_match(self, data: ThreatMatchRuleData): + @staticmethod + def validate_threat_match(self, contents: TOMLRuleContents) -> dict: """Validate query for "threat_match" rule types.""" + return {'results': 'Unable to remote validate threat_match rules'} - def validate_machine_learning(self, data: MachineLearningRuleData): + @staticmethod + def validate_machine_learning(self, contents: TOMLRuleContents) -> dict: """Validate query for "machine_learning" rule types.""" - # TODO ??? + return {'results': 'Unable to remote validate machine_learning rules'} diff --git a/detection_rules/rule.py b/detection_rules/rule.py index e43dd36af01..a4c2b464a7c 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -505,10 +505,6 @@ def unique_fields(self) -> Any: def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None: raise NotImplementedError() - @staticmethod - def validate_remote(remote_validator: 'RemoteValidator', data: 'QueryRuleData'): - remote_validator.validate_rule(data) - @cached def get_required_fields(self, index: str) -> List[dict]: """Retrieves fields needed for the query along with type information from the schema.""" @@ -1145,6 +1141,10 @@ def post_conversion_validation(self, value: dict, **kwargs): data.data_validator.validate_bbr(metadata.get('bypass_bbr_timing')) data.validate(metadata) if hasattr(data, 'validate') else False + @staticmethod + def validate_remote(remote_validator: 'RemoteValidator', contents: 'TOMLRuleContents'): + remote_validator.validate_rule(contents) + def to_dict(self, strip_none_values=True) -> dict: # Load schemas directly from the data and metadata classes to avoid schema ambiguity which can # result from union fields which contain classes and related subclasses (AnyRuleData). See issue #1141 diff --git a/tests/test_rules_remote.py b/tests/test_rules_remote.py index 9b9b6841d89..d6dbba7eb3a 100644 --- a/tests/test_rules_remote.py +++ b/tests/test_rules_remote.py @@ -3,10 +3,14 @@ # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. +import unittest + from .base import BaseRuleTest +from detection_rules.misc import get_default_config from detection_rules.remote_validation import RemoteValidator +@unittest.skipIf(get_default_config() is None, 'Skipping remote validation due to missing config') class TestRemoteRules(BaseRuleTest): """Test rules against a remote Elastic stack instance.""" From 307c259176f44dc487e24399f71cb3dcf0b53928 Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Thu, 7 Dec 2023 20:13:02 -0700 Subject: [PATCH 13/14] Add versions to remote validation results --- detection_rules/remote_validation.py | 8 ++++++-- detection_rules/rule_validators.py | 2 +- tests/test_rules_remote.py | 13 ++++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py index 207405f14c4..bab2646041b 100644 --- a/detection_rules/remote_validation.py +++ b/detection_rules/remote_validation.py @@ -16,7 +16,7 @@ from kibana import Kibana -from .misc import ClientError, getdefault, get_elasticsearch_client, get_kibana_client +from .misc import ClientError, getdefault, get_elasticsearch_client, get_kibana_client, load_current_package_version from .rule import TOMLRule, TOMLRuleContents from .schemas import definitions @@ -27,6 +27,8 @@ class RemoteValidationResult: rule_id: definitions.UUIDString rule_name: str contents: dict + rule_version: int + stack_version: str query_results: Optional[dict] engine_results: Optional[dict] @@ -120,8 +122,10 @@ def validate_rule(self, contents: TOMLRuleContents) -> RemoteValidationResult: method = self.get_validate_method(f'validate_{contents.data.type}') query_results = method(contents) engine_results = self.engine_preview(contents) + rule_version = contents.autobumped_version + stack_version = load_current_package_version() return RemoteValidationResult(contents.data.rule_id, contents.data.name, contents.to_api_format(), - query_results, engine_results) + rule_version, stack_version, query_results, engine_results) def validate_rules(self, rules: List[TOMLRule], threads: int = 5) -> Dict[str, RemoteValidationResult]: """Validate a collection of rules via threads.""" diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index bf3a591a6bd..9dcfdb468f2 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -350,7 +350,7 @@ def validate_rule_type_configurations(self, data: EQLRuleData, meta: RuleMeta) - class ESQLValidator(QueryValidator): - """Specific fields for ESQL query event types.""" + """Validate specific fields for ESQL query event types.""" @cached_property def ast(self): diff --git a/tests/test_rules_remote.py b/tests/test_rules_remote.py index d6dbba7eb3a..e422239ce62 100644 --- a/tests/test_rules_remote.py +++ b/tests/test_rules_remote.py @@ -7,16 +7,15 @@ from .base import BaseRuleTest from detection_rules.misc import get_default_config -from detection_rules.remote_validation import RemoteValidator +# from detection_rules.remote_validation import RemoteValidator @unittest.skipIf(get_default_config() is None, 'Skipping remote validation due to missing config') class TestRemoteRules(BaseRuleTest): """Test rules against a remote Elastic stack instance.""" - def test_esql_rules(self): - """Temporarily explicitly test all ES|QL rules remotely pending parsing lib.""" - esql_rules = [r for r in self.all_rules if r.contents.data.type == 'esql'] - # TODO: assert config is present - rv = RemoteValidator(parse_config=True) - rv.validate_rules(esql_rules) + # def test_esql_rules(self): + # """Temporarily explicitly test all ES|QL rules remotely pending parsing lib.""" + # esql_rules = [r for r in self.all_rules if r.contents.data.type == 'esql'] + # rv = RemoteValidator(parse_config=True) + # rv.validate_rules(esql_rules) From 44209c77528061082d18118d50a6462de2311ba4 Mon Sep 17 00:00:00 2001 From: brokensound77 Date: Fri, 8 Dec 2023 11:56:34 -0700 Subject: [PATCH 14/14] remove test rule file --- ...rare_microsoft_office_child_processes.toml | 37 ------------------- 1 file changed, 37 deletions(-) delete mode 100644 rules/windows/command_and_control_rare_microsoft_office_child_processes.toml diff --git a/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml b/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml deleted file mode 100644 index e3a08913284..00000000000 --- a/rules/windows/command_and_control_rare_microsoft_office_child_processes.toml +++ /dev/null @@ -1,37 +0,0 @@ -[metadata] -creation_date = "2023/02/27" -integration = ["endpoint"] -maturity = "production" -min_stack_comments = "ES|QL Rule" -min_stack_version = "8.11.0" -updated_date = "2023/06/22" - -[rule] -author = ["Elastic"] -description = """ -Detects rare child processes of Microsoft Office applications, which may indicate an attempt to execute malicious payloads or scripts. This detection focuses on instances where Microsoft Office processes spawn child processes that occur infrequently, potentially as part of a post-exploitation stage of an attack. -""" -from = "now-9m" -language = "esql" -license = "Elastic License v2" -name = "Rare Microsoft Office Child Processes" -risk_score = 47 -rule_id = "24220495-cffe-45a1-996c-37b599ba0e43" -severity = "medium" -tags = ["Data Source: Elastic Endpoint", "Domain: Endpoint", "OS: Windows", "Use Case: Threat Detection", "Tactic: Command and Control", "Data Source: Elastic Defend"] -timestamp_override = "event.ingested" -type = "esql" -query = ''' -from .ds-logs-endpoint.events.process-default-* - | where event.action == "start" and process.code_signature.subject_name like "Microsoft*" and process.parent.name in ("winword.exe", "WINWORD.EXE", "EXCEL.EXE", "excel.exe") - | eval process_path = replace(process.executable, """[cC]:\\[uU][sS][eE][rR][sS]\\[a-zA-Z0-9\.\-\_\$]+\\""", "C:\\\\users\\\\user\\\\") - | stats cc = count(*) by process_path, process.parent.name | where cc <= 5 -''' - -[[rule.threat]] -framework = "MITRE ATT&CK" - -[rule.threat.tactic] -id = "TA0011" -name = "Command and Control" -reference = "https://attack.mitre.org/tactics/TA0011/"