Skip to content

Commit

Permalink
Using Typed ResultSections
Browse files Browse the repository at this point in the history
  • Loading branch information
cccs-kevin committed Mar 11, 2022
1 parent f164f6f commit 31f8984
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions jsjaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from assemblyline_v4_service.common.base import ServiceBase
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology
from assemblyline_v4_service.common.request import ServiceRequest
from assemblyline_v4_service.common.result import Result, ResultSection, BODY_FORMAT
from assemblyline_v4_service.common.result import Result, ResultTextSection, ResultTableSection, TableRow, ResultSection

import signatures
from signatures.abstracts import Signature
Expand Down Expand Up @@ -415,14 +415,15 @@ def _extract_urls(self, result: Result) -> None:
if not path.exists(self.malware_jail_urls_json_path) and not path.exists(self.boxjs_iocs):
return

urls_result_section = ResultSection("URLs")
urls_result_section = ResultTableSection("URLs")

urls_json = []
urls_rows: List[TableRow] = []
if path.exists(self.malware_jail_urls_json_path):
with open(self.malware_jail_urls_json_path, "r") as f:
file_contents = f.read()
urls_json = loads(file_contents)
for url in urls_json:
[urls_rows.append(TableRow(**item)) for item in urls_json]
for url in urls_rows:
self._tag_uri(url["url"], urls_result_section)

if path.exists(self.boxjs_iocs):
Expand All @@ -432,16 +433,16 @@ def _extract_urls(self, result: Result) -> None:
for ioc in ioc_json:
value = ioc["value"]
if ioc["type"] == "UrlFetch":
if any(value["url"] == url["url"] for url in urls_json):
if any(value["url"] == url["url"] for url in urls_rows):
continue
urls_json.append(
{"url": value["url"],
"method": value["method"],
"request_headers": value["headers"]})
urls_rows.append(TableRow(**
{"url": value["url"],
"method": value["method"],
"request_headers": value["headers"]}))
self._tag_uri(value["url"], urls_result_section)

if urls_json:
urls_result_section.set_body(dumps(urls_json), BODY_FORMAT.TABLE)
if urls_rows:
[urls_result_section.add_row(urls_row) for urls_row in urls_rows]
urls_result_section.set_heuristic(1)
result.add_section(urls_result_section)

Expand Down Expand Up @@ -589,8 +590,8 @@ def _run_signatures(self, output: List[str], result: Result, display_iocs: bool
if len(signatures_that_hit) > 0:
sigs_res_sec = ResultSection("Signatures")
for sig_that_hit in signatures_that_hit:
sig_res_sec = ResultSection(f"Signature: {type(sig_that_hit).__name__}",
body=sig_that_hit.description, parent=sigs_res_sec)
sig_res_sec = ResultTextSection(f"Signature: {type(sig_that_hit).__name__}", parent=sigs_res_sec)
sig_res_sec.add_line(sig_that_hit.description)
sig_res_sec.set_heuristic(sig_that_hit.heuristic_id)
translated_score = TRANSLATED_SCORE[sig_that_hit.severity]
sig_res_sec.heuristic.add_signature_id(sig_that_hit.name, score=translated_score)
Expand Down Expand Up @@ -650,18 +651,19 @@ def _extract_boxjs_iocs(self, result: Result) -> None:
elif type == "FileRead" and "file" in value:
file_reads.add(value["file"])
if commands:
cmd_result_section = ResultSection("The script ran the following commands", parent=ioc_result_section)
cmd_result_section = ResultTextSection(
"The script ran the following commands", parent=ioc_result_section)
cmd_result_section.add_lines(list(commands))
[cmd_result_section.add_tag("dynamic.process.command_line", command) for command in list(commands)]
self._extract_iocs_from_text_blob(cmd_result_section.body, cmd_result_section, ".js")
if file_writes:
file_writes_result_section = ResultSection(
file_writes_result_section = ResultTextSection(
"The script wrote the following files", parent=ioc_result_section)
file_writes_result_section.add_lines(list(file_writes))
[file_writes_result_section.add_tag("dynamic.process.file_name", file_write)
for file_write in list(file_writes)]
if file_reads:
file_reads_result_section = ResultSection(
file_reads_result_section = ResultTextSection(
"The script read the following files", parent=ioc_result_section)
file_reads_result_section.add_lines(list(file_reads))
[file_reads_result_section.add_tag("dynamic.process.file_name", file_read)
Expand All @@ -671,7 +673,7 @@ def _extract_boxjs_iocs(self, result: Result) -> None:
ioc_result_section.set_heuristic(2)
result.add_section(ioc_result_section)

def _tag_uri(self, url: str, urls_result_section: ResultSection) -> None:
def _tag_uri(self, url: str, urls_result_section: ResultTableSection) -> None:
"""
This method tags components of a URI
:param url: The url to be analyzed
Expand Down Expand Up @@ -712,7 +714,7 @@ def _flag_jsxray_iocs(output: Dict[str, Any], result: Result) -> None:
:param result: A Result object containing the service results
:return: None
"""
jsxray_iocs_result_section = ResultSection("JS-X-Ray IOCs Detected")
jsxray_iocs_result_section = ResultTextSection("JS-X-Ray IOCs Detected")
warnings: List[Dict[str, Any]] = output.get("warnings", [])
for warning in warnings:
kind = warning["kind"]
Expand Down Expand Up @@ -769,8 +771,8 @@ def _extract_filtered_jquery(self, result: Result, file_contents: str):
same.discard('\n')

if len(same) > 0:
embedded_jquery_res_sec = ResultSection("Embedded code was found in jQuery library",
body=f"View extracted file {self.filtered_jquery} for details.")
embedded_jquery_res_sec = ResultTextSection("Embedded code was found in jQuery library")
embedded_jquery_res_sec.add_line(f"View extracted file {self.filtered_jquery} for details.")
embedded_jquery_res_sec.set_heuristic(4)
result.add_section(embedded_jquery_res_sec)
with open(self.filtered_jquery_path, "w") as f:
Expand Down

0 comments on commit 31f8984

Please sign in to comment.