From 7150cd7cc41c1e7e36a694dbadb41dcaf9ad70e5 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Thu, 2 Jun 2022 19:00:00 -0400 Subject: [PATCH 01/20] Integrate Multidecoder Use multidecoder for aditional techniques and ioc finding - refactor deobfuscation passes into a method - use multidecoder with depth 1 on each pass, giving a new layer and per pass iocs - on the final layer use multidecoder at full depth - report script extraction seperatly from deobfuscations - replace patternmatch iocs with iocs from multidecoder - report iocs by pass to show which are the most deobfuscated - score iocs found after the first pass higher --- deobs.py | 173 +++++++++++++++++++++++++------------------ service_manifest.yml | 18 +++-- 2 files changed, 113 insertions(+), 78 deletions(-) diff --git a/deobs.py b/deobs.py index cedf1c1..d4a970d 100644 --- a/deobs.py +++ b/deobs.py @@ -7,21 +7,25 @@ import os from collections import Counter -from itertools import chain from typing import Callable, Dict, List, Optional, Set, Tuple import magic import regex from bs4 import BeautifulSoup +from multidecoder.query import squash_replace, obfuscation_counts from assemblyline.common.str_utils import safe_str -from assemblyline_v4_service.common.balbuzard.patterns import PatternMatch +from assemblyline_v4_service.common.extractor.decode_wrapper import DecoderWrapper, get_tree_tags from assemblyline_v4_service.common.base import ServiceBase from assemblyline_v4_service.common.request import ServiceRequest, MaxExtractedExceeded from assemblyline_v4_service.common.result import Result, ResultSection, BODY_FORMAT, Heuristic +# Type declarations +TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]] + + class DeobfuScripter(ServiceBase): """ Service for deobfuscating scripts """ FILETYPES = ['application', 'document', 'exec', 'image', 'Microsoft', 'text'] @@ -471,7 +475,7 @@ def extract_htmlscript(self, text: bytes) -> List[bytes]: def execute(self, request: ServiceRequest) -> None: # --- Setup ---------------------------------------------------------------------------------------------- request.result = Result() - patterns = PatternMatch() + md = DecoderWrapper() max_attempts = 100 if request.deep_scan else 10 @@ -479,7 +483,6 @@ def execute(self, request: ServiceRequest) -> None: self.hashes = set() # --- Prepare Techniques ---------------------------------------------------------------------------------- - TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]] first_pass: TechniqueList = [ ('MSOffice Embedded script', self.msoffice_embedded_script_string), ('CHR and CHRB decode', self.chr_decode), @@ -502,15 +505,22 @@ def execute(self, request: ServiceRequest) -> None: final_pass: TechniqueList = [ ('Charcode', self.charcode), ] + final_pass.extend(second_pass) code_extracts = [ ('.*html.*', "HTML scripts extraction", self.extract_htmlscript) ] - layers_list: list[str] = [] layer = request.file_contents # --- Stage 1: Script Extraction -------------------------------------------------------------------------- + extract_res = ResultSection("Extraction") + for pattern, name, func in code_extracts: + if regex.match(regex.compile(pattern), request.task.file_type): + extracted_parts = func(request.file_contents) + layer = b"\n".join(extracted_parts).strip() + extract_res.add_line(name) + break if request.file_type == 'code/ps1': sig = regex.search( rb'# SIG # Begin signature block\r\n(?:# [A-Za-z0-9+/=]+\r\n)+# SIG # End signature block', @@ -527,46 +537,38 @@ def execute(self, request: ServiceRequest) -> None: with open(sig_path, 'wb+') as f: f.write(signature) request.add_extracted(sig_path, sig_filename, "Powershell Signature") + extract_res.add_line(f"Powershell Signature Comment, see {sig_filename}") except binascii.Error: pass - for pattern, name, func in code_extracts: - if regex.match(regex.compile(pattern), request.task.file_type): - extracted_parts = func(request.file_contents) - layer = b"\n".join(extracted_parts).strip() - layers_list.append(name) - break + if extract_res.body: + request.result.add_section(extract_res) + # Save extracted scripts before deobfuscation before_deobfuscation = layer # --- Stage 2: Deobsfucation ------------------------------------------------------------------------------ + passes: dict[int, tuple[list[str], dict[str, set[bytes]]]] = {} techniques = first_pass - layers_count = len(layers_list) - for _ in range(max_attempts): - for name, technique in techniques: - result = technique(layer) - if result: - layers_list.append(name) - # Looks like it worked, restart with new layer - layer = result - # If there are no new layers in a pass, start second pass or break - if layers_count == len(layers_list): + n_pass = 0 # Ensure n_pass is bound outside of the loop + for n_pass in range(max_attempts): + layer, techiques_used, iocs = self._deobfuscripter_pass(layer, techniques, md) + if techiques_used: + passes[n_pass] = techiques_used, iocs # Store the techniques used and iocs found for each pass + else: + # If there are no new layers in a pass, start second pass or break if len(techniques) != len(first_pass): # Already on second pass break techniques = second_pass - layers_count = len(layers_list) # --- Final Layer ----------------------------------------------------------------------------------------- - final_pass.extend(techniques) - for name, technique in final_pass: - res = technique(layer) - if res: - layers_list.append(name) - layer = res + layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md) + if final_techniques: + passes[n_pass+1] = final_techniques, final_iocs # --- Compiling results ----------------------------------------------------------------------------------- if request.get_param('extract_original_iocs'): - pat_values = patterns.ioc_match(before_deobfuscation, bogon_ip=True, just_network=False) + pat_values = get_tree_tags(md.multidecoder.scan(before_deobfuscation, 1)) ioc_res = ResultSection("The following IOCs were found in the original file", parent=request.result, body_format=BODY_FORMAT.MEMORY_DUMP) for k, val in pat_values.items(): @@ -575,7 +577,7 @@ def execute(self, request: ServiceRequest) -> None: ioc_res.add_line(f"Found {k.upper().replace('.', ' ')}: {safe_str(v)}") ioc_res.add_tag(k, v) - if not layers_list: + if not passes: return # Cleanup final layer clean = self.clean_up_final_layer(layer) @@ -588,41 +590,39 @@ def execute(self, request: ServiceRequest) -> None: parent=request.result, heuristic=heuristic) - tech_count = Counter(layers_list) + tech_count = Counter() + for p in passes.values(): + tech_count.update(p[0]) for tech, count in tech_count.items(): heuristic.add_signature_id(tech, frequency=count) mres.add_line(f"{tech}, {count} time(s).") - # Check for new IOCs - pat_values = patterns.ioc_match(clean, bogon_ip=True, just_network=False) - diff_tags: Dict[str, List[bytes]] = {} - for ioc_type, iocs in pat_values.items(): - for ioc in iocs: - if ioc_type == 'network.static.uri': - if b'/'.join(ioc.split(b'/', 3)[:3]) not in before_deobfuscation: - diff_tags.setdefault(ioc_type, []) - diff_tags[ioc_type].append(ioc) - elif ioc not in before_deobfuscation: - diff_tags.setdefault(ioc_type, []) - diff_tags[ioc_type].append(ioc) - + # Filter for new IOCs + seen_iocs = set() + for n_pass, (_, iocs) in passes.items(): + for ioc_type in iocs: + new_iocs = set() + for ioc in iocs[ioc_type]: + prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc + if prefix not in seen_iocs and prefix not in before_deobfuscation: + new_iocs.add(ioc) + seen_iocs.add(ioc) + iocs[ioc_type] = new_iocs # And for new reversed IOCs - rev_values = patterns.ioc_match(clean[::-1], bogon_ip=True, just_network=False) - rev_tags: Dict[str, List[bytes]] = {} + rev_iocs = md.ioc_tags(clean[::-1]) reversed_file = before_deobfuscation[::-1] - for ioc_type, iocs in rev_values.items(): - for ioc in iocs: - if ioc_type == 'network.static.uri': - if b'/'.join(ioc.split(b'/', 3)[:3]) not in reversed_file: - rev_tags.setdefault(ioc_type, []) - rev_tags[ioc_type].append(ioc) - elif ioc not in reversed_file and ioc[::-1] not in diff_tags.get(ioc_type, []): - rev_tags.setdefault(ioc_type, []) - rev_tags[ioc_type].append(ioc) + for ioc_type in rev_iocs: + for ioc in rev_iocs[ioc_type]: + new_iocs = set() + prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc + if prefix not in seen_iocs and prefix not in reversed_file: + new_iocs.add(ioc) + seen_iocs.add(ioc) + rev_iocs[ioc_type] = new_iocs # Display final layer byte_count = 5000 - if request.deep_scan or (len(clean) > 1000 and heuristic.score >= 500) or diff_tags or rev_tags: + if request.deep_scan or (len(clean) > 1000 and heuristic.score >= 500) or seen_iocs: # Save extracted file byte_count = 500 file_name = f"{os.path.basename(request.file_name)}_decoded_final" @@ -637,23 +637,33 @@ def execute(self, request: ServiceRequest) -> None: ResultSection(f"First {byte_count} bytes of the final layer:", body=safe_str(clean[:byte_count]), body_format=BODY_FORMAT.MEMORY_DUMP, parent=request.result) - # Display new IOCs from final layer - if diff_tags or rev_tags: - ioc_new = ResultSection("New IOCs found after de-obfustcation", parent=request.result, + # Report new IOCs + new_ioc_res = ResultSection("New IOCs found after de-obfustcation", body_format=BODY_FORMAT.MEMORY_DUMP) - has_network_heur = False - for ty, val in chain(diff_tags.items(), rev_tags.items()): - if "network" in ty and ty != 'network.static.domain': - has_network_heur = True - for v in val: - ioc_new.add_line(f"Found {ty.upper().replace('.', ' ')}: {safe_str(v)}") - ioc_new.add_tag(ty, v) - - if has_network_heur: - ioc_new.set_heuristic(7) - else: - ioc_new.set_heuristic(6) - + heuristic = 5 + for n_pass, (_, iocs) in passes.items(): + if not iocs: + continue + new_ioc_res.add_line("New IOCs found in pass {n_pass}:") + for ioc_type in iocs: + for ioc in iocs[ioc_type]: + if n_pass != 0: # iocs in the first pass can be found by other services + heuristic = max(7 if 'network' in ioc_type and ioc_type != 'network.static.domain' + else 6, heuristic) + new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") + new_ioc_res.add_tag(ioc_type, ioc) + if rev_iocs: + new_ioc_res.add_line("New IOCs found reversed in the final layer:") + for ioc_type in rev_iocs: + for ioc in rev_iocs[ioc_type]: + heuristic = max(7 if 'network' in ioc_type and ioc_type != 'network.static.domain' + else 6, heuristic) + new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") + new_ioc_res.add_tag(ioc_type, ioc) + if new_ioc_res.body: + request.result.add_section(new_ioc_res) + + # Report extracted files if len(self.files_extracted) > 0: ext_file_res = ResultSection("The following files were extracted during the deobfuscation", heuristic=Heuristic(8), parent=request.result) @@ -666,3 +676,22 @@ def execute(self, request: ServiceRequest) -> None: except MaxExtractedExceeded: self.log.warning('Extraction limit exceeded while adding files of interest.') break + + @staticmethod + def _deobfuscripter_pass(layer: bytes, + techniques: TechniqueList, + md: DecoderWrapper, + final=False) -> tuple[bytes, list[str], dict]: + techniques_used = [] + for name, technique in techniques: + result = technique(layer) + if result: + techniques_used.append(name) + # Looks like it worked, continue with the new layer + layer = result + # Use multidecoder techniques and ioc tagging + tree = md.multidecoder.scan(layer, depth=10 if final else 1) + techniques_used.extend(obfuscation_counts(tree).keys()) + iocs = get_tree_tags(tree) # Get IoCs for the pass + layer = squash_replace(layer, tree) + return layer, techniques_used, iocs diff --git a/service_manifest.yml b/service_manifest.yml index 2f54ac1..8d3f0b0 100644 --- a/service_manifest.yml +++ b/service_manifest.yml @@ -23,29 +23,35 @@ submission_params: value: false heuristics: - - description: Obfuscation techniques were found and deobfuscated in the file + - description: Obfuscation techniques were found and de-obfuscated in the file filetype: code/.* heur_id: 1 name: Obfuscation score: 10 max_score: 1000 - - description: IOCs where found only after de-obfuscation + - description: IOCs were found after simple de-obfuscation + filetype: code/.* + heur_id: 5 + name: Lightly De-obfuscated IOCs + score: 50 + + - description: IOCs were found only after layered de-obfuscations filetype: code/.* heur_id: 6 name: De-obfuscated IOCs - score: 50 + score: 100 - - description: Network IOCs where found only after de-obfuscation + - description: Network IOCs were found only after layered de-obfuscations filetype: code/.* heur_id: 7 name: De-obfuscated Network IOCs score: 500 - - description: The service found interesting files during the deobfuscation + - description: The service found interesting files during the de-obfuscation filetype: code/.* heur_id: 8 - name: Deobfuscated file + name: De-obfuscated file score: 10 docker_config: From 6f5335eb61c13619cd48e0e9f868f6d41e9e0d83 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Thu, 2 Jun 2022 19:20:46 -0400 Subject: [PATCH 02/20] minor fixes - use final on final layer - add heuristic for iocs - use actual default depth instead of passing in 10 --- deobs.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/deobs.py b/deobs.py index d4a970d..4f46789 100644 --- a/deobs.py +++ b/deobs.py @@ -562,7 +562,7 @@ def execute(self, request: ServiceRequest) -> None: techniques = second_pass # --- Final Layer ----------------------------------------------------------------------------------------- - layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md) + layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md, final=True) if final_techniques: passes[n_pass+1] = final_techniques, final_iocs @@ -640,16 +640,17 @@ def execute(self, request: ServiceRequest) -> None: # Report new IOCs new_ioc_res = ResultSection("New IOCs found after de-obfustcation", body_format=BODY_FORMAT.MEMORY_DUMP) - heuristic = 5 + heuristic = 0 for n_pass, (_, iocs) in passes.items(): if not iocs: continue new_ioc_res.add_line("New IOCs found in pass {n_pass}:") for ioc_type in iocs: for ioc in iocs[ioc_type]: - if n_pass != 0: # iocs in the first pass can be found by other services - heuristic = max(7 if 'network' in ioc_type and ioc_type != 'network.static.domain' - else 6, heuristic) + if n_pass == 0: # iocs in the first pass can be found by other services + heuristic = 5 + elif heuristic < 7: + heuristic = 7 if 'network' in ioc_type and ioc_type != 'network.static.domain' else 6 new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") new_ioc_res.add_tag(ioc_type, ioc) if rev_iocs: @@ -660,6 +661,8 @@ def execute(self, request: ServiceRequest) -> None: else 6, heuristic) new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") new_ioc_res.add_tag(ioc_type, ioc) + if heuristic > 0: + new_ioc_res.set_heuristic(heuristic) if new_ioc_res.body: request.result.add_section(new_ioc_res) @@ -690,7 +693,10 @@ def _deobfuscripter_pass(layer: bytes, # Looks like it worked, continue with the new layer layer = result # Use multidecoder techniques and ioc tagging - tree = md.multidecoder.scan(layer, depth=10 if final else 1) + if final: + tree = md.multidecoder.scan(layer) + else: + tree = md.multidecoder.scan(layer, depth=1) techniques_used.extend(obfuscation_counts(tree).keys()) iocs = get_tree_tags(tree) # Get IoCs for the pass layer = squash_replace(layer, tree) From 30eeb66e393f8dc53a33cfe23096ab25def5e20e Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Tue, 7 Jun 2022 16:25:03 -0400 Subject: [PATCH 03/20] Extract files from Multidecoder and remove b64decode_str --- deobs.py | 60 +++++--------------------------------------------------- 1 file changed, 5 insertions(+), 55 deletions(-) diff --git a/deobs.py b/deobs.py index 4f46789..c729a02 100644 --- a/deobs.py +++ b/deobs.py @@ -3,13 +3,11 @@ from __future__ import annotations import binascii -import hashlib import os from collections import Counter -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Callable, Dict, List, Optional, Tuple -import magic import regex from bs4 import BeautifulSoup @@ -34,8 +32,6 @@ class DeobfuScripter(ServiceBase): def __init__(self, config: Optional[Dict] = None) -> None: super().__init__(config) - self.hashes: Set[str] = set() - self.files_extracted: Set[str] = set() def start(self) -> None: self.log.debug("DeobfuScripter service started") @@ -168,49 +164,6 @@ def string_replace(text: bytes) -> Optional[bytes]: return output return None - def b64decode_str(self, text: bytes) -> Optional[bytes]: - """ Decode base64 """ - b64str = regex.findall(b'((?:[A-Za-z0-9+/]{3,}={0,2}(?:&#[x1][A0];)?[\r]?[\n]?){6,})', text) - output = text - for bmatch in b64str: - s = bmatch.replace(b'\n', - b'').replace(b'\r', b'').replace(b' ', b'').replace(b' ', b'').replace(b' ', b'') - uniq_char = set(s) - if len(uniq_char) > 6: - if len(s) >= 16 and len(s) % 4 == 0: - try: - d = binascii.a2b_base64(s) - except binascii.Error: - continue - m = magic.Magic(mime=True) - mag = magic.Magic() - ftype = m.from_buffer(d) - mag_ftype = mag.from_buffer(d) - sha256hash = hashlib.sha256(d).hexdigest() - if sha256hash not in self.hashes: - if len(d) > 500: - for file_type in self.FILETYPES: - if (file_type in ftype and 'octet-stream' not in ftype) or file_type in mag_ftype: - b64_file_name = f"{sha256hash[0:10]}_b64_decoded" - b64_file_path = os.path.join(self.working_directory, b64_file_name) - with open(b64_file_path, 'wb') as b64_file: - b64_file.write(d) - self.files_extracted.add(b64_file_path) - self.hashes.add(sha256hash) - break - - if len(set(d)) > 6 and all(8 < c < 127 for c in d) and len(regex.sub(rb"\s", b"", d)) > 14: - output = output.replace(bmatch, d) - else: - # Test for ASCII seperated by \x00 - p = d.replace(b'\x00', b'') - if len(set(p)) > 6 and all(8 < c < 127 for c in p) and len(regex.sub(rb"\s", b"", p)) > 14: - output = output.replace(bmatch, p) - - if output == text: - return None - return output - @staticmethod def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: """ Parse variables of fake arrays """ @@ -475,13 +428,10 @@ def extract_htmlscript(self, text: bytes) -> List[bytes]: def execute(self, request: ServiceRequest) -> None: # --- Setup ---------------------------------------------------------------------------------------------- request.result = Result() - md = DecoderWrapper() + md = DecoderWrapper(self.working_directory) max_attempts = 100 if request.deep_scan else 10 - self.files_extracted = set() - self.hashes = set() - # --- Prepare Techniques ---------------------------------------------------------------------------------- first_pass: TechniqueList = [ ('MSOffice Embedded script', self.msoffice_embedded_script_string), @@ -491,7 +441,6 @@ def execute(self, request: ServiceRequest) -> None: ('Array of strings', self.array_of_strings), ('Fake array vars', self.vars_of_fake_arrays), ('Reverse strings', self.str_reverse), - ('B64 Decode', self.b64decode_str), ('Simple XOR function', self.simple_xor_function), ] second_pass: TechniqueList = [ @@ -667,10 +616,10 @@ def execute(self, request: ServiceRequest) -> None: request.result.add_section(new_ioc_res) # Report extracted files - if len(self.files_extracted) > 0: + if md.extracted_files: ext_file_res = ResultSection("The following files were extracted during the deobfuscation", heuristic=Heuristic(8), parent=request.result) - for extracted in self.files_extracted: + for extracted in md.extracted_files: file_name = os.path.basename(extracted) try: if request.add_extracted(extracted, file_name, "File of interest deobfuscated from sample", @@ -697,6 +646,7 @@ def _deobfuscripter_pass(layer: bytes, tree = md.multidecoder.scan(layer) else: tree = md.multidecoder.scan(layer, depth=1) + md.extract_files(tree, 500) techniques_used.extend(obfuscation_counts(tree).keys()) iocs = get_tree_tags(tree) # Get IoCs for the pass layer = squash_replace(layer, tree) From 6f21a314b099e3c98e6d59064469270624b24c6f Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 8 Jun 2022 09:27:01 -0400 Subject: [PATCH 04/20] Move chr_decode to multidecoder --- deobs.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/deobs.py b/deobs.py index c729a02..ba5aa1a 100644 --- a/deobs.py +++ b/deobs.py @@ -26,7 +26,6 @@ class DeobfuScripter(ServiceBase): """ Service for deobfuscating scripts """ - FILETYPES = ['application', 'document', 'exec', 'image', 'Microsoft', 'text'] VALIDCHARS = b' 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' BINCHARS = bytes(list(set(range(0, 256)) - set(VALIDCHARS))) @@ -106,20 +105,6 @@ def charcode_hex(text: bytes) -> Optional[bytes]: return None return output - @staticmethod - def chr_decode(text: bytes) -> Optional[bytes]: - """ Replace calls to chr with the corresponding character """ - output = text - for fullc, c in regex.findall(rb'(chr[bw]?\(([0-9]{1,3})\))', output, regex.I): - # noinspection PyBroadException - try: - output = regex.sub(regex.escape(fullc), '"{}"'.format(chr(int(c))).encode('utf-8'), output) - except Exception: - continue - if output == text: - return None - return output - @staticmethod def xml_unescape(text: bytes) -> Optional[bytes]: """ Replace XML escape sequences with the corresponding character """ @@ -435,7 +420,6 @@ def execute(self, request: ServiceRequest) -> None: # --- Prepare Techniques ---------------------------------------------------------------------------------- first_pass: TechniqueList = [ ('MSOffice Embedded script', self.msoffice_embedded_script_string), - ('CHR and CHRB decode', self.chr_decode), ('String replace', self.string_replace), ('Powershell carets', self.powershell_carets), ('Array of strings', self.array_of_strings), From df2e705371ef55e12b3e6a431376993bce3d4048 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 8 Jun 2022 13:41:39 -0400 Subject: [PATCH 05/20] Move string replace into multidecoder --- deobs.py | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/deobs.py b/deobs.py index ba5aa1a..c601816 100644 --- a/deobs.py +++ b/deobs.py @@ -115,39 +115,6 @@ def xml_unescape(text: bytes) -> Optional[bytes]: output = output.replace(escape, int(escape[2:-1]).to_bytes(1, 'big')) return output if output != text else None - @staticmethod - def string_replace(text: bytes) -> Optional[bytes]: - """ Replace calls to replace() with their output """ - if b'replace(' in text.lower(): - # Process string with replace functions calls - # Such as "SaokzueofpigxoFile".replace(/ofpigx/g, "T").replace(/okzu/g, "v") - output = text - # Find all occurrences of string replace (JS) - for strreplace in [o[0] for o in - regex.findall(rb'(["\'][^"\']+["\']((\.replace\([^)]+\))+))', output, flags=regex.I)]: - substitute = strreplace - # Extract all substitutions - for str1, str2 in regex.findall(rb'\.replace\([/\'"]([^,]+)[/\'\"]g?\s*,\s*[\'\"]([^)]*)[\'\"]\)', - substitute, flags=regex.I): - # Execute the substitution - substitute = substitute.replace(str1, str2) - # Remove the replace calls from the layer (prevent accidental substitutions in the next step) - substitute = substitute[:substitute.lower().index(b'.replace(')] - output = output.replace(strreplace, substitute) - - # Process global string replace - replacements = regex.findall(rb'replace\(\s*/([^)]+)/g?, [\'"]([^\'"]*)[\'"]', output) - for str1, str2 in replacements: - output = output.replace(str1, str2) - # Process VB string replace - replacements = regex.findall(rb'Replace\(\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?' - rb'([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?', output) - for str1, str2, str3 in replacements: - output = output.replace(str1, str1.replace(str2, str3)) - output = regex.sub(rb'\.replace\(\s*/([^)]+)/g?, [\'"]([^\'"]*)[\'"]\)', b'', output) - if output != text: - return output - return None @staticmethod def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: @@ -420,7 +387,6 @@ def execute(self, request: ServiceRequest) -> None: # --- Prepare Techniques ---------------------------------------------------------------------------------- first_pass: TechniqueList = [ ('MSOffice Embedded script', self.msoffice_embedded_script_string), - ('String replace', self.string_replace), ('Powershell carets', self.powershell_carets), ('Array of strings', self.array_of_strings), ('Fake array vars', self.vars_of_fake_arrays), From ccd672533b3e0ef1222e81e84b20381f312dd579 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 8 Jun 2022 13:54:05 -0400 Subject: [PATCH 06/20] Remove string concatenation since it's already in multidecoder --- deobs.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/deobs.py b/deobs.py index c601816..5ec8c5e 100644 --- a/deobs.py +++ b/deobs.py @@ -158,15 +158,6 @@ def array_of_strings(self, text: bytes) -> Optional[bytes]: return None - @staticmethod - def concat_strings(text: bytes) -> Optional[bytes]: - """ Concatenate disconnected strings """ - # Line continuation character in VB -- '_' - output = regex.sub(rb'[\'"][\s\n_]*?[+&][\s\n_]*[\'"]', b'', text) - if output != text: - return output - return None - @staticmethod def str_reverse(text: bytes) -> Optional[bytes]: """ Replace StrReverse function calls with the reverse of its argument """ @@ -394,7 +385,6 @@ def execute(self, request: ServiceRequest) -> None: ('Simple XOR function', self.simple_xor_function), ] second_pass: TechniqueList = [ - ('Concat strings', self.concat_strings), ('MSWord macro vars', self.mswordmacro_vars), ('Powershell vars', self.powershell_vars), ('Charcode hex', self.charcode_hex), From fdea304d5a8fc424b20dd41ea00979a30547a8c6 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 8 Jun 2022 15:43:42 -0400 Subject: [PATCH 07/20] Move str_reverse to multidecoder --- deobs.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/deobs.py b/deobs.py index 5ec8c5e..9e509f1 100644 --- a/deobs.py +++ b/deobs.py @@ -115,7 +115,6 @@ def xml_unescape(text: bytes) -> Optional[bytes]: output = output.replace(escape, int(escape[2:-1]).to_bytes(1, 'big')) return output if output != text else None - @staticmethod def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: """ Parse variables of fake arrays """ @@ -158,19 +157,6 @@ def array_of_strings(self, text: bytes) -> Optional[bytes]: return None - @staticmethod - def str_reverse(text: bytes) -> Optional[bytes]: - """ Replace StrReverse function calls with the reverse of its argument """ - output = text - # VBA format StrReverse("[text]") - replacements = regex.findall(rb'(StrReverse\("(.+?(?="\))))', output) - for full, string in replacements: - reversed_string = full.replace(string, string[::-1]).replace(b"StrReverse(", b"")[:-1] - output = output.replace(full, reversed_string) - if output != text: - return output - return None - @staticmethod def powershell_vars(text: bytes) -> Optional[bytes]: """ Replace PowerShell variables with their values """ @@ -381,7 +367,6 @@ def execute(self, request: ServiceRequest) -> None: ('Powershell carets', self.powershell_carets), ('Array of strings', self.array_of_strings), ('Fake array vars', self.vars_of_fake_arrays), - ('Reverse strings', self.str_reverse), ('Simple XOR function', self.simple_xor_function), ] second_pass: TechniqueList = [ From d8eb68b0ed48e0c43c3539a8ac035454e1d5cf63 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Thu, 9 Jun 2022 10:56:54 -0400 Subject: [PATCH 08/20] Stop after extraction if nothing is extracted --- deobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/deobs.py b/deobs.py index 9e509f1..27f8040 100644 --- a/deobs.py +++ b/deobs.py @@ -395,6 +395,8 @@ def execute(self, request: ServiceRequest) -> None: layer = b"\n".join(extracted_parts).strip() extract_res.add_line(name) break + if len(layer.strip()) < 2: + return # No script present in file if request.file_type == 'code/ps1': sig = regex.search( rb'# SIG # Begin signature block\r\n(?:# [A-Za-z0-9+/=]+\r\n)+# SIG # End signature block', From 1db16398be726a860b9171fd468e1ec43650537e Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Thu, 30 Jun 2022 10:04:40 -0400 Subject: [PATCH 09/20] Simplify filtering iocs --- deobs.py | 49 ++++++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/deobs.py b/deobs.py index 27f8040..aab5265 100644 --- a/deobs.py +++ b/deobs.py @@ -5,7 +5,7 @@ import binascii import os -from collections import Counter +from collections import Counter, defaultdict from typing import Callable, Dict, List, Optional, Tuple import regex @@ -24,6 +24,19 @@ TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]] +def filter_iocs(iocs, original: bytes, seen: set, reversed=False): + new_iocs = defaultdict(set) + for ioc_type in iocs: + for ioc in iocs[ioc_type]: + prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc + if reversed: + prefix = prefix[::-1] + if prefix not in seen and prefix not in original: + seen.add(prefix) + new_iocs[ioc_type].add(ioc) + return new_iocs + + class DeobfuScripter(ServiceBase): """ Service for deobfuscating scripts """ VALIDCHARS = b' 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' @@ -423,13 +436,15 @@ def execute(self, request: ServiceRequest) -> None: before_deobfuscation = layer # --- Stage 2: Deobsfucation ------------------------------------------------------------------------------ + seen_iocs = set() passes: dict[int, tuple[list[str], dict[str, set[bytes]]]] = {} techniques = first_pass n_pass = 0 # Ensure n_pass is bound outside of the loop for n_pass in range(max_attempts): layer, techiques_used, iocs = self._deobfuscripter_pass(layer, techniques, md) if techiques_used: - passes[n_pass] = techiques_used, iocs # Store the techniques used and iocs found for each pass + # Store the techniques used and new iocs found for each pass + passes[n_pass] = techiques_used, filter_iocs(iocs, before_deobfuscation, seen_iocs) else: # If there are no new layers in a pass, start second pass or break if len(techniques) != len(first_pass): @@ -440,7 +455,10 @@ def execute(self, request: ServiceRequest) -> None: # --- Final Layer ----------------------------------------------------------------------------------------- layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md, final=True) if final_techniques: - passes[n_pass+1] = final_techniques, final_iocs + passes[n_pass+1] = final_techniques, filter_iocs(final_iocs, before_deobfuscation, seen_iocs) + + # Get new reversed iocs + rev_iocs = filter_iocs(md.ioc_tags(layer[::-1]), before_deobfuscation, seen_iocs) # --- Compiling results ----------------------------------------------------------------------------------- if request.get_param('extract_original_iocs'): @@ -473,29 +491,6 @@ def execute(self, request: ServiceRequest) -> None: heuristic.add_signature_id(tech, frequency=count) mres.add_line(f"{tech}, {count} time(s).") - # Filter for new IOCs - seen_iocs = set() - for n_pass, (_, iocs) in passes.items(): - for ioc_type in iocs: - new_iocs = set() - for ioc in iocs[ioc_type]: - prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc - if prefix not in seen_iocs and prefix not in before_deobfuscation: - new_iocs.add(ioc) - seen_iocs.add(ioc) - iocs[ioc_type] = new_iocs - # And for new reversed IOCs - rev_iocs = md.ioc_tags(clean[::-1]) - reversed_file = before_deobfuscation[::-1] - for ioc_type in rev_iocs: - for ioc in rev_iocs[ioc_type]: - new_iocs = set() - prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc - if prefix not in seen_iocs and prefix not in reversed_file: - new_iocs.add(ioc) - seen_iocs.add(ioc) - rev_iocs[ioc_type] = new_iocs - # Display final layer byte_count = 5000 if request.deep_scan or (len(clean) > 1000 and heuristic.score >= 500) or seen_iocs: @@ -520,7 +515,7 @@ def execute(self, request: ServiceRequest) -> None: for n_pass, (_, iocs) in passes.items(): if not iocs: continue - new_ioc_res.add_line("New IOCs found in pass {n_pass}:") + new_ioc_res.add_line(f"New IOCs found in pass {n_pass}:") for ioc_type in iocs: for ioc in iocs[ioc_type]: if n_pass == 0: # iocs in the first pass can be found by other services From b4c27e036798d46c856a7a48cb9a9adee19554a3 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 27 Sep 2023 16:10:24 -0400 Subject: [PATCH 10/20] Update depricated .vscode settings --- .vscode/settings.json | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 49d23d0..1d6ab98 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -18,21 +18,15 @@ "--profile=black", // "--src=${workspaceFolder}" ], - "python.formatting.autopep8Args": [ - "--max-line-length", - "120", - "--experimental" - ], - "python.formatting.provider": "autopep8", - "python.formatting.blackArgs": [ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "black-formatter.args": [ "--line-length=120" ], - "python.linting.enabled": true, - "python.linting.flake8Enabled": true, - "python.linting.flake8Args": [ + "flake8.args": [ "--max-line-length=120", //Added the ignore of E203 for now : https://github.com/PyCQA/pycodestyle/issues/373 "--ignore=E203,W503" ], - "python.linting.pylintEnabled": false, } From acbafe23becdc6e7aaed841ec62b79f384b4e37f Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 27 Sep 2023 16:12:14 -0400 Subject: [PATCH 11/20] Format with black --- deobs.py | 305 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 171 insertions(+), 134 deletions(-) diff --git a/deobs.py b/deobs.py index aabd7c7..c726d6b 100644 --- a/deobs.py +++ b/deobs.py @@ -25,7 +25,7 @@ def filter_iocs(iocs, original: bytes, seen: set, reversed=False): new_iocs = defaultdict(set) for ioc_type in iocs: for ioc in iocs[ioc_type]: - prefix = b'/'.join(ioc.split(b'/', 3)[:3]) if ioc_type == 'network.static.uri' else ioc + prefix = b"/".join(ioc.split(b"/", 3)[:3]) if ioc_type == "network.static.uri" else ioc if reversed: prefix = prefix[::-1] if prefix not in seen and prefix not in original: @@ -35,8 +35,9 @@ def filter_iocs(iocs, original: bytes, seen: set, reversed=False): class DeobfuScripter(ServiceBase): - """ Service for deobfuscating scripts """ - VALIDCHARS = b' 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~' + """Service for deobfuscating scripts""" + + VALIDCHARS = b" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" BINCHARS = bytes(list(set(range(0, 256)) - set(VALIDCHARS))) def __init__(self, config: Optional[Dict] = None) -> None: @@ -45,17 +46,17 @@ def __init__(self, config: Optional[Dict] = None) -> None: # --- Support Modules ---------------------------------------------------------------------------------------------- def printable_ratio(self, text: bytes) -> float: - """ Calcuate the ratio of printable characters to total characters in text """ + """Calcuate the ratio of printable characters to total characters in text""" return float(float(len(text.translate(None, self.BINCHARS))) / float(len(text))) @staticmethod def encode_codepoint(codepoint: int) -> bytes: - """ Returns the utf-8 encoding of a unicode codepoint """ - return chr(codepoint).encode('utf-8') + """Returns the utf-8 encoding of a unicode codepoint""" + return chr(codepoint).encode("utf-8") @staticmethod def codepoint_sub(match: regex.Match, base: int = 16) -> bytes: - """ Replace method for unicode codepoint regex substitutions. + """Replace method for unicode codepoint regex substitutions. Args: match: The regex match object with the text of the unicode codepoint value as group 1. @@ -71,54 +72,54 @@ def codepoint_sub(match: regex.Match, base: int = 16) -> bytes: @staticmethod def add1b(s: bytes, k: int) -> bytes: - """ Add k to each byte of s """ - return bytes([(c + k) & 0xff for c in s]) + """Add k to each byte of s""" + return bytes([(c + k) & 0xFF for c in s]) @staticmethod def charcode(text: bytes) -> Optional[bytes]: - """ Replace character codes with the corresponding characters """ + """Replace character codes with the corresponding characters""" # Todo: something to handle powershell bytes syntax @staticmethod def charcode_hex(text: bytes) -> Optional[bytes]: - """ Replace hex character codes with the corresponding characters """ - output = regex.sub(rb'(?i)(?:\\x|%)([a-f0-9]{2})', lambda m: binascii.unhexlify(m.group(1)), text) + """Replace hex character codes with the corresponding characters""" + output = regex.sub(rb"(?i)(?:\\x|%)([a-f0-9]{2})", lambda m: binascii.unhexlify(m.group(1)), text) return output if output != text else None # Todo: find a way to prevent charcode_oct from mangling windows filepaths with sections that start with 0-7 @staticmethod def charcode_oct(text: bytes) -> Optional[bytes]: - """ Replace octal character codes with the corresponding characters """ - output = regex.sub(rb'\\([0-7]{1,3})', partial(DeobfuScripter.codepoint_sub, base=8), text) + """Replace octal character codes with the corresponding characters""" + output = regex.sub(rb"\\([0-7]{1,3})", partial(DeobfuScripter.codepoint_sub, base=8), text) return output if output != text else None @staticmethod def charcode_unicode(text: bytes) -> Optional[bytes]: - """ Replace unicode character codes with the corresponding utf-8 byte sequence""" - output = regex.sub(rb'(?i)(?:\\u|%u)([a-f0-9]{4})', DeobfuScripter.codepoint_sub, text) + """Replace unicode character codes with the corresponding utf-8 byte sequence""" + output = regex.sub(rb"(?i)(?:\\u|%u)([a-f0-9]{4})", DeobfuScripter.codepoint_sub, text) return output if output != text else None @staticmethod def charcode_xml(text: bytes) -> Optional[bytes]: - """ Replace XML escape sequences with the corresponding character """ - output = regex.sub(rb'(?i)&#x([a-z0-9]{1,6});', DeobfuScripter.codepoint_sub, text) - output = regex.sub(rb'&#([0-9]{1,7});', partial(DeobfuScripter.codepoint_sub, base=10), output) + """Replace XML escape sequences with the corresponding character""" + output = regex.sub(rb"(?i)&#x([a-z0-9]{1,6});", DeobfuScripter.codepoint_sub, text) + output = regex.sub(rb"&#([0-9]{1,7});", partial(DeobfuScripter.codepoint_sub, base=10), output) return output if output != text else None @staticmethod def hex_constant(text: bytes) -> Optional[bytes]: - """ Replace hexadecimal integer constants with decimal ones""" - output = regex.sub(rb'(?i)\b0x([a-f0-9]{1,16})\b', lambda m: str(int(m.group(1), 16)).encode('utf-8'), text) + """Replace hexadecimal integer constants with decimal ones""" + output = regex.sub(rb"(?i)\b0x([a-f0-9]{1,16})\b", lambda m: str(int(m.group(1), 16)).encode("utf-8"), text) return output if output != text else None @staticmethod def chr_decode(text: bytes) -> Optional[bytes]: - """ Replace calls to chr with the corresponding character """ + """Replace calls to chr with the corresponding character""" output = text - for fullc, c in regex.findall(rb'(chr[bw]?\(([0-9]{1,3})\))', output, regex.I): + for fullc, c in regex.findall(rb"(chr[bw]?\(([0-9]{1,3})\))", output, regex.I): # noinspection PyBroadException try: - output = regex.sub(regex.escape(fullc), f'"{chr(int(c))}"'.encode('utf-8'), output) + output = regex.sub(regex.escape(fullc), f'"{chr(int(c))}"'.encode("utf-8"), output) except Exception: continue if output == text: @@ -127,23 +128,25 @@ def chr_decode(text: bytes) -> Optional[bytes]: @staticmethod def string_replace(text: bytes) -> Optional[bytes]: - """ Replace calls to replace() with their output """ - if b'replace(' in text.lower(): + """Replace calls to replace() with their output""" + if b"replace(" in text.lower(): # Process string with replace functions calls # Such as "SaokzueofpigxoFile".replace(/ofpigx/g, "T").replace(/okzu/g, "v") output = text # Find all occurrences of string replace (JS) - for strreplace in [o[0] for o in - regex.findall(rb'(["\'][^"\']+["\']((\.replace\([^)]+\))+))', output, flags=regex.I)]: + for strreplace in [ + o[0] for o in regex.findall(rb'(["\'][^"\']+["\']((\.replace\([^)]+\))+))', output, flags=regex.I) + ]: substitute = strreplace # Extract all substitutions - for str1, str2 in regex.findall(rb'\.replace\([/\'"]([^,]+)[/\'\"]g?\s*,\s*[\'\"]([^)]*)[\'\"]\)', - substitute, flags=regex.I): + for str1, str2 in regex.findall( + rb'\.replace\([/\'"]([^,]+)[/\'\"]g?\s*,\s*[\'\"]([^)]*)[\'\"]\)', substitute, flags=regex.I + ): # Execute the substitution substitute = substitute.replace(str1, str2) # Remove the replace calls from the layer (prevent accidental substitutions in the next step) - if b'.replace(' in substitute.lower(): - substitute = substitute[:substitute.lower().index(b'.replace(')] + if b".replace(" in substitute.lower(): + substitute = substitute[: substitute.lower().index(b".replace(")] output = output.replace(strreplace, substitute) # Process global string replace @@ -151,26 +154,27 @@ def string_replace(text: bytes) -> Optional[bytes]: for str1, str2 in replacements: output = output.replace(str1, str2) # Process VB string replace - replacements = regex.findall(rb'Replace\(\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?' - rb'([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?', output) + replacements = regex.findall( + rb'Replace\(\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?' rb'([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?', + output, + ) for str1, str2, str3 in replacements: output = output.replace(str1, str1.replace(str2, str3)) - output = regex.sub(rb'\.replace\(\s*/([^)]+)/g?, [\'"]([^\'"]*)[\'"]\)', b'', output) + output = regex.sub(rb'\.replace\(\s*/([^)]+)/g?, [\'"]([^\'"]*)[\'"]\)', b"", output) if output != text: return output return None - @staticmethod def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: - """ Parse variables of fake arrays """ - replacements = regex.findall(rb'var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\[(\d+)\]', text) + """Parse variables of fake arrays""" + replacements = regex.findall(rb"var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\[(\d+)\]", text) if len(replacements) > 0: # ,- Make sure we do not process these again - output = regex.sub(rb'var\s+([^=]+)\s*=', rb'XXX \1 =', text) + output = regex.sub(rb"var\s+([^=]+)\s*=", rb"XXX \1 =", text) for varname, array, pos in replacements: try: - value = regex.split(rb'\s*,\s*', array)[int(pos)] + value = regex.split(rb"\s*,\s*", array)[int(pos)] except IndexError: # print '[' + array + '][' + pos + ']' break @@ -180,19 +184,20 @@ def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: return None def array_of_strings(self, text: bytes) -> Optional[bytes]: - """ Replace arrays of strings with the combined string """ + """Replace arrays of strings with the combined string""" # noinspection PyBroadException try: - replacements = regex.findall(rb'var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\s*;', text) + replacements = regex.findall(rb"var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\s*;", text) if len(replacements) > 0: # ,- Make sure we do not process these again output = text for varname, values in replacements: - occurences = [int(x) for x in regex.findall(varname + rb'\s*\[(\d+)\]', output)] + occurences = [int(x) for x in regex.findall(varname + rb"\s*\[(\d+)\]", output)] for i in occurences: try: - output = regex.sub(varname + rb'\s*\[(%d)\]' % i, - values.split(b',')[i].replace(b'\\', b'\\\\'), output) + output = regex.sub( + varname + rb"\s*\[(%d)\]" % i, values.split(b",")[i].replace(b"\\", b"\\\\"), output + ) except IndexError: # print '[' + array + '][' + pos + ']' break @@ -205,12 +210,12 @@ def array_of_strings(self, text: bytes) -> Optional[bytes]: @staticmethod def powershell_vars(text: bytes) -> Optional[bytes]: - """ Replace PowerShell variables with their values """ - replacements_string = regex.findall(rb'(\$(?:\w+|{[^\}]+\}))\s*=[^=]\s*[\"\']([^\"\']+)[\"\']', text) - replacements_func = regex.findall(rb'(\$(?:\w+|{[^\}]+\}))\s*=\s*([^=\"\'\s$]{3,50})[\s]', text) + """Replace PowerShell variables with their values""" + replacements_string = regex.findall(rb"(\$(?:\w+|{[^\}]+\}))\s*=[^=]\s*[\"\']([^\"\']+)[\"\']", text) + replacements_func = regex.findall(rb"(\$(?:\w+|{[^\}]+\}))\s*=\s*([^=\"\'\s$]{3,50})[\s]", text) if len(replacements_string) > 0 or len(replacements_func) > 0: # ,- Make sure we do not process these again - output = regex.sub(rb'\$((?:\w+|{[^\}]+\}))\s*=', rb'\$--\1 =', text) + output = regex.sub(rb"\$((?:\w+|{[^\}]+\}))\s*=", rb"\$--\1 =", text) for varname, string in replacements_string: output = output.replace(varname, string) for varname, string in replacements_func: @@ -222,7 +227,7 @@ def powershell_vars(text: bytes) -> Optional[bytes]: @staticmethod def powershell_carets(text: bytes) -> Optional[bytes]: - """ Remove PowerShell carets """ + """Remove PowerShell carets""" try: if b"^" in text or b"`" in text: output = text @@ -240,24 +245,25 @@ def powershell_carets(text: bytes) -> Optional[bytes]: # noinspection PyBroadException def msoffice_embedded_script_string(self, text: bytes) -> Optional[bytes]: - """ Replace variables with their values in MSOffice embedded scripts """ + """Replace variables with their values in MSOffice embedded scripts""" try: scripts: Dict[bytes, List[bytes]] = {} output = text # bad, prevent false var replacements like YG="86" # Replace regular variables replacements = regex.findall( - rb'^(\s*(\w+)\s*=\s*\w*\s*\+?\s(["\'])(.+)["\']\s*\+\s*vbCrLf\s*$)', output, regex.M) + rb'^(\s*(\w+)\s*=\s*\w*\s*\+?\s(["\'])(.+)["\']\s*\+\s*vbCrLf\s*$)', output, regex.M + ) if len(replacements) > 0: for full, variable_name, delim, value in replacements: scripts.setdefault(variable_name, []) scripts[variable_name].append(value.replace(delim + delim, delim)) - output = output.replace(full, b'') + output = output.replace(full, b"") for script_var, script_lines in scripts.items(): - new_script_name = b'new_script__' + script_var - output = regex.sub(rb'(.+)\b' + script_var + rb'\b', b'\\1' + new_script_name, output) - output += b"\n\n\n' ---- script referenced by \"" + new_script_name + b"\" ----\n\n\n" + new_script_name = b"new_script__" + script_var + output = regex.sub(rb"(.+)\b" + script_var + rb"\b", b"\\1" + new_script_name, output) + output += b"\n\n\n' ---- script referenced by \"" + new_script_name + b'" ----\n\n\n' output += b"\n".join(script_lines) if output == text: @@ -269,49 +275,66 @@ def msoffice_embedded_script_string(self, text: bytes) -> Optional[bytes]: return None def mswordmacro_vars(self, text: bytes) -> Optional[bytes]: - """ Replaces Microsoft Word variables with their values """ + """Replaces Microsoft Word variables with their values""" # noinspection PyBroadException try: output = text # prevent false var replacements like YG="86" # Replace regular variables - replacements = regex.findall(rb'^\s*((?:Const[\s]*)?(\w+)\s*=' - rb'\s*((?:["][^"]+["]|[\'][^\']+[\']|[0-9]*)))[\s\r]*$', - output, regex.MULTILINE | regex.DOTALL) + replacements = regex.findall( + rb"^\s*((?:Const[\s]*)?(\w+)\s*=" rb'\s*((?:["][^"]+["]|[\'][^\']+[\']|[0-9]*)))[\s\r]*$', + output, + regex.MULTILINE | regex.DOTALL, + ) if len(replacements) > 0: # If one variable is defined more then once take the second definition replacements = [(v[0], k, v[1]) for k, v in {i[1]: (i[0], i[2]) for i in replacements}.items()] for full, varname, value in replacements: - if len(regex.findall(rb'\b' + varname + rb'\b', output)) == 1: + if len(regex.findall(rb"\b" + varname + rb"\b", output)) == 1: # If there is only one instance of these, it's probably noise. - output = output.replace(full, b'') + output = output.replace(full, b"") else: final_val = value.replace(b'"', b"") # Stacked strings # b = "he" # b = b & "llo " # b = b & "world!" - stacked = regex.findall(rb'^\s*(' + varname + rb'\s*=\s*' - + varname + rb'\s*[+&]\s*((?:["][^"]+["]|[\'][^\']+[\'])))[\s\r]*$', - output, regex.MULTILINE | regex.DOTALL) + stacked = regex.findall( + rb"^\s*(" + + varname + + rb"\s*=\s*" + + varname + + rb'\s*[+&]\s*((?:["][^"]+["]|[\'][^\']+[\'])))[\s\r]*$', + output, + regex.MULTILINE | regex.DOTALL, + ) if len(stacked) > 0: for sfull, val in stacked: final_val += val.replace(b'"', b"") - output = output.replace(sfull, b'') - output = output.replace(full, b'') + output = output.replace(sfull, b"") + output = output.replace(full, b"") # If more than a of the variable name left, the assumption is that this did not # work according to plan, so just replace a few for now. - output = regex.sub(rb'(\b' + regex.escape(varname) + - rb'(?!\s*(?:=|[+&]\s*' + regex.escape(varname) + rb'))\b)', - b'"' + final_val.replace(b"\\", b"\\\\") + b'"', - output, count=5) + output = regex.sub( + rb"(\b" + + regex.escape(varname) + + rb"(?!\s*(?:=|[+&]\s*" + + regex.escape(varname) + + rb"))\b)", + b'"' + final_val.replace(b"\\", b"\\\\") + b'"', + output, + count=5, + ) # output = regex.sub(rb'(.*[^\s].*)\b' + varname + rb'\b', # b'\\1"' + final_val.replace(b"\\", b"\\\\") + b'"', # output) # Remaining stacked strings - replacements = regex.findall(rb'^\s*((\w+)\s*=\s*(\w+)\s*[+&]\s*((?:["][^"]+["]|[\'][^\']+[\'])))[\s\r]*$', - output, regex.MULTILINE | regex.DOTALL) + replacements = regex.findall( + rb'^\s*((\w+)\s*=\s*(\w+)\s*[+&]\s*((?:["][^"]+["]|[\'][^\']+[\'])))[\s\r]*$', + output, + regex.MULTILINE | regex.DOTALL, + ) replacements_vars = {x[1] for x in replacements} for v in replacements_vars: final_val = b"" @@ -319,11 +342,13 @@ def mswordmacro_vars(self, text: bytes) -> Optional[bytes]: if varname != v: continue final_val += value.replace(b'"', b"") - output = output.replace(full, b'') - output = regex.sub(rb'(\b' + v + - rb'(?!\s*(?:=|[+&]\s*' + v + rb'))\b)', - b'"' + final_val.replace(b"\\", b"\\\\") + b'"', - output, count=5) + output = output.replace(full, b"") + output = regex.sub( + rb"(\b" + v + rb"(?!\s*(?:=|[+&]\s*" + v + rb"))\b)", + b'"' + final_val.replace(b"\\", b"\\\\") + b'"', + output, + count=5, + ) if output == text: return None @@ -334,7 +359,7 @@ def mswordmacro_vars(self, text: bytes) -> Optional[bytes]: return None def simple_xor_function(self, text: bytes) -> Optional[bytes]: - """ Tries XORing the text with potential keys found in the text """ + """Tries XORing the text with potential keys found in the text""" xorstrings = regex.findall(rb'(\w+\("((?:[0-9A-Fa-f][0-9A-Fa-f])+)"\s*,\s*"([^"]+)"\))', text) option_a: List[Tuple[bytes, bytes, bytes, Optional[bytes]]] = [] option_b: List[Tuple[bytes, bytes, bytes, Optional[bytes]]] = [] @@ -370,30 +395,30 @@ def simple_xor_function(self, text: bytes) -> Optional[bytes]: @staticmethod def xor_with_key(s: bytes, k: bytes) -> bytes: - """ XOR s using the key k """ + """XOR s using the key k""" return bytes([a ^ b for a, b in zip(s, (len(s) // len(k) + 1) * k)]) @staticmethod def zp_xor_with_key(s: bytes, k: bytes) -> bytes: - """ XOR variant where xoring is skipped for 0 bytes and when the byte is equal to the keybyte """ + """XOR variant where xoring is skipped for 0 bytes and when the byte is equal to the keybyte""" return bytes([a if a in (0, b) else a ^ b for a, b in zip(s, (len(s) // len(k) + 1) * k)]) @staticmethod def clean_up_final_layer(text: bytes) -> bytes: - """ Remove deobfuscripter artifacts from final layer for display """ - output = regex.sub(rb'\r', b'', text) - output = regex.sub(rb']+>\n?', b'', output) + """Remove deobfuscripter artifacts from final layer for display""" + output = regex.sub(rb"\r", b"", text) + output = regex.sub(rb"]+>\n?", b"", output) return output # noinspection PyBroadException def extract_htmlscript(self, text: bytes) -> List[bytes]: - """ Extract scripts from html """ + """Extract scripts from html""" objects = [] try: - html = BeautifulSoup(text, 'lxml') - for tag_type in ['object', 'embed', 'script']: + html = BeautifulSoup(text, "lxml") + for tag_type in ["object", "embed", "script"]: for s in html.find_all(tag_type): - objects.append(str(s).encode('utf-8')) + objects.append(str(s).encode("utf-8")) except Exception as e: self.log.warning(f"Failure in extract_htmlscript function: {str(e)}") objects = [] @@ -410,28 +435,26 @@ def execute(self, request: ServiceRequest) -> None: # --- Prepare Techniques ---------------------------------------------------------------------------------- first_pass: TechniqueList = [ - ('MSOffice Embedded script', self.msoffice_embedded_script_string), - ('Powershell carets', self.powershell_carets), - ('Array of strings', self.array_of_strings), - ('Fake array vars', self.vars_of_fake_arrays), - ('Simple XOR function', self.simple_xor_function), + ("MSOffice Embedded script", self.msoffice_embedded_script_string), + ("Powershell carets", self.powershell_carets), + ("Array of strings", self.array_of_strings), + ("Fake array vars", self.vars_of_fake_arrays), + ("Simple XOR function", self.simple_xor_function), ] second_pass: TechniqueList = [ - ('MSWord macro vars', self.mswordmacro_vars), - ('Powershell vars', self.powershell_vars), - ('Hex Charcodes', self.charcode_hex), + ("MSWord macro vars", self.mswordmacro_vars), + ("Powershell vars", self.powershell_vars), + ("Hex Charcodes", self.charcode_hex), # ('Octal Charcodes', self.charcode_oct), - ('Unicode Charcodes', self.charcode_unicode), - ('XML Charcodes', self.charcode_xml), - ('Hex Int Constants', self.hex_constant), + ("Unicode Charcodes", self.charcode_unicode), + ("XML Charcodes", self.charcode_xml), + ("Hex Int Constants", self.hex_constant), ] second_pass.extend(first_pass) final_pass: TechniqueList = [] final_pass.extend(second_pass) - code_extracts = [ - ('.*html.*', "HTML scripts extraction", self.extract_htmlscript) - ] + code_extracts = [(".*html.*", "HTML scripts extraction", self.extract_htmlscript)] layer = request.file_contents @@ -445,20 +468,21 @@ def execute(self, request: ServiceRequest) -> None: break if len(layer.strip()) < 3: return # No script present in file - if request.file_type == 'code/ps1': + if request.file_type == "code/ps1": sig = regex.search( - rb'# SIG # Begin signature block\r\n(?:# [A-Za-z0-9+/=]+\r\n)+# SIG # End signature block', - request.file_contents) + rb"# SIG # Begin signature block\r\n(?:# [A-Za-z0-9+/=]+\r\n)+# SIG # End signature block", + request.file_contents, + ) if sig: - layer = layer[:sig.start()] + layer[sig.end():] - lines = sig.group().split(b'\r\n# ') - base64 = b''.join(line.strip() for line in lines[1:-1]) + layer = layer[: sig.start()] + layer[sig.end() :] + lines = sig.group().split(b"\r\n# ") + base64 = b"".join(line.strip() for line in lines[1:-1]) try: # Extract signature signature = binascii.a2b_base64(base64) - sig_filename = 'powershell_signature' + sig_filename = "powershell_signature" sig_path = os.path.join(self.working_directory, sig_filename) - with open(sig_path, 'wb+') as f: + with open(sig_path, "wb+") as f: f.write(signature) request.add_extracted(sig_path, sig_filename, "Powershell Signature") extract_res.add_line(f"Powershell Signature Comment, see {sig_filename}") @@ -489,16 +513,19 @@ def execute(self, request: ServiceRequest) -> None: # --- Final Layer ----------------------------------------------------------------------------------------- layer, final_techniques, final_iocs = self._deobfuscripter_pass(layer, final_pass, md, final=True) if final_techniques: - passes[n_pass+1] = final_techniques, filter_iocs(final_iocs, before_deobfuscation, seen_iocs) + passes[n_pass + 1] = final_techniques, filter_iocs(final_iocs, before_deobfuscation, seen_iocs) # Get new reversed iocs rev_iocs = filter_iocs(md.ioc_tags(layer[::-1]), before_deobfuscation, seen_iocs) # --- Compiling results ----------------------------------------------------------------------------------- - if request.get_param('extract_original_iocs'): + if request.get_param("extract_original_iocs"): pat_values = get_tree_tags(md.multidecoder.scan(before_deobfuscation, 1)) - ioc_res = ResultSection("The following IOCs were found in the original file", parent=request.result, - body_format=BODY_FORMAT.MEMORY_DUMP) + ioc_res = ResultSection( + "The following IOCs were found in the original file", + parent=request.result, + body_format=BODY_FORMAT.MEMORY_DUMP, + ) for k, val in pat_values.items(): for v in val: if ioc_res: @@ -514,9 +541,9 @@ def execute(self, request: ServiceRequest) -> None: # Display obfuscation steps heuristic = Heuristic(1) - mres = ResultSection("De-obfuscation steps taken by DeobsfuScripter", - parent=request.result, - heuristic=heuristic) + mres = ResultSection( + "De-obfuscation steps taken by DeobsfuScripter", parent=request.result, heuristic=heuristic + ) tech_count = Counter() for p in passes.values(): @@ -534,17 +561,20 @@ def execute(self, request: ServiceRequest) -> None: file_path = os.path.join(self.working_directory, file_name) # Ensure directory exists before write os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, 'wb+') as f: + with open(file_path, "wb+") as f: f.write(clean) self.log.debug(f"Submitted dropped file for analysis: {file_path}") request.add_supplementary(file_path, file_name, "Final deobfuscated layer") - ResultSection(f"First {byte_count} bytes of the final layer:", body=safe_str(clean[:byte_count]), - body_format=BODY_FORMAT.MEMORY_DUMP, parent=request.result) + ResultSection( + f"First {byte_count} bytes of the final layer:", + body=safe_str(clean[:byte_count]), + body_format=BODY_FORMAT.MEMORY_DUMP, + parent=request.result, + ) # Report new IOCs - new_ioc_res = ResultSection("New IOCs found after de-obfustcation", - body_format=BODY_FORMAT.MEMORY_DUMP) + new_ioc_res = ResultSection("New IOCs found after de-obfustcation", body_format=BODY_FORMAT.MEMORY_DUMP) heuristic = 0 for n_pass, (_, iocs) in passes.items(): if not iocs: @@ -555,15 +585,16 @@ def execute(self, request: ServiceRequest) -> None: if n_pass == 0: # iocs in the first pass can be found by other services heuristic = 5 elif heuristic < 7: - heuristic = 7 if 'network' in ioc_type and ioc_type != 'network.static.domain' else 6 + heuristic = 7 if "network" in ioc_type and ioc_type != "network.static.domain" else 6 new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") new_ioc_res.add_tag(ioc_type, ioc) if rev_iocs: new_ioc_res.add_line("New IOCs found reversed in the final layer:") for ioc_type in rev_iocs: for ioc in rev_iocs[ioc_type]: - heuristic = max(7 if 'network' in ioc_type and ioc_type != 'network.static.domain' - else 6, heuristic) + heuristic = max( + 7 if "network" in ioc_type and ioc_type != "network.static.domain" else 6, heuristic + ) new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") new_ioc_res.add_tag(ioc_type, ioc) if heuristic > 0: @@ -573,23 +604,29 @@ def execute(self, request: ServiceRequest) -> None: # Report extracted files if md.extracted_files: - ext_file_res = ResultSection("The following files were extracted during the deobfuscation", - heuristic=Heuristic(8), parent=request.result) + ext_file_res = ResultSection( + "The following files were extracted during the deobfuscation", + heuristic=Heuristic(8), + parent=request.result, + ) for extracted in md.extracted_files: file_name = os.path.basename(extracted) try: - if request.add_extracted(extracted, file_name, "File of interest deobfuscated from sample", - safelist_interface=self.api_interface): + if request.add_extracted( + extracted, + file_name, + "File of interest deobfuscated from sample", + safelist_interface=self.api_interface, + ): ext_file_res.add_line(file_name) except MaxExtractedExceeded: - self.log.warning('Extraction limit exceeded while adding files of interest.') + self.log.warning("Extraction limit exceeded while adding files of interest.") break @staticmethod - def _deobfuscripter_pass(layer: bytes, - techniques: TechniqueList, - md: DecoderWrapper, - final=False) -> tuple[bytes, list[str], dict]: + def _deobfuscripter_pass( + layer: bytes, techniques: TechniqueList, md: DecoderWrapper, final=False + ) -> tuple[bytes, list[str], dict]: techniques_used = [] for name, technique in techniques: result = technique(layer) From 70f257498e46f4e1c3e4bb90a7d4eb859109fe31 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 27 Sep 2023 16:16:15 -0400 Subject: [PATCH 12/20] Ignore formatting commit in git blame --- .git-blame-ignore-revs | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .git-blame-ignore-revs diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..dc63c24 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,2 @@ +# Format with black +acbafe23becdc6e7aaed841ec62b79f384b4e37f From fb06a94ca256f85ca2c2a9bcacf450b4c63e1312 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Fri, 6 Oct 2023 17:20:56 -0400 Subject: [PATCH 13/20] Pin assemblyline-service-utilities range --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 52f8c94..ac69b97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -assemblyline-service-utilities +assemblyline-service-utilities>=4.5,<4.6 beautifulsoup4 lxml regex From a8a4ddf413dd4af02057df6c3cb34b4600dee443 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Fri, 6 Oct 2023 17:25:14 -0400 Subject: [PATCH 14/20] Replace depricated multidecoder functions --- deobs.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/deobs.py b/deobs.py index c726d6b..d8dce3e 100644 --- a/deobs.py +++ b/deobs.py @@ -15,14 +15,15 @@ from assemblyline_v4_service.common.request import MaxExtractedExceeded, ServiceRequest from assemblyline_v4_service.common.result import BODY_FORMAT, Heuristic, Result, ResultSection from bs4 import BeautifulSoup -from multidecoder.query import obfuscation_counts, squash_replace # Type declarations TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]] -def filter_iocs(iocs, original: bytes, seen: set, reversed=False): - new_iocs = defaultdict(set) +def filter_iocs( + iocs: dict[str, set[bytes]], original: bytes, seen: set[bytes], reversed: object = False +) -> dict[str, set[bytes]]: + new_iocs: defaultdict[str, set[bytes]] = defaultdict(set) for ioc_type in iocs: for ioc in iocs[ioc_type]: prefix = b"/".join(ioc.split(b"/", 3)[:3]) if ioc_type == "network.static.uri" else ioc @@ -40,7 +41,7 @@ class DeobfuScripter(ServiceBase): VALIDCHARS = b" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" BINCHARS = bytes(list(set(range(0, 256)) - set(VALIDCHARS))) - def __init__(self, config: Optional[Dict] = None) -> None: + def __init__(self, config: dict | None = None) -> None: super().__init__(config) # --- Support Modules ---------------------------------------------------------------------------------------------- @@ -55,7 +56,7 @@ def encode_codepoint(codepoint: int) -> bytes: return chr(codepoint).encode("utf-8") @staticmethod - def codepoint_sub(match: regex.Match, base: int = 16) -> bytes: + def codepoint_sub(match: regex.Match[bytes], base: int = 16) -> bytes: """Replace method for unicode codepoint regex substitutions. Args: @@ -494,7 +495,7 @@ def execute(self, request: ServiceRequest) -> None: before_deobfuscation = layer # --- Stage 2: Deobsfucation ------------------------------------------------------------------------------ - seen_iocs = set() + seen_iocs: set[bytes] = set() passes: dict[int, tuple[list[str], dict[str, set[bytes]]]] = {} techniques = first_pass n_pass = 0 # Ensure n_pass is bound outside of the loop @@ -545,7 +546,7 @@ def execute(self, request: ServiceRequest) -> None: "De-obfuscation steps taken by DeobsfuScripter", parent=request.result, heuristic=heuristic ) - tech_count = Counter() + tech_count: Counter[str] = Counter() for p in passes.values(): tech_count.update(p[0]) for tech, count in tech_count.items(): @@ -625,8 +626,8 @@ def execute(self, request: ServiceRequest) -> None: @staticmethod def _deobfuscripter_pass( - layer: bytes, techniques: TechniqueList, md: DecoderWrapper, final=False - ) -> tuple[bytes, list[str], dict]: + layer: bytes, techniques: TechniqueList, md: DecoderWrapper, final: object = False + ) -> tuple[bytes, list[str], dict[str, set[bytes]]]: techniques_used = [] for name, technique in techniques: result = technique(layer) @@ -640,7 +641,9 @@ def _deobfuscripter_pass( else: tree = md.multidecoder.scan(layer, depth=1) md.extract_files(tree, 500) - techniques_used.extend(obfuscation_counts(tree).keys()) + obfuscations = set(node.obfuscation for node in tree) + obfuscations.discard(b"") + techniques_used.extend(obfuscations) iocs = get_tree_tags(tree) # Get IoCs for the pass - layer = squash_replace(layer, tree) + layer = tree.flatten() return layer, techniques_used, iocs From 7df6d13a60747cc1fe3e2d429567841e37ccee10 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Wed, 11 Oct 2023 13:29:03 -0400 Subject: [PATCH 15/20] Replace depricated methods and lint fixed --- deobs.py | 126 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 70 insertions(+), 56 deletions(-) diff --git a/deobs.py b/deobs.py index d8dce3e..9f50117 100644 --- a/deobs.py +++ b/deobs.py @@ -1,4 +1,4 @@ -""" DeobfuScripter: Script Deobfuscation Service """ +"""DeobfuScripter: Script Deobfuscation Service.""" from __future__ import annotations @@ -6,7 +6,7 @@ import os from collections import Counter, defaultdict from functools import partial -from typing import Callable, Dict, List, Optional, Tuple +from typing import Callable, Optional import regex from assemblyline.common.str_utils import safe_str @@ -17,12 +17,21 @@ from bs4 import BeautifulSoup # Type declarations -TechniqueList = List[Tuple[str, Callable[[bytes], Optional[bytes]]]] +TechniqueList = list[tuple[str, Callable[[bytes], Optional[bytes]]]] def filter_iocs( - iocs: dict[str, set[bytes]], original: bytes, seen: set[bytes], reversed: object = False + iocs: dict[str, set[bytes]], + original: bytes, + seen: set[bytes], + *, + reversed: object = False, ) -> dict[str, set[bytes]]: + """Filter IOCs against the original text and those already found. + + IOCs are filtered if they are found in original or are in seen. + network.static.uri tags are filtered based on segments before the path only. + """ new_iocs: defaultdict[str, set[bytes]] = defaultdict(set) for ioc_type in iocs: for ioc in iocs[ioc_type]: @@ -36,10 +45,10 @@ def filter_iocs( class DeobfuScripter(ServiceBase): - """Service for deobfuscating scripts""" + """Service for deobfuscating scripts.""" VALIDCHARS = b" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" - BINCHARS = bytes(list(set(range(0, 256)) - set(VALIDCHARS))) + BINCHARS = bytes(list(set(range(256)) - set(VALIDCHARS))) def __init__(self, config: dict | None = None) -> None: super().__init__(config) @@ -47,12 +56,12 @@ def __init__(self, config: dict | None = None) -> None: # --- Support Modules ---------------------------------------------------------------------------------------------- def printable_ratio(self, text: bytes) -> float: - """Calcuate the ratio of printable characters to total characters in text""" + """Calcuate the ratio of printable characters to total characters in text.""" return float(float(len(text.translate(None, self.BINCHARS))) / float(len(text))) @staticmethod def encode_codepoint(codepoint: int) -> bytes: - """Returns the utf-8 encoding of a unicode codepoint""" + """Get the encoding from unicode codepoint.""" return chr(codepoint).encode("utf-8") @staticmethod @@ -62,6 +71,7 @@ def codepoint_sub(match: regex.Match[bytes], base: int = 16) -> bytes: Args: match: The regex match object with the text of the unicode codepoint value as group 1. base: The base that the unicode codepoint is represented in (defaults to hexadecimal) + Returns: - The utf-8 byte sequence for the codepoint if it can be decoded. - The original match text if there is a decoding error. @@ -73,49 +83,49 @@ def codepoint_sub(match: regex.Match[bytes], base: int = 16) -> bytes: @staticmethod def add1b(s: bytes, k: int) -> bytes: - """Add k to each byte of s""" + """Add k to each byte of s.""" return bytes([(c + k) & 0xFF for c in s]) @staticmethod - def charcode(text: bytes) -> Optional[bytes]: - """Replace character codes with the corresponding characters""" + def charcode(text: bytes) -> bytes | None: + """Replace character codes with the corresponding characters.""" # Todo: something to handle powershell bytes syntax @staticmethod - def charcode_hex(text: bytes) -> Optional[bytes]: - """Replace hex character codes with the corresponding characters""" + def charcode_hex(text: bytes) -> bytes | None: + """Replace hex character codes with the corresponding characters.""" output = regex.sub(rb"(?i)(?:\\x|%)([a-f0-9]{2})", lambda m: binascii.unhexlify(m.group(1)), text) return output if output != text else None # Todo: find a way to prevent charcode_oct from mangling windows filepaths with sections that start with 0-7 @staticmethod - def charcode_oct(text: bytes) -> Optional[bytes]: - """Replace octal character codes with the corresponding characters""" + def charcode_oct(text: bytes) -> bytes | None: + """Replace octal character codes with the corresponding characters.""" output = regex.sub(rb"\\([0-7]{1,3})", partial(DeobfuScripter.codepoint_sub, base=8), text) return output if output != text else None @staticmethod - def charcode_unicode(text: bytes) -> Optional[bytes]: - """Replace unicode character codes with the corresponding utf-8 byte sequence""" + def charcode_unicode(text: bytes) -> bytes | None: + """Replace unicode character codes with the corresponding utf-8 byte sequence.""" output = regex.sub(rb"(?i)(?:\\u|%u)([a-f0-9]{4})", DeobfuScripter.codepoint_sub, text) return output if output != text else None @staticmethod - def charcode_xml(text: bytes) -> Optional[bytes]: - """Replace XML escape sequences with the corresponding character""" + def charcode_xml(text: bytes) -> bytes | None: + """Replace XML escape sequences with the corresponding character.""" output = regex.sub(rb"(?i)&#x([a-z0-9]{1,6});", DeobfuScripter.codepoint_sub, text) output = regex.sub(rb"&#([0-9]{1,7});", partial(DeobfuScripter.codepoint_sub, base=10), output) return output if output != text else None @staticmethod - def hex_constant(text: bytes) -> Optional[bytes]: - """Replace hexadecimal integer constants with decimal ones""" + def hex_constant(text: bytes) -> bytes | None: + """Replace hexadecimal integer constants with decimal ones.""" output = regex.sub(rb"(?i)\b0x([a-f0-9]{1,16})\b", lambda m: str(int(m.group(1), 16)).encode("utf-8"), text) return output if output != text else None @staticmethod - def chr_decode(text: bytes) -> Optional[bytes]: - """Replace calls to chr with the corresponding character""" + def chr_decode(text: bytes) -> bytes | None: + """Replace calls to chr with the corresponding character.""" output = text for fullc, c in regex.findall(rb"(chr[bw]?\(([0-9]{1,3})\))", output, regex.I): # noinspection PyBroadException @@ -128,8 +138,8 @@ def chr_decode(text: bytes) -> Optional[bytes]: return output @staticmethod - def string_replace(text: bytes) -> Optional[bytes]: - """Replace calls to replace() with their output""" + def string_replace(text: bytes) -> bytes | None: + """Replace calls to replace() with their output.""" if b"replace(" in text.lower(): # Process string with replace functions calls # Such as "SaokzueofpigxoFile".replace(/ofpigx/g, "T").replace(/okzu/g, "v") @@ -141,7 +151,9 @@ def string_replace(text: bytes) -> Optional[bytes]: substitute = strreplace # Extract all substitutions for str1, str2 in regex.findall( - rb'\.replace\([/\'"]([^,]+)[/\'\"]g?\s*,\s*[\'\"]([^)]*)[\'\"]\)', substitute, flags=regex.I + rb'\.replace\([/\'"]([^,]+)[/\'\"]g?\s*,\s*[\'\"]([^)]*)[\'\"]\)', + substitute, + flags=regex.I, ): # Execute the substitution substitute = substitute.replace(str1, str2) @@ -167,8 +179,8 @@ def string_replace(text: bytes) -> Optional[bytes]: return None @staticmethod - def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: - """Parse variables of fake arrays""" + def vars_of_fake_arrays(text: bytes) -> bytes | None: + """Parse variables of fake arrays.""" replacements = regex.findall(rb"var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\[(\d+)\]", text) if len(replacements) > 0: # ,- Make sure we do not process these again @@ -184,8 +196,8 @@ def vars_of_fake_arrays(text: bytes) -> Optional[bytes]: return output return None - def array_of_strings(self, text: bytes) -> Optional[bytes]: - """Replace arrays of strings with the combined string""" + def array_of_strings(self, text: bytes) -> bytes | None: + """Replace arrays of strings with the combined string.""" # noinspection PyBroadException try: replacements = regex.findall(rb"var\s+([^\s=]+)\s*=\s*\[([^\]]+)\]\s*;", text) @@ -210,8 +222,8 @@ def array_of_strings(self, text: bytes) -> Optional[bytes]: return None @staticmethod - def powershell_vars(text: bytes) -> Optional[bytes]: - """Replace PowerShell variables with their values""" + def powershell_vars(text: bytes) -> bytes | None: + """Replace PowerShell variables with their values.""" replacements_string = regex.findall(rb"(\$(?:\w+|{[^\}]+\}))\s*=[^=]\s*[\"\']([^\"\']+)[\"\']", text) replacements_func = regex.findall(rb"(\$(?:\w+|{[^\}]+\}))\s*=\s*([^=\"\'\s$]{3,50})[\s]", text) if len(replacements_string) > 0 or len(replacements_func) > 0: @@ -227,8 +239,8 @@ def powershell_vars(text: bytes) -> Optional[bytes]: return None @staticmethod - def powershell_carets(text: bytes) -> Optional[bytes]: - """Remove PowerShell carets""" + def powershell_carets(text: bytes) -> bytes | None: + """Remove PowerShell carets.""" try: if b"^" in text or b"`" in text: output = text @@ -245,10 +257,10 @@ def powershell_carets(text: bytes) -> Optional[bytes]: return None # noinspection PyBroadException - def msoffice_embedded_script_string(self, text: bytes) -> Optional[bytes]: - """Replace variables with their values in MSOffice embedded scripts""" + def msoffice_embedded_script_string(self, text: bytes) -> bytes | None: + """Replace variables with their values in MSOffice embedded scripts.""" try: - scripts: Dict[bytes, List[bytes]] = {} + scripts: dict[bytes, list[bytes]] = {} output = text # bad, prevent false var replacements like YG="86" # Replace regular variables @@ -275,8 +287,8 @@ def msoffice_embedded_script_string(self, text: bytes) -> Optional[bytes]: self.log.warning(f"Technique msoffice_embedded_script_string failed with error: {str(e)}") return None - def mswordmacro_vars(self, text: bytes) -> Optional[bytes]: - """Replaces Microsoft Word variables with their values""" + def mswordmacro_vars(self, text: bytes) -> bytes | None: + """Replaces Microsoft Word variables with their values.""" # noinspection PyBroadException try: output = text @@ -359,11 +371,11 @@ def mswordmacro_vars(self, text: bytes) -> Optional[bytes]: self.log.warning(f"Technique mswordmacro_vars failed with error: {str(e)}") return None - def simple_xor_function(self, text: bytes) -> Optional[bytes]: - """Tries XORing the text with potential keys found in the text""" + def simple_xor_function(self, text: bytes) -> bytes | None: + """Tries XORing the text with potential keys found in the text.""" xorstrings = regex.findall(rb'(\w+\("((?:[0-9A-Fa-f][0-9A-Fa-f])+)"\s*,\s*"([^"]+)"\))', text) - option_a: List[Tuple[bytes, bytes, bytes, Optional[bytes]]] = [] - option_b: List[Tuple[bytes, bytes, bytes, Optional[bytes]]] = [] + option_a: list[tuple[bytes, bytes, bytes, bytes | None]] = [] + option_b: list[tuple[bytes, bytes, bytes, bytes | None]] = [] output = text for f, x, k in xorstrings: res = self.xor_with_key(binascii.a2b_hex(x), k) @@ -396,24 +408,24 @@ def simple_xor_function(self, text: bytes) -> Optional[bytes]: @staticmethod def xor_with_key(s: bytes, k: bytes) -> bytes: - """XOR s using the key k""" + """XOR s using the key k.""" return bytes([a ^ b for a, b in zip(s, (len(s) // len(k) + 1) * k)]) @staticmethod def zp_xor_with_key(s: bytes, k: bytes) -> bytes: - """XOR variant where xoring is skipped for 0 bytes and when the byte is equal to the keybyte""" + """XOR variant where xoring is skipped for 0 bytes and when the byte is equal to the keybyte.""" return bytes([a if a in (0, b) else a ^ b for a, b in zip(s, (len(s) // len(k) + 1) * k)]) @staticmethod def clean_up_final_layer(text: bytes) -> bytes: - """Remove deobfuscripter artifacts from final layer for display""" + """Remove deobfuscripter artifacts from final layer for display.""" output = regex.sub(rb"\r", b"", text) output = regex.sub(rb"]+>\n?", b"", output) return output # noinspection PyBroadException - def extract_htmlscript(self, text: bytes) -> List[bytes]: - """Extract scripts from html""" + def extract_htmlscript(self, text: bytes) -> list[bytes]: + """Extract scripts from html.""" objects = [] try: html = BeautifulSoup(text, "lxml") @@ -517,7 +529,7 @@ def execute(self, request: ServiceRequest) -> None: passes[n_pass + 1] = final_techniques, filter_iocs(final_iocs, before_deobfuscation, seen_iocs) # Get new reversed iocs - rev_iocs = filter_iocs(md.ioc_tags(layer[::-1]), before_deobfuscation, seen_iocs) + rev_iocs = filter_iocs(md.ioc_tags(layer[::-1]), before_deobfuscation, seen_iocs, reversed=True) # --- Compiling results ----------------------------------------------------------------------------------- if request.get_param("extract_original_iocs"): @@ -594,7 +606,8 @@ def execute(self, request: ServiceRequest) -> None: for ioc_type in rev_iocs: for ioc in rev_iocs[ioc_type]: heuristic = max( - 7 if "network" in ioc_type and ioc_type != "network.static.domain" else 6, heuristic + 7 if "network" in ioc_type and ioc_type != "network.static.domain" else 6, + heuristic, ) new_ioc_res.add_line(f"Found {ioc_type.upper().replace('.', ' ')}: {safe_str(ioc)}") new_ioc_res.add_tag(ioc_type, ioc) @@ -626,7 +639,11 @@ def execute(self, request: ServiceRequest) -> None: @staticmethod def _deobfuscripter_pass( - layer: bytes, techniques: TechniqueList, md: DecoderWrapper, final: object = False + layer: bytes, + techniques: TechniqueList, + md: DecoderWrapper, + *, + final: object = False, ) -> tuple[bytes, list[str], dict[str, set[bytes]]]: techniques_used = [] for name, technique in techniques: @@ -636,12 +653,9 @@ def _deobfuscripter_pass( # Looks like it worked, continue with the new layer layer = result # Use multidecoder techniques and ioc tagging - if final: - tree = md.multidecoder.scan(layer) - else: - tree = md.multidecoder.scan(layer, depth=1) + tree = md.multidecoder.scan(layer) if final else md.multidecoder.scan(layer, depth=1) md.extract_files(tree, 500) - obfuscations = set(node.obfuscation for node in tree) + obfuscations = {node.obfuscation for node in tree} obfuscations.discard(b"") techniques_used.extend(obfuscations) iocs = get_tree_tags(tree) # Get IoCs for the pass From 3818f61e282c2b99610a1fce63632ed22eaf289d Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Mon, 16 Oct 2023 10:20:20 -0400 Subject: [PATCH 16/20] More lint fixes --- deobs.py | 45 +++++++++++++++++++++------------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/deobs.py b/deobs.py index 9f50117..5df9851 100644 --- a/deobs.py +++ b/deobs.py @@ -89,7 +89,7 @@ def add1b(s: bytes, k: int) -> bytes: @staticmethod def charcode(text: bytes) -> bytes | None: """Replace character codes with the corresponding characters.""" - # Todo: something to handle powershell bytes syntax + # TODO: something to handle powershell bytes syntax @staticmethod def charcode_hex(text: bytes) -> bytes | None: @@ -97,7 +97,7 @@ def charcode_hex(text: bytes) -> bytes | None: output = regex.sub(rb"(?i)(?:\\x|%)([a-f0-9]{2})", lambda m: binascii.unhexlify(m.group(1)), text) return output if output != text else None - # Todo: find a way to prevent charcode_oct from mangling windows filepaths with sections that start with 0-7 + # TODO: find a way to prevent charcode_oct from mangling windows filepaths with sections that start with 0-7 @staticmethod def charcode_oct(text: bytes) -> bytes | None: """Replace octal character codes with the corresponding characters.""" @@ -130,7 +130,7 @@ def chr_decode(text: bytes) -> bytes | None: for fullc, c in regex.findall(rb"(chr[bw]?\(([0-9]{1,3})\))", output, regex.I): # noinspection PyBroadException try: - output = regex.sub(regex.escape(fullc), f'"{chr(int(c))}"'.encode("utf-8"), output) + output = regex.sub(regex.escape(fullc), f'"{chr(int(c))}"'.encode(), output) except Exception: continue if output == text: @@ -168,7 +168,7 @@ def string_replace(text: bytes) -> bytes | None: output = output.replace(str1, str2) # Process VB string replace replacements = regex.findall( - rb'Replace\(\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?' rb'([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?', + rb'Replace\(\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?\s*,\s*["\']?([^,"\']*)["\']?', output, ) for str1, str2, str3 in replacements: @@ -189,7 +189,6 @@ def vars_of_fake_arrays(text: bytes) -> bytes | None: try: value = regex.split(rb"\s*,\s*", array)[int(pos)] except IndexError: - # print '[' + array + '][' + pos + ']' break output = output.replace(varname, value) if output != text: @@ -209,15 +208,16 @@ def array_of_strings(self, text: bytes) -> bytes | None: for i in occurences: try: output = regex.sub( - varname + rb"\s*\[(%d)\]" % i, values.split(b",")[i].replace(b"\\", b"\\\\"), output + varname + rb"\s*\[(%d)\]" % i, + values.split(b",")[i].replace(b"\\", b"\\\\"), + output, ) except IndexError: - # print '[' + array + '][' + pos + ']' break if output != text: return output except Exception as e: - self.log.warning(f"Technique array_of_strings failed with error: {str(e)}") + self.log.warning(f"Technique array_of_strings failed with error: {e!s}") return None @@ -265,7 +265,9 @@ def msoffice_embedded_script_string(self, text: bytes) -> bytes | None: # bad, prevent false var replacements like YG="86" # Replace regular variables replacements = regex.findall( - rb'^(\s*(\w+)\s*=\s*\w*\s*\+?\s(["\'])(.+)["\']\s*\+\s*vbCrLf\s*$)', output, regex.M + rb'^(\s*(\w+)\s*=\s*\w*\s*\+?\s(["\'])(.+)["\']\s*\+\s*vbCrLf\s*$)', + output, + regex.M, ) if len(replacements) > 0: for full, variable_name, delim, value in replacements: @@ -284,7 +286,7 @@ def msoffice_embedded_script_string(self, text: bytes) -> bytes | None: return output except Exception as e: - self.log.warning(f"Technique msoffice_embedded_script_string failed with error: {str(e)}") + self.log.warning(f"Technique msoffice_embedded_script_string failed with error: {e!s}") return None def mswordmacro_vars(self, text: bytes) -> bytes | None: @@ -338,9 +340,6 @@ def mswordmacro_vars(self, text: bytes) -> bytes | None: output, count=5, ) - # output = regex.sub(rb'(.*[^\s].*)\b' + varname + rb'\b', - # b'\\1"' + final_val.replace(b"\\", b"\\\\") + b'"', - # output) # Remaining stacked strings replacements = regex.findall( @@ -368,7 +367,7 @@ def mswordmacro_vars(self, text: bytes) -> bytes | None: return output except Exception as e: - self.log.warning(f"Technique mswordmacro_vars failed with error: {str(e)}") + self.log.warning(f"Technique mswordmacro_vars failed with error: {e!s}") return None def simple_xor_function(self, text: bytes) -> bytes | None: @@ -381,24 +380,22 @@ def simple_xor_function(self, text: bytes) -> bytes | None: res = self.xor_with_key(binascii.a2b_hex(x), k) if self.printable_ratio(res) == 1: option_a.append((f, x, k, res)) - # print 'A:',f,x,k, res else: option_a.append((f, x, k, None)) # try by shifting the key by 1 res = self.xor_with_key(binascii.a2b_hex(x), k[1:] + k[0:1]) if self.printable_ratio(res) == 1: option_b.append((f, x, k, res)) - # print 'B:',f,x,k, res else: option_b.append((f, x, k, None)) xorstrings = [] - if None not in map(lambda y: y[3], option_a): + if None not in (y[3] for y in option_a): xorstrings = option_a - elif None not in map(lambda z: z[3], option_b): + elif None not in (z[3] for z in option_b): xorstrings = option_b - for f, x, k, r in xorstrings: + for f, _, _, r in xorstrings: if r is not None: output = output.replace(f, b'"' + r + b'"') @@ -420,8 +417,7 @@ def zp_xor_with_key(s: bytes, k: bytes) -> bytes: def clean_up_final_layer(text: bytes) -> bytes: """Remove deobfuscripter artifacts from final layer for display.""" output = regex.sub(rb"\r", b"", text) - output = regex.sub(rb"]+>\n?", b"", output) - return output + return regex.sub(rb"]+>\n?", b"", output) # noinspection PyBroadException def extract_htmlscript(self, text: bytes) -> list[bytes]: @@ -433,7 +429,7 @@ def extract_htmlscript(self, text: bytes) -> list[bytes]: for s in html.find_all(tag_type): objects.append(str(s).encode("utf-8")) except Exception as e: - self.log.warning(f"Failure in extract_htmlscript function: {str(e)}") + self.log.warning(f"Failure in extract_htmlscript function: {e!s}") objects = [] return objects @@ -458,7 +454,6 @@ def execute(self, request: ServiceRequest) -> None: ("MSWord macro vars", self.mswordmacro_vars), ("Powershell vars", self.powershell_vars), ("Hex Charcodes", self.charcode_hex), - # ('Octal Charcodes', self.charcode_oct), ("Unicode Charcodes", self.charcode_unicode), ("XML Charcodes", self.charcode_xml), ("Hex Int Constants", self.hex_constant), @@ -555,7 +550,9 @@ def execute(self, request: ServiceRequest) -> None: # Display obfuscation steps heuristic = Heuristic(1) mres = ResultSection( - "De-obfuscation steps taken by DeobsfuScripter", parent=request.result, heuristic=heuristic + "De-obfuscation steps taken by DeobsfuScripter", + parent=request.result, + heuristic=heuristic, ) tech_count: Counter[str] = Counter() From c156239d582d1de739bfe8b2b8507acc4c8dacf0 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Mon, 16 Oct 2023 10:24:46 -0400 Subject: [PATCH 17/20] Documentation mood --- deobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deobs.py b/deobs.py index 5df9851..7969693 100644 --- a/deobs.py +++ b/deobs.py @@ -290,7 +290,7 @@ def msoffice_embedded_script_string(self, text: bytes) -> bytes | None: return None def mswordmacro_vars(self, text: bytes) -> bytes | None: - """Replaces Microsoft Word variables with their values.""" + """Replace Microsoft Word variables with their values.""" # noinspection PyBroadException try: output = text @@ -371,7 +371,7 @@ def mswordmacro_vars(self, text: bytes) -> bytes | None: return None def simple_xor_function(self, text: bytes) -> bytes | None: - """Tries XORing the text with potential keys found in the text.""" + """Try XORing the text with potential keys found in the text.""" xorstrings = regex.findall(rb'(\w+\("((?:[0-9A-Fa-f][0-9A-Fa-f])+)"\s*,\s*"([^"]+)"\))', text) option_a: list[tuple[bytes, bytes, bytes, bytes | None]] = [] option_b: list[tuple[bytes, bytes, bytes, bytes | None]] = [] From 05506262ae925aa4e8ea02385656663115687850 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Mon, 20 Nov 2023 10:59:31 -0500 Subject: [PATCH 18/20] Fix multidecoder minor version number --- requirements.txt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ac69b97..065e7a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,7 @@ -assemblyline-service-utilities>=4.5,<4.6 beautifulsoup4 lxml regex +# assemblyline-service-utilities also depends on multidecoder and pins the version number. +# Make sure the version ranges are compatible when upgrading. +assemblyline-service-utilities>=4.5,<4.6 +multidecoder>=1.1,<2.0 From cf51f5039031a91a602dac8c4ed89aefd3b92741 Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Mon, 20 Nov 2023 13:07:37 -0500 Subject: [PATCH 19/20] Move atob to multidecoder and remove base64 method --- deobs.py | 66 -------------------------------------------------------- 1 file changed, 66 deletions(-) diff --git a/deobs.py b/deobs.py index f9bab9b..9484c4c 100644 --- a/deobs.py +++ b/deobs.py @@ -47,7 +47,6 @@ def filter_iocs( class DeobfuScripter(ServiceBase): """Service for deobfuscating scripts.""" - FILETYPES = ["application", "document", "exec", "image", "Microsoft", "text"] VALIDCHARS = b" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" BINCHARS = bytes(list(set(range(256)) - set(VALIDCHARS))) @@ -124,70 +123,6 @@ def hex_constant(text: bytes) -> bytes | None: output = regex.sub(rb"(?i)\b0x([a-f0-9]{1,16})\b", lambda m: str(int(m.group(1), 16)).encode("utf-8"), text) return output if output != text else None - def b64decode_str(self, text: bytes) -> Optional[bytes]: - """Decode base64""" - output = text - - head: bytes - bmatch: bytes - tail: bytes - for head, bmatch, tail in regex.findall(rb"((?:atob\()+)\'([A-Za-z0-9+/]+={0,2})\'(\)+)", text): - iters = min(len(head) // 5, len(tail)) - d = bmatch - for _ in range(iters): - try: - d = binascii.a2b_base64(d) - except binascii.Error: - break - output = output.replace(b"atob(" * iters + b"'" + bmatch + b"'" + b")" * iters, b"'" + d + b"'") - - b64str: list[bytes] = regex.findall(b"((?:[A-Za-z0-9+/]{3,}={0,2}(?:&#[x1][A0];)?[\r]?[\n]?){6,})", text) - for bmatch in b64str: - if bmatch not in output: - continue # was already processed by atob - s = ( - bmatch.replace(b"\n", b"") - .replace(b"\r", b"") - .replace(b" ", b"") - .replace(b" ", b"") - .replace(b" ", b"") - ) - uniq_char = set(s) - if len(uniq_char) <= 6 or len(s) < 16 or len(s) % 4: - continue - try: - d = binascii.a2b_base64(s) - except binascii.Error: - continue - sha256hash = hashlib.sha256(d).hexdigest() - if sha256hash not in self.hashes: - if len(d) > 500: - m = magic.Magic(mime=True) - mag = magic.Magic() - ftype = m.from_buffer(d) - mag_ftype = mag.from_buffer(d) - for file_type in self.FILETYPES: - if (file_type in ftype and "octet-stream" not in ftype) or file_type in mag_ftype: - b64_file_name = f"{sha256hash[0:10]}_b64_decoded" - b64_file_path = os.path.join(self.working_directory, b64_file_name) - with open(b64_file_path, "wb") as b64_file: - b64_file.write(d) - self.files_extracted.add(b64_file_path) - self.hashes.add(sha256hash) - break - - if len(set(d)) > 6 and all(8 < c < 127 for c in d) and len(regex.sub(rb"\s", b"", d)) > 14: - output = output.replace(bmatch, d) - else: - # Test for ASCII seperated by \x00 - p = d.replace(b"\x00", b"") - if len(set(p)) > 6 and all(8 < c < 127 for c in p) and len(regex.sub(rb"\s", b"", p)) > 14: - output = output.replace(bmatch, p) - - if output == text: - return None - return output - @staticmethod def vars_of_fake_arrays(text: bytes) -> bytes | None: """Parse variables of fake arrays.""" @@ -459,7 +394,6 @@ def execute(self, request: ServiceRequest) -> None: ("Array of strings", self.array_of_strings), ("Fake array vars", self.vars_of_fake_arrays), ("Simple XOR function", self.simple_xor_function), - ("B64 Decode", self.b64decode_str), ] second_pass: TechniqueList = [ ("MSWord macro vars", self.mswordmacro_vars), From 1463bbeff5bfeb502a0ce6d207ec645f9de20b0c Mon Sep 17 00:00:00 2001 From: cccs-jh <63320703+cccs-jh@users.noreply.github.com> Date: Mon, 20 Nov 2023 13:08:50 -0500 Subject: [PATCH 20/20] update version number for atob --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 065e7a2..dfb6339 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ regex # assemblyline-service-utilities also depends on multidecoder and pins the version number. # Make sure the version ranges are compatible when upgrading. assemblyline-service-utilities>=4.5,<4.6 -multidecoder>=1.1,<2.0 +multidecoder>=1.2,<2.0