Skip to content

Commit

Permalink
Merge pull request #752 from CybercentreCanada/signatures
Browse files Browse the repository at this point in the history
Signatures
  • Loading branch information
cccs-jh authored Nov 28, 2024
2 parents ae4ae2f + 008b217 commit 5592310
Show file tree
Hide file tree
Showing 16 changed files with 352 additions and 240 deletions.
135 changes: 61 additions & 74 deletions signatures/abstracts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from re import findall
import re
from typing import Any, Dict, List, Optional, Union

from assemblyline.common.str_utils import safe_str
Expand All @@ -15,14 +15,14 @@ class Signature:

def __init__(
self,
heuristic_id: int = None,
name: str = None,
description: str = None,
ttp: List[str] = None,
families: List[str] = None,
indicators: List[str] = None,
severity: int = None,
safelist: List[str] = None,
heuristic_id: int | None = None,
name: str | None = None,
description: str | None = None,
ttp: List[str] | None = None,
families: List[str] | None = None,
indicators: List[str] | None = None,
severity: int = 0,
safelist: List[str] | None = None,
):
"""
This method instantiates the base Signature class and performs some validtion checks
Expand Down Expand Up @@ -51,11 +51,11 @@ def __init__(
if severity is None:
self.severity: int = 0
elif severity < 0:
self.severity: int = 0
self.severity = 0
elif severity > 3:
self.severity: int = 3
self.severity = 3
else:
self.severity: int = severity
self.severity = severity

self.safelist: List[str] = [] if safelist is None else safelist

Expand All @@ -69,28 +69,20 @@ def check_indicators_in_list(self, output: List[str], match_all: bool = False) -
:param match_all: All indicators must be found in a single line for a mark to be added
"""
for string in output:
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
split_line = string.split("] ")
if len(split_line) == 2:
string = split_line[1]
elif len(split_line) > 2:
string = "] ".join(split_line[1:])

# If we want to match all indicators in a line and nothing from the safelist is in that line, mark it!
if (
match_all
and all(indicator.lower() in string.lower() for indicator in self.indicators)
and not any(item.lower() in string.lower() for item in self.safelist)
):
string = self.remove_timestamp(string)
lower = string.lower()
any_all = all if match_all else any
# If we match indicators in a line and nothing from the safelist is in that line, mark it!
if any_all(indicator.lower() in lower for indicator in self.indicators) and not self.safelisted(lower):
self.add_mark(string)

# If we only want to match at least one indicator in a line, then mark it!
if not match_all:
for indicator in self.indicators:
if indicator.lower() in string.lower() and not any(
item.lower() in string.lower() for item in self.safelist
):
self.add_mark(string)
def safelisted(self, string: str) -> bool:
"""
This method checks if the string contains any safelisted terms
:param string: The string to check
"""
string = string.lower()
return any(item.lower() in string for item in self.safelist)

@staticmethod
def check_regex(regex: str, string: str) -> List[str]:
Expand All @@ -99,11 +91,7 @@ def check_regex(regex: str, string: str) -> List[str]:
:param regex: A regular expression to be applied to the string
:param string: A line of output
"""
result = findall(regex, string)
if len(result) > 0:
return result
else:
return []
return re.findall(regex, string)

def process_output(self, output: List[str]):
"""
Expand All @@ -117,13 +105,25 @@ def add_mark(self, mark: Any) -> bool:
:param mark: The mark to be added
:return: A boolean indicating if the mark was added
"""
if mark:
if safe_str(mark).strip() not in self.marks:
# Sometimes lines end with trailing semi-colons and sometimes they do not. These are not unique marks
if safe_str(mark).strip() + ";" not in self.marks:
self.marks.append(safe_str(mark).strip())
else:
if not mark:
return False
mark = safe_str(mark).strip()
if mark not in self.marks and mark + ";" not in self.marks:
# Sometimes lines end with trailing semi-colons and sometimes they do not. These are not unique marks
self.marks.append(mark)
return True
return False

@staticmethod
def remove_timestamp(line: str) -> str:
"""
This method removes the timestamp from the start of an output line
:param line: The line to strip the timestamp from
"""
if not line.startswith("["):
return line
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
return line.split("] ", 1)[-1]

def check_multiple_indicators_in_list(self, output: List[str], indicators: List[Dict[str, List[str]]]) -> None:
"""
Expand All @@ -144,35 +144,22 @@ def check_multiple_indicators_in_list(self, output: List[str], indicators: List[

for string in output:
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
split_line = string.split("] ")
if len(split_line) == 2:
string = split_line[1]

# If all_indicators
are_indicators_matched = True
for all_indicator in all_indicators:
if are_indicators_matched and all(
indicator.lower() in string.lower() for indicator in all_indicator["indicators"]
):
for any_indicator in any_indicators:
if are_indicators_matched and any(
indicator.lower() in string.lower() for indicator in any_indicator["indicators"]
):
pass
else:
are_indicators_matched = False
else:
are_indicators_matched = False

# If no all_indicators
if not all_indicators:
for any_indicator in any_indicators:
if are_indicators_matched and any(
indicator.lower() in string.lower() for indicator in any_indicator["indicators"]
):
pass
else:
are_indicators_matched = False

if are_indicators_matched and not any(item.lower() in string.lower() for item in self.safelist):
string = self.remove_timestamp(string)
lower = string.lower()

# We want all of the indicators to match
if (
all(
# With all_indicators matching all of their indicators
all(indicator.lower() in lower for indicator in all_indicator["indicators"])
for all_indicator in all_indicators
)
and all(
# And any_indicators matching any of their indicators
any(indicator.lower() in lower for indicator in any_indicator["indicators"])
for any_indicator in any_indicators
)
# But nothing from the safelist
and not self.safelisted(lower)
):
self.add_mark(string)
29 changes: 15 additions & 14 deletions signatures/save_to_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
These are all of the signatures related to saving a file
"""

import re

from assemblyline.common.str_utils import safe_str

from signatures.abstracts import ANY, Signature

# List of commands used to save a file to disk
Expand Down Expand Up @@ -34,11 +37,13 @@ def __init__(self):
)

def process_output(self, output):
indicator_list = [
{"method": ANY, "indicators": save_commands},
{"method": ANY, "indicators": self.indicators},
]
self.check_multiple_indicators_in_list(output, indicator_list)
for line in output:
line = self.remove_timestamp(line)
lower = line.lower()
if re.search(r"[.](exe|dll)\b", lower) and any(
save_command.lower() in lower for save_command in save_commands
):
self.add_mark(line)


class WritesArchive(Signature):
Expand All @@ -48,7 +53,7 @@ def __init__(self):
name="writes_archive",
description="JavaScript writes archive file to disk",
# File extensions based on https://github.com/CybercentreCanada/assemblyline-service-cape/blob/2412416fd8040897d25d00bdaba6356d514398f4/cape/cape_main.py#L1343
indicators=["\\.zip", "\\.iso", "\\.rar", "\\.vhd", "\\.udf", "\\.7z"],
indicators=["zip", "iso", "rar", "vhd", "udf", "7z"],
severity=3,
)

Expand All @@ -57,21 +62,17 @@ def process_output(self, output):
results = []

# First look for file extensions
extension_regex = f"(?i)({'|'.join(self.indicators)})\\b"
extension_regex = f"(?i)\\w[.]({'|'.join(self.indicators)})\\b"
for line in output:
split_line = line.split("] ")
if len(split_line) == 2:
string = split_line[1]
else:
string = line
if self.check_regex(extension_regex, string.lower()):
string = self.remove_timestamp(line)
if re.search(extension_regex, string.lower()):
extension_results.append(string)

# Next look for the command
escaped_save_commands = [save_command.replace("(", "\\(") for save_command in save_commands]
commands_regex = f"({'|'.join(escaped_save_commands)})"
for line in extension_results:
if self.check_regex(commands_regex, line):
if re.search(commands_regex, line):
results.append(line)

results_set = sorted(set(results))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
},
{
"auto_collapse": false,
"body": "The prefix '_0x' in names of variables and functions suggests that obfuscated code exists\n\t\tvar _0x4b = [\"\\\\ProgramData\\\\\", \"Scripting.FileSystemObject\", \"WinHttp.WinHttpRequest.5.1\", \"WScript...\n\t\tvar _a = new ActiveXObject(_0x4b[1])\n\t\tvar _b = new ActiveXObject(_0x4b[2])\n\t\tvar _c = new ActiveXObject(_0x4b[3])\n\t\tvar _d = _0x4b[18]\n\t\tvar _e = _0x4b[19]\n\t\tvar _f = _0x4b[20]\n\t\t_b[_0x4b[5]](_0x4b[4], _e, false)\n\t\t_b[_0x4b[6]]()\n\t\tvar _g = new ActiveXObject(_0x4b[9])\n\t\t[6 Mark(s) Truncated]",
"body": "The prefix '_0x' in names of variables and functions suggests that obfuscated code exists\n\t\tvar _0x4b = [\"\\\\ProgramData\\\\\", \"Scripting.FileSystemObject\", \"WinHttp.WinHttpRequest.5.1\", \"WScript...\n\t\tvar _a = new ActiveXObject(_0x4b[1])\n\t\tvar _b = new ActiveXObject(_0x4b[2])\n\t\tvar _c = new ActiveXObject(_0x4b[3])\n\t\tvar _d = _0x4b[18]\n\t\tvar _e = _0x4b[19]\n\t\tvar _f = _0x4b[20]\n\t\t_b[_0x4b[5]](_0x4b[4], _e, false)\n\t\t_b[_0x4b[6]]()\n\t\tif (_b[_0x4b[7]] == 200) {\n\t\t[9 Mark(s) Truncated]",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"extra": {
"drop_file": false,
"score": 2021,
"score": 2031,
"sections": [
{
"auto_collapse": false,
Expand Down Expand Up @@ -136,7 +136,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript uses a MemoryStream object to manipulate memory\n\t\t\"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System.IO.MemorySt...\n\t\tTypeError: \"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System....\n\t\tfunction szxwJG($PQNgWB){$JUxiC = [System.IO.MemoryStream]::new()",
"body": "JavaScript uses a MemoryStream object to manipulate memory\n\t\t\"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System.IO.MemorySt...\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...\n\t\tTypeError: \"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System....\n\t\tfunction szxwJG($PQNgWB){$JUxiC = [System.IO.MemoryStream]::new()",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand All @@ -160,7 +160,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript runs PowerShell via powershell.exe\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body": "JavaScript runs PowerShell via powershell.exe\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand All @@ -182,6 +182,30 @@
"title_text": "Signature: RunsPowerShell",
"zeroize_on_tag_safe": false
},
{
"auto_collapse": false,
"body": "JavaScript runs PowerShell to call out to a URI\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
"depth": 1,
"heuristic": {
"attack_ids": [],
"frequency": 1,
"heur_id": 3,
"score": 10,
"score_map": {
"runs_ps1_to_download": 10
},
"signatures": {
"runs_ps1_to_download": 1
}
},
"promote_to": null,
"tags": {},
"title_text": "Signature: PowerShellDownloader",
"zeroize_on_tag_safe": false
},
{
"auto_collapse": false,
"body": [
Expand Down Expand Up @@ -356,6 +380,13 @@
"runs_ps1"
]
},
{
"attack_ids": [],
"heur_id": 3,
"signatures": [
"runs_ps1_to_download"
]
},
{
"attack_ids": [],
"heur_id": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript uses charCodeAt/fromCharCode to obfuscate/de-obfuscate characters\n\t\t= jUVqAOisGJal[zWCwJfTxMEaZ].charCodeAt(0)",
"body": "JavaScript uses charCodeAt/fromCharCode to obfuscate/de-obfuscate characters\n\t\tGdxgkbZtlbkX[i] = jUVqAOisGJal[zWCwJfTxMEaZ].charCodeAt(0)",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand Down
Loading

0 comments on commit 5592310

Please sign in to comment.