diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index 4b87e2c79..0b56dafd1 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -228,7 +228,7 @@ def find_process_capabilities( def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress: bool = False ) -> Capabilities: all_process_matches: MatchResults = collections.defaultdict(list) all_thread_matches: MatchResults = collections.defaultdict(list) diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 5704bc97d..b644d4e1a 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -481,11 +481,11 @@ def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> raise NotImplementedError() -def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> DynamicFeatureExtractor: +def ProcessFilter(extractor: DynamicFeatureExtractor, pids: set[int]) -> DynamicFeatureExtractor: original_get_processes = extractor.get_processes def filtered_get_processes(self): - yield from (f for f in original_get_processes() if f.address.pid in processes) + yield from (f for f in original_get_processes() if f.address.pid in pids) # we make a copy of the original extractor object and then update its get_processes() method with the decorated filter one. # this is in order to preserve the original extractor object's get_processes() method, in case it is used elsewhere in the code. @@ -497,7 +497,7 @@ def filtered_get_processes(self): return new_extractor -def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set) -> DynamicFeatureExtractor: +def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set[Address]) -> DynamicFeatureExtractor: original_get_threads = extractor.get_threads def filtered_get_threads(self, ph: ProcessHandle): diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index f5113272d..172d1afcb 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -636,7 +636,7 @@ def build_statements(d, scopes: Scopes): ) elif key == "sequence": - if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD)): + if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE)): raise InvalidRule("sequence subscope supported only for the process and thread scopes") if len(d[key]) != 1: diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 5e43227bc..fb4cd7e25 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -27,11 +27,14 @@ # ... import textwrap +from typing import Iterator from functools import lru_cache +import pytest import fixtures import capa.main +import capa.rules import capa.capabilities.dynamic from capa.features.extractors.base_extractor import ThreadFilter, DynamicFeatureExtractor @@ -62,7 +65,7 @@ def get_0000a657_thread3064(): return extractor -def get_call_ids(matches): +def get_call_ids(matches) -> Iterator[int]: for address, _ in matches: yield address.id @@ -96,7 +99,7 @@ def test_dynamic_call_scope(): assert 8 in get_call_ids(capabilities.matches[r.name]) -# match the first 5-tuple sequence. +# match the first sequence. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -147,7 +150,7 @@ def test_dynamic_sequence_scope(): # call 14: RtlAddVectoredExceptionHandler(1921490089, 0) # call 15: GetSystemTime() # call 16: NtAllocateVirtualMemory(no, 4, 786432, 4784128, 4294967295) -def test_dynamic_sequence_scope2(): +def test_dynamic_sequence_scope_length(): extractor = get_0000a657_thread3064() rule = textwrap.dedent( @@ -178,6 +181,108 @@ def test_dynamic_sequence_scope2(): assert r.name not in capabilities.matches +# show that you can use a call subscope in sequence rules. +# +# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) +# thread: 3064 +# ... +# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll) +# ... +def test_dynamic_sequence_call_subscope(): + extractor = get_0000a657_thread3064() + + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - call: + - and: + - api: LdrGetProcedureAddress + - string: AddVectoredExceptionHandler + """ + ) + + r = capa.rules.Rule.from_yaml(rule) + ruleset = capa.rules.RuleSet([r]) + + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 11 in get_call_ids(capabilities.matches[r.name]) + + +# show that you can use a sequence subscope in sequence rules. +# +# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) +# thread: 3064 +# ... +# call 10: LdrGetDllHandle(1974337536, kernel32.dll) +# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll) +# call 12: LdrGetDllHandle(1974337536, kernel32.dll) +# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll) +# ... +def test_dynamic_sequence_scope_sequence_subscope(): + extractor = get_0000a657_thread3064() + + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - sequence: + - description: resolve add VEH # should match at 11 + - and: + - api: LdrGetDllHandle + - api: LdrGetProcedureAddress + - string: AddVectoredExceptionHandler + - sequence: + - description: resolve remove VEH # should match at 13 + - and: + - api: LdrGetDllHandle + - api: LdrGetProcedureAddress + - string: RemoveVectoredExceptionHandler + """ + ) + + r = capa.rules.Rule.from_yaml(rule) + ruleset = capa.rules.RuleSet([r]) + + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 13 in get_call_ids(capabilities.matches[r.name]) + + +# show that you can't use thread subscope in sequence rules. +def test_dynamic_sequence_scope_thread_subscope(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - thread: + - string: "foo" + """ + ) + + with pytest.raises(capa.rules.InvalidRule): + capa.rules.Rule.from_yaml(rule) + + # show how you might use a sequence rule: to match a small window for a collection of features. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)