Skip to content

Commit

Permalink
Merge pull request #33 from CybercentreCanada/multidecoder
Browse files Browse the repository at this point in the history
Simplify filtering iocs
  • Loading branch information
cccs-jh authored Jun 30, 2022
2 parents e14550b + 1db1639 commit e6059e0
Showing 1 changed file with 22 additions and 27 deletions.
49 changes: 22 additions & 27 deletions deobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import binascii
import os

from collections import Counter
from collections import Counter, defaultdict
from typing import Callable, Dict, List, Optional, Tuple

import regex
Expand All @@ -24,6 +24,19 @@
TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]]


def filter_iocs(iocs, original: bytes, seen: set, reversed=False):
new_iocs = defaultdict(set)
for ioc_type in iocs:
for ioc in iocs[ioc_type]:
prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc
if reversed:
prefix = prefix[::-1]
if prefix not in seen and prefix not in original:
seen.add(prefix)
new_iocs[ioc_type].add(ioc)
return new_iocs


class DeobfuScripter(ServiceBase):
""" Service for deobfuscating scripts """
VALIDCHARS = b' 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
Expand Down Expand Up @@ -423,13 +436,15 @@ def execute(self, request: ServiceRequest) -> None:
before_deobfuscation = layer

# --- Stage 2: Deobsfucation ------------------------------------------------------------------------------
seen_iocs = set()
passes: dict[int, tuple[list[str], dict[str, set[bytes]]]] = {}
techniques = first_pass
n_pass = 0 # Ensure n_pass is bound outside of the loop
for n_pass in range(max_attempts):
layer, techiques_used, iocs = self._deobfuscripter_pass(layer, techniques, md)
if techiques_used:
passes[n_pass] = techiques_used, iocs # Store the techniques used and iocs found for each pass
# Store the techniques used and new iocs found for each pass
passes[n_pass] = techiques_used, filter_iocs(iocs, before_deobfuscation, seen_iocs)
else:
# If there are no new layers in a pass, start second pass or break
if len(techniques) != len(first_pass):
Expand All @@ -440,7 +455,10 @@ def execute(self, request: ServiceRequest) -> None:
# --- Final Layer -----------------------------------------------------------------------------------------
layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md, final=True)
if final_techniques:
passes[n_pass+1] = final_techniques, final_iocs
passes[n_pass+1] = final_techniques, filter_iocs(final_iocs, before_deobfuscation, seen_iocs)

# Get new reversed iocs
rev_iocs = filter_iocs(md.ioc_tags(layer[::-1]), before_deobfuscation, seen_iocs)

# --- Compiling results -----------------------------------------------------------------------------------
if request.get_param('extract_original_iocs'):
Expand Down Expand Up @@ -473,29 +491,6 @@ def execute(self, request: ServiceRequest) -> None:
heuristic.add_signature_id(tech, frequency=count)
mres.add_line(f"{tech}, {count} time(s).")

# Filter for new IOCs
seen_iocs = set()
for n_pass, (_, iocs) in passes.items():
for ioc_type in iocs:
new_iocs = set()
for ioc in iocs[ioc_type]:
prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc
if prefix not in seen_iocs and prefix not in before_deobfuscation:
new_iocs.add(ioc)
seen_iocs.add(ioc)
iocs[ioc_type] = new_iocs
# And for new reversed IOCs
rev_iocs = md.ioc_tags(clean[::-1])
reversed_file = before_deobfuscation[::-1]
for ioc_type in rev_iocs:
for ioc in rev_iocs[ioc_type]:
new_iocs = set()
prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc
if prefix not in seen_iocs and prefix not in reversed_file:
new_iocs.add(ioc)
seen_iocs.add(ioc)
rev_iocs[ioc_type] = new_iocs

# Display final layer
byte_count = 5000
if request.deep_scan or (len(clean) > 1000 and heuristic.score >= 500) or seen_iocs:
Expand All @@ -520,7 +515,7 @@ def execute(self, request: ServiceRequest) -> None:
for n_pass, (_, iocs) in passes.items():
if not iocs:
continue
new_ioc_res.add_line("New IOCs found in pass {n_pass}:")
new_ioc_res.add_line(f"New IOCs found in pass {n_pass}:")
for ioc_type in iocs:
for ioc in iocs[ioc_type]:
if n_pass == 0: # iocs in the first pass can be found by other services
Expand Down

0 comments on commit e6059e0

Please sign in to comment.