diff --git a/parser.json b/parser.json
index d4ac20d..20b7bbe 100644
--- a/parser.json
+++ b/parser.json
@@ -42,10 +42,6 @@
{
"module": "python_magic",
"input_file": "wheels/shared/python_magic-0.4.18-py2.py3-none-any.whl"
- },
- {
- "module": "simplejson",
- "input_file": "wheels/py36/simplejson-3.17.2-cp36-cp36m-manylinux2010_x86_64.manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
}
]
},
@@ -636,6 +632,10 @@
"module": "defusedxml",
"input_file": "wheels/shared/defusedxml-0.7.1-py2.py3-none-any.whl"
},
+ {
+ "module": "lxml",
+ "input_file": "wheels/py39/lxml-5.3.0-cp39-cp39-manylinux_2_28_x86_64.whl"
+ },
{
"module": "pdfminer.six",
"input_file": "wheels/py3/pdfminer.six-20211012-py3-none-any.whl"
@@ -648,13 +648,17 @@
"module": "pycparser",
"input_file": "wheels/py3/pycparser-2.22-py3-none-any.whl"
},
+ {
+ "module": "python_docx",
+ "input_file": "wheels/py3/python_docx-1.1.2-py3-none-any.whl"
+ },
{
"module": "python_magic",
"input_file": "wheels/shared/python_magic-0.4.18-py2.py3-none-any.whl"
},
{
- "module": "simplejson",
- "input_file": "wheels/py39/simplejson-3.17.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
+ "module": "typing_extensions",
+ "input_file": "wheels/py3/typing_extensions-4.12.2-py3-none-any.whl"
}
]
}
diff --git a/parser_connector.py b/parser_connector.py
index 042934a..1c85de8 100644
--- a/parser_connector.py
+++ b/parser_connector.py
@@ -15,60 +15,106 @@
#
#
import calendar
+import dataclasses
import email
import json
import sys
import threading
import time
+from typing import Any, NamedTuple, Optional, cast
import phantom.app as phantom
import phantom.rules as ph_rules
-from bs4 import UnicodeDammit
+from bs4.dammit import UnicodeDammit
from phantom.action_result import ActionResult
from phantom.base_connector import BaseConnector
+import parser_const as consts
import parser_email
import parser_methods
-from parser_const import *
-class RetVal(tuple):
- def __new__(cls, val1, val2):
- return tuple.__new__(RetVal, (val1, val2))
+@dataclasses.dataclass()
+class ParseFileParams:
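+    """User-supplied parameters for the parse_file action, normalized on construction.
+
+    artifact_tags_list and custom_mapping are derived in __post_init__ rather than
+    supplied by the caller.
+    """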
+ remap_cef_fields: str = ""
+ is_structured: bool = False
+ run_automation: bool = True
+ parse_domains: bool = True
+ keep_raw: bool = False
+ severity: str = "medium"
+ artifact_tags: str = ""
+ artifact_tags_list: list[str] = dataclasses.field(init=False)
+ custom_remap_json: str = "{}"
+ custom_mapping: dict[str, Any] = dataclasses.field(init=False)
+ custom_mapping_error: Optional[Exception] = None
+ text: str = ""
+
+ vault_id: Optional[str] = None
+ file_type: Optional[str] = None
+ label: Optional[str] = None
+ max_artifacts: Optional[int] = None
+ container_id: Optional[int] = None
+
+ def __post_init__(self) -> None:
+ self.severity = self.severity.lower()
+ self.remap_cef_fields = self.remap_cef_fields.lower()
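+        # Normalize tags: split on commas, strip whitespace, and drop empty entries.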
+ self.artifact_tags_list = [tag for tag in (_tag.strip().replace(" ", "") for _tag in self.artifact_tags.split(",")) if tag]
+
+        self.custom_mapping = {}  # ensure custom_mapping is always defined, even for empty input
+        if self.custom_remap_json:
+            try:
+                self.custom_mapping = json.loads(self.custom_remap_json)
+            except Exception as e:
+                self.custom_mapping_error = e
+ @classmethod
+ def from_dict(cls, d: dict[str, Any]) -> "ParseFileParams":
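+        # Keep only recognized init fields so unexpected keys in the action param dict don't raise TypeError.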
+ fields = {field.name for field in dataclasses.fields(cls) if field.init}
+ return cls(**{k: v for k, v in d.items() if k in fields})
-class RetVal2(RetVal):
- pass
+class SaveContainerResult(NamedTuple):
+ success: bool
+ container_id: Optional[int]
-class RetVal3(tuple):
- def __new__(cls, val1, val2, val3):
- return tuple.__new__(RetVal3, (val1, val2, val3))
+class FileInfoResult(NamedTuple):
+ success: bool
+ file_info: Optional[parser_methods.FileInfo]
-class ParserConnector(BaseConnector):
- def __init__(self):
+class HeaderResult(NamedTuple):
+ success: bool
+ headers: Optional[dict[str, str]]
+
+
+class EmailVaultData(NamedTuple):
+ success: bool
+ email_data: Optional[str]
+ email_id: Optional[str]
+
+
+class ParserConnector(BaseConnector):
+ def __init__(self) -> None:
super(ParserConnector, self).__init__()
self._lock = None
self._done = False
- def initialize(self):
+ def initialize(self) -> bool:
self._lock = threading.Lock()
self._done = False
return phantom.APP_SUCCESS
- def _dump_error_log(self, error, message="Exception occurred."):
+ def _dump_error_log(self, error: Exception, message: str = "Exception occurred.") -> None:
self.error_print(message, dump_object=error)
- def _get_error_message_from_exception(self, e):
+ def _get_error_message_from_exception(self, e: Exception) -> str:
"""This function is used to get appropriate error message from the exception.
:param e: Exception object
:return: error message
"""
error_code = None
- error_msg = ERROR_MSG_UNAVAILABLE
+ error_msg = consts.ERROR_MSG_UNAVAILABLE
try:
if hasattr(e, "args"):
@@ -87,36 +133,36 @@ def _get_error_message_from_exception(self, e):
return error_text
- def finalize(self):
+ def finalize(self) -> bool:
return phantom.APP_SUCCESS
- def _get_mail_header_dict(self, email_data, action_result):
+ def _get_mail_header_dict(self, email_data: str, action_result: ActionResult) -> HeaderResult:
try:
mail = email.message_from_string(email_data)
except Exception as e:
self._dump_error_log(e)
- return RetVal2(
- action_result.set_status(phantom.APP_ERROR, "Unable to create email object from data. Does not seem to be valid email"), None
+ return HeaderResult(
+ action_result.set_status(
+ phantom.APP_ERROR,
+ "Unable to create email object from data. Does not seem to be valid email",
+ ),
+ None,
)
headers = mail.__dict__.get("_headers")
if not headers:
- return RetVal2(
+ return HeaderResult(
action_result.set_status(
- phantom.APP_ERROR, "Could not extract header info from email object data. Does not seem to be valid email"
+ phantom.APP_ERROR,
+ "Could not extract header info from email object data. Does not seem to be valid email",
),
None,
)
- ret_val = {}
- for header in headers:
- ret_val[header[0]] = header[1]
-
- return RetVal2(phantom.APP_SUCCESS, ret_val)
-
- def _get_email_data_from_vault(self, vault_id, action_result):
+ return HeaderResult(phantom.APP_SUCCESS, dict(headers))
+ def _get_email_data_from_vault(self, vault_id: str, action_result: ActionResult) -> EmailVaultData:
email_data = None
email_id = vault_id
@@ -124,15 +170,27 @@ def _get_email_data_from_vault(self, vault_id, action_result):
_, _, vault_meta_info = ph_rules.vault_info(container_id=self.get_container_id(), vault_id=vault_id)
if not vault_meta_info:
self.debug_print("Error while fetching meta information for vault ID: {}".format(vault_id))
- return RetVal3(action_result.set_status(phantom.APP_ERROR, PARSER_ERR_FILE_NOT_IN_VAULT), None, None)
+ return EmailVaultData(
+ action_result.set_status(phantom.APP_ERROR, consts.PARSER_ERR_FILE_NOT_IN_VAULT),
+ None,
+ None,
+ )
vault_meta_info = list(vault_meta_info)
file_path = vault_meta_info[0]["path"]
except Exception as e:
self._dump_error_log(e)
- return RetVal3(action_result.set_status(phantom.APP_ERROR, "Could not get file path for vault item"), None, None)
+ return EmailVaultData(
+ action_result.set_status(phantom.APP_ERROR, "Could not get file path for vault item"),
+ None,
+ None,
+ )
if file_path is None:
- return RetVal3(action_result.set_status(phantom.APP_ERROR, "No file with vault ID found"), None, None)
+ return EmailVaultData(
+ action_result.set_status(phantom.APP_ERROR, "No file with vault ID found"),
+ None,
+ None,
+ )
try:
with open(file_path, "rb") as f:
@@ -140,25 +198,41 @@ def _get_email_data_from_vault(self, vault_id, action_result):
except Exception as e:
self._dump_error_log(e)
error_text = self._get_error_message_from_exception(e)
- return RetVal3(
- action_result.set_status(phantom.APP_ERROR, "Could not read file contents for vault item. {}".format(error_text)), None, None
+ return EmailVaultData(
+ action_result.set_status(
+ phantom.APP_ERROR,
+ "Could not read file contents for vault item. {}".format(error_text),
+ ),
+ None,
+ None,
)
- return RetVal3(phantom.APP_SUCCESS, email_data, email_id)
+ return EmailVaultData(phantom.APP_SUCCESS, email_data, email_id)
- def _get_file_info_from_vault(self, action_result, vault_id, file_type=None):
- file_info = {"id": vault_id}
+ def _get_file_info_from_vault(
+ self,
+ action_result: ActionResult,
+ vault_id: str,
+ file_type: Optional[str] = None,
+ ) -> FileInfoResult:
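+        # FileInfo is a TypedDict; start from the vault id and fill in the remaining keys below.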
+ file_info = cast(parser_methods.FileInfo, {"id": vault_id})
# Check for file in vault
try:
_, _, vault_meta = ph_rules.vault_info(container_id=self.get_container_id(), vault_id=vault_id)
if not vault_meta:
self.debug_print("Error while fetching meta information for vault ID: {}".format(vault_id))
- return RetVal(action_result.set_status(phantom.APP_ERROR, PARSER_ERR_FILE_NOT_IN_VAULT), None)
+ return FileInfoResult(
+ action_result.set_status(phantom.APP_ERROR, consts.PARSER_ERR_FILE_NOT_IN_VAULT),
+ None,
+ )
vault_meta = list(vault_meta)
except Exception as e:
self._dump_error_log(e)
- return RetVal(action_result.set_status(phantom.APP_ERROR, PARSER_ERR_FILE_NOT_IN_VAULT), None)
+ return FileInfoResult(
+ action_result.set_status(phantom.APP_ERROR, consts.PARSER_ERR_FILE_NOT_IN_VAULT),
+ None,
+ )
file_meta = None
try:
@@ -193,15 +267,27 @@ def _get_file_info_from_vault(self, action_result, vault_id, file_type=None):
if file_type:
file_info["type"] = file_type
else:
- file_type = file_meta["name"].split(".")[-1]
+ file_type = cast(str, file_meta["name"].split(".")[-1])
file_info["type"] = file_type
- return RetVal(phantom.APP_SUCCESS, file_info)
+ return FileInfoResult(phantom.APP_SUCCESS, file_info)
+
+ def _handle_email(
+ self,
+ action_result: ActionResult,
+ vault_id: str,
+ label: Optional[str],
+ container_id: Optional[int],
+ run_automation: bool = True,
+ parse_domains: bool = True,
+ artifact_tags_list: Optional[list[str]] = None,
+ ) -> bool:
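+        # Use a None sentinel instead of a mutable default list so calls don't share state.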
+ if artifact_tags_list is None:
+ artifact_tags_list = []
- def _handle_email(self, action_result, vault_id, label, container_id, run_automation=True, parse_domains=True, artifact_tags_list=[]):
ret_val, email_data, email_id = self._get_email_data_from_vault(vault_id, action_result)
- if phantom.is_fail(ret_val):
+ if phantom.is_fail(ret_val) or email_data is None:
return action_result.get_status()
ret_val, header_dict = self._get_mail_header_dict(email_data, action_result)
@@ -234,7 +320,18 @@ def _handle_email(self, action_result, vault_id, label, container_id, run_automa
return action_result.set_status(phantom.APP_SUCCESS)
- def _save_artifacts(self, action_result, artifacts, container_id, severity, max_artifacts=None, run_automation=True, tags=[]):
+ def _save_artifacts(
+ self,
+ action_result: ActionResult,
+ artifacts: list[dict[str, Any]],
+ container_id: int,
+ severity: str,
+ max_artifacts: Optional[int] = None,
+ run_automation: bool = True,
+ tags: Optional[list[str]] = None,
+ ) -> bool:
+ if tags is None:
+ tags = []
if max_artifacts:
artifacts = artifacts[:max_artifacts]
@@ -254,131 +351,151 @@ def _save_artifacts(self, action_result, artifacts, container_id, severity, max_
return phantom.APP_SUCCESS
def _save_to_container(
- self, action_result, artifacts, file_name, label, severity, max_artifacts=None, run_automation=True, artifact_tags_list=[]
- ):
- container = {"name": "{0} Parse Results".format(file_name), "label": label, "severity": severity}
+ self,
+ action_result: ActionResult,
+ artifacts: list[dict[str, Any]],
+ file_name: str,
+ label: Optional[str],
+ severity: str,
+ max_artifacts: Optional[int] = None,
+ run_automation: bool = True,
+ artifact_tags_list: Optional[list[str]] = None,
+ ) -> SaveContainerResult:
+ if artifact_tags_list is None:
+ artifact_tags_list = []
+
+ container = {
+ "name": "{0} Parse Results".format(file_name),
+ "label": label,
+ "severity": severity,
+ }
status, message, container_id = self.save_container(container)
if phantom.is_fail(status):
- return RetVal(action_result.set_status(phantom.APP_ERROR, message), None)
- return RetVal(
- self._save_artifacts(action_result, artifacts, container_id, severity, max_artifacts, run_automation, artifact_tags_list),
+ return SaveContainerResult(action_result.set_status(phantom.APP_ERROR, message), None)
+ return SaveContainerResult(
+ self._save_artifacts(
+ action_result,
+ artifacts,
+ container_id,
+ severity,
+ max_artifacts,
+ run_automation,
+ artifact_tags_list,
+ ),
container_id,
)
def _save_to_existing_container(
- self, action_result, artifacts, container_id, severity, max_artifacts=None, run_automation=True, artifact_tags_list=[]
- ):
- return self._save_artifacts(action_result, artifacts, container_id, severity, max_artifacts, run_automation, artifact_tags_list)
-
- def get_artifact_tags_list(self, artifact_tags):
- """
- Get list of tags from comma separated tags string
- Args:
- artifact_tags: Comma separated string of tags
-
- Returns:
- list: tags
- """
- tags = artifact_tags.split(",")
- tags = [tag.strip().replace(" ", "") for tag in tags]
- return list(filter(None, tags))
-
- def _handle_parse_file(self, param): # noqa
-
- action_result = self.add_action_result(ActionResult(dict(param)))
+ self,
+ action_result: ActionResult,
+ artifacts: list[dict[str, Any]],
+ container_id: int,
+ severity: str,
+ max_artifacts: Optional[int] = None,
+ run_automation: bool = True,
+ artifact_tags_list: Optional[list[str]] = None,
+ ) -> bool:
+ return self._save_artifacts(
+ action_result,
+ artifacts,
+ container_id,
+ severity,
+ max_artifacts,
+ run_automation,
+ artifact_tags_list,
+ )
- container_id = param.get("container_id")
+ def _validate_parse_file_params(self, param: ParseFileParams) -> None:
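+        """Validate and coerce the parameters in place, raising ValueError with a user-facing message on bad input."""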
try:
- if container_id is not None:
- container_id = int(container_id)
+ if param.container_id is not None:
+ param.container_id = int(param.container_id)
except Exception as e:
self._dump_error_log(e)
- return action_result.set_status(phantom.APP_ERROR, "Please provide a valid integer value in container_id")
-
- label = param.get("label")
- file_info = {}
- if container_id is None and label is None:
- return action_result.set_status(phantom.APP_ERROR, "A label must be specified if no container ID is provided")
- if container_id:
- ret_val, message, _ = self.get_container_info(container_id)
- if phantom.is_fail(ret_val):
- return action_result.set_status(phantom.APP_ERROR, "Unable to find container: {}".format(message))
+ raise ValueError("Please provide a valid integer value in container_id") from None
- vault_id = param.get("vault_id")
- text_val = param.get("text")
- file_type = param.get("file_type")
- is_structured = param.get("is_structured", False)
- run_automation = param.get("run_automation", True)
- parse_domains = param.get("parse_domains", True)
- keep_raw = param.get("keep_raw", False)
- severity = param.get("severity", "medium").lower()
- artifact_tags = param.get("artifact_tags", "")
+ if param.container_id is None and param.label is None:
+ raise ValueError("A label must be specified if no container ID is provided")
- artifact_tags_list = self.get_artifact_tags_list(artifact_tags)
+ if param.container_id:
+ ret_val, message, _ = self.get_container_info(param.container_id)
+ if phantom.is_fail(ret_val):
+ raise ValueError(f"Unable to find container: {message}")
# --- remap cef fields ---
- custom_remap_json = param.get("custom_remap_json", "{}")
- custom_mapping = None
- if custom_remap_json:
- try:
- custom_mapping = json.loads(custom_remap_json)
- except Exception as e:
- self._dump_error_log(e)
- error_text = self._get_error_message_from_exception(e)
- return action_result.set_status(phantom.APP_ERROR, "Error: custom_remap_json parameter is not valid json. {}".format(error_text))
- if not isinstance(custom_mapping, dict):
- return action_result.set_status(phantom.APP_ERROR, "Error: custom_remap_json parameter is not a dictionary")
+ if param.custom_mapping_error is not None:
+ self._dump_error_log(param.custom_mapping_error)
+ error_text = self._get_error_message_from_exception(param.custom_mapping_error)
+ raise ValueError(f"Error: custom_remap_json parameter is not valid json. {error_text}")
+ if not isinstance(param.custom_mapping, dict):
+ raise ValueError("Error: custom_remap_json parameter is not a dictionary")
# ---
- if vault_id and text_val:
- return action_result.set_status(
- phantom.APP_ERROR,
- "Either text can be parsed or "
- "a file from the vault can be parsed but both the 'text' and "
- "'vault_id' parameters cannot be used simultaneously",
+ if param.vault_id and param.text:
+            raise ValueError(
+                "Either text or a file from the vault can be parsed, but the 'text' and "
+                "'vault_id' parameters cannot be used simultaneously"
)
- if text_val and file_type not in ["txt", "csv", "html"]:
- return action_result.set_status(phantom.APP_ERROR, "When using text input, only csv, html, or txt file_type can be used")
- elif not (vault_id or text_val):
- return action_result.set_status(phantom.APP_ERROR, "Either 'text' or 'vault_id' must be submitted, both cannot be blank")
+ if param.text and param.file_type not in ("txt", "csv", "html"):
+ raise ValueError("When using text input, only csv, html, or txt file_type can be used")
+ if not (param.vault_id or param.text):
+ raise ValueError("Either 'text' or 'vault_id' must be submitted, both cannot be blank")
- max_artifacts = param.get("max_artifacts")
- if max_artifacts is not None:
+ if param.max_artifacts is not None:
try:
- max_artifacts = int(max_artifacts)
- if max_artifacts <= 0:
- return action_result.set_status(phantom.APP_ERROR, "Please provide a valid non-zero positive integer value in max_artifacts")
- param["max_artifacts"] = max_artifacts
+ param.max_artifacts = int(param.max_artifacts)
except Exception as e:
self._dump_error_log(e)
- return action_result.set_status(phantom.APP_ERROR, "Please provide a valid non-zero positive integer value in max_artifacts")
+ raise ValueError("Please provide a valid non-zero positive integer value in max_artifacts") from None
+ if param.max_artifacts <= 0:
+ raise ValueError("Please provide a valid non-zero positive integer value in max_artifacts")
- if vault_id:
- if file_type == "email":
- return self._handle_email(action_result, vault_id, label, container_id, run_automation, parse_domains, artifact_tags_list)
+ def _handle_parse_file(self, action_result: ActionResult, param: ParseFileParams) -> bool:
+ try:
+ self._validate_parse_file_params(param)
+ except ValueError as e:
+ return action_result.set_status(phantom.APP_ERROR, str(e))
- ret_val, file_info = self._get_file_info_from_vault(action_result, vault_id, file_type)
- if phantom.is_fail(ret_val):
+ file_info = {}
+ if param.vault_id:
+ if param.file_type == "email":
+ return self._handle_email(
+ action_result,
+ param.vault_id,
+ param.label,
+ param.container_id,
+ param.run_automation,
+ param.parse_domains,
+ param.artifact_tags_list,
+ )
+
+ ret_val, file_info = self._get_file_info_from_vault(action_result, param.vault_id, param.file_type)
+ if phantom.is_fail(ret_val) or file_info is None:
return ret_val
self.debug_print("File Info", file_info)
- if is_structured:
+ if param.is_structured:
ret_val, response = parser_methods.parse_structured_file(action_result, file_info)
else:
- ret_val, response = parser_methods.parse_file(self, action_result, file_info, parse_domains, keep_raw)
+ ret_val, response = parser_methods.parse_file(self, action_result, file_info, param.parse_domains, param.keep_raw)
if phantom.is_fail(ret_val):
return ret_val
else:
- text_val = text_val.replace(",", ", ")
- ret_val, response = parser_methods.parse_text(self, action_result, file_type, text_val, parse_domains)
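+            # Pad commas with a trailing space so comma-separated values tokenize as separate IOCs.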
+ param.text = param.text.replace(",", ", ")
+ ret_val, response = parser_methods.parse_text(self, action_result, param.file_type, param.text, param.parse_domains)
file_info["name"] = "Parser_Container_{0}".format(calendar.timegm(time.gmtime()))
+ if not response:
+ return action_result.set_status(
+ phantom.APP_ERROR,
+ "Unexpected null response; this should not be possible",
+ )
+
artifacts = response["artifacts"]
# --- remap cef fields ---
- def _apply_remap(artifacts, mapping):
+ def _apply_remap(artifacts: list[parser_methods.Artifact], mapping: dict[str, Any]) -> list[parser_methods.Artifact]:
if not isinstance(artifacts, list) or not isinstance(mapping, dict):
return artifacts
if len(artifacts) == 0 or len(mapping) == 0:
@@ -393,35 +510,48 @@ def _apply_remap(artifacts, mapping):
a["cef"] = new_cef
return artifacts
- remap_cef_fields = param.get("remap_cef_fields", "").lower()
- if "do not" in remap_cef_fields:
+ if "do not" in param.remap_cef_fields:
# --- do not perform CEF -> CIM remapping
- artifacts = _apply_remap(artifacts, custom_mapping)
- elif "before" in remap_cef_fields:
+ artifacts = _apply_remap(artifacts, param.custom_mapping)
+ elif "before" in param.remap_cef_fields:
# --- apply CEF -> CIM remapping and then custom remapping
- artifacts = _apply_remap(artifacts, CEF2CIM_MAPPING)
- artifacts = _apply_remap(artifacts, custom_mapping)
- elif "after" in remap_cef_fields:
+ artifacts = _apply_remap(artifacts, consts.CEF2CIM_MAPPING)
+ artifacts = _apply_remap(artifacts, param.custom_mapping)
+ elif "after" in param.remap_cef_fields:
# --- apply custom remapping and then CEF -> CIM remapping
- artifacts = _apply_remap(artifacts, custom_mapping)
- artifacts = _apply_remap(artifacts, CEF2CIM_MAPPING)
+ artifacts = _apply_remap(artifacts, param.custom_mapping)
+ artifacts = _apply_remap(artifacts, consts.CEF2CIM_MAPPING)
# ---
- if not container_id:
+ if not param.container_id:
ret_val, container_id = self._save_to_container(
- action_result, artifacts, file_info["name"], label, severity, max_artifacts, run_automation, artifact_tags_list
+ action_result,
+ cast(list[dict[str, Any]], artifacts),
+ file_info["name"],
+ param.label,
+ param.severity,
+ param.max_artifacts,
+ param.run_automation,
+ param.artifact_tags_list,
)
if phantom.is_fail(ret_val):
return ret_val
else:
+ container_id = param.container_id
ret_val = self._save_to_existing_container(
- action_result, artifacts, container_id, severity, max_artifacts, run_automation, artifact_tags_list
+ action_result,
+ cast(list[dict[str, Any]], artifacts),
+ container_id,
+ param.severity,
+ param.max_artifacts,
+ param.run_automation,
+ param.artifact_tags_list,
)
if phantom.is_fail(ret_val):
return ret_val
- if max_artifacts:
- len_artifacts = len(artifacts[:max_artifacts])
+ if param.max_artifacts:
+ len_artifacts = len(artifacts[: param.max_artifacts])
else:
len_artifacts = len(artifacts)
@@ -432,35 +562,38 @@ def _apply_remap(artifacts, mapping):
return action_result.set_status(phantom.APP_SUCCESS)
- def handle_action(self, param):
-
+ def handle_action(self, param: dict[str, Any]) -> bool:
ret_val = phantom.APP_SUCCESS
action_id = self.get_action_identifier()
self.debug_print("action_id", self.get_action_identifier())
+ action_result = self.add_action_result(ActionResult(dict(param)))
if action_id == "parse_file":
- ret_val = self._handle_parse_file(param)
+ ret_val = self._handle_parse_file(action_result, ParseFileParams.from_dict(param))
return ret_val
if __name__ == "__main__":
-
import argparse
- import pudb
import requests
- pudb.set_trace()
-
argparser = argparse.ArgumentParser()
argparser.add_argument("input_test_json", help="Input Test JSON file")
- argparser.add_argument("-u", "--username", help="username", required=False)
- argparser.add_argument("-p", "--password", help="password", required=False)
- argparser.add_argument("-v", "--verify", action="store_true", help="verify", required=False, default=False)
+ argparser.add_argument("-u", "--username", help="username", default="soar_local_admin")
+ argparser.add_argument("-p", "--password", help="password", default="password")
+ argparser.add_argument(
+ "-v",
+ "--verify",
+ action="store_true",
+ help="verify",
+ required=False,
+ default=False,
+ )
args = argparser.parse_args()
session_id = None
@@ -470,14 +603,28 @@ def handle_action(self, param):
if args.username and args.password:
login_url = BaseConnector._get_phantom_base_url() + "login"
try:
- print("Accessing the Login page")
- r = requests.get(login_url, verify=verify, timeout=DEFAULT_REQUEST_TIMEOUT)
+ print(f"Accessing the Login page: {login_url}")
+ r = requests.get(login_url, verify=verify, timeout=consts.DEFAULT_REQUEST_TIMEOUT)
csrftoken = r.cookies["csrftoken"]
- data = {"username": args.username, "password": args.password, "csrfmiddlewaretoken": csrftoken}
- headers = {"Cookie": "csrftoken={0}".format(csrftoken), "Referer": login_url}
+ data = {
+ "username": args.username,
+ "password": args.password,
+ "csrfmiddlewaretoken": csrftoken,
+ }
+ headers = {
+ "Cookie": "csrftoken={0}".format(csrftoken),
+ "Referer": login_url,
+ }
print("Logging into Platform to get the session id")
- r2 = requests.post(login_url, verify=verify, data=data, headers=headers, timeout=DEFAULT_REQUEST_TIMEOUT)
+ r2 = requests.post(
+ login_url,
+ verify=verify,
+ data=data,
+ headers=headers,
+ timeout=consts.DEFAULT_REQUEST_TIMEOUT,
+ )
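+            # Fail fast on a bad login response instead of a KeyError on the session cookie below.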
+ r2.raise_for_status()
session_id = r2.cookies["sessionid"]
except Exception as e:
diff --git a/parser_email.py b/parser_email.py
index 897de6f..4ef17fa 100644
--- a/parser_email.py
+++ b/parser_email.py
@@ -14,6 +14,7 @@
# and limitations under the License.
import email
import hashlib
+import json
import mimetypes
import operator
import os
@@ -30,7 +31,6 @@
import phantom.app as phantom
import phantom.rules as ph_rules
import phantom.utils as ph_utils
-import simplejson as json
from bs4 import BeautifulSoup, UnicodeDammit
from django.core.validators import URLValidator
from phantom.vault import Vault
@@ -123,7 +123,6 @@
def _get_string(input_str, charset):
-
try:
if input_str:
input_str = UnicodeDammit(input_str).unicode_markup.encode(charset).decode(charset)
@@ -218,7 +217,6 @@ def is_ipv6(input_ip):
def _get_file_contains(file_path):
-
contains = []
ext = os.path.splitext(file_path)[1]
contains.extend(FILE_EXTENSIONS.get(ext, []))
@@ -231,7 +229,6 @@ def _get_file_contains(file_path):
def _debug_print(*args):
-
if _base_connector and hasattr(_base_connector, "debug_print"):
_base_connector.debug_print(*args)
@@ -239,7 +236,6 @@ def _debug_print(*args):
def _error_print(*args):
-
if _base_connector and hasattr(_base_connector, "error_print"):
_base_connector.error_print(*args)
@@ -252,7 +248,6 @@ def _dump_error_log(error, message="Exception occurred."):
def _extract_urls_domains(file_data, urls, domains):
-
if (not _config[PROC_EMAIL_JSON_EXTRACT_DOMAINS]) and (not _config[PROC_EMAIL_JSON_EXTRACT_URLS]):
return
@@ -331,7 +326,6 @@ def _extract_urls_domains(file_data, urls, domains):
def _get_ips(file_data, ips):
-
# First extract what looks like an IP from the file, this is a faster operation
ips_in_mail = re.findall(ip_regexc, file_data)
ip6_in_mail = re.findall(ipv6_regexc, file_data)
@@ -352,7 +346,6 @@ def _get_ips(file_data, ips):
def _handle_body(body, parsed_mail, body_index, email_id):
-
local_file_path = body["file_path"]
charset = body.get("charset")
@@ -402,10 +395,8 @@ def _handle_body(body, parsed_mail, body_index, email_id):
def _add_artifacts(cef_key, input_set, artifact_name, start_index, artifacts):
-
added_artifacts = 0
for entry in input_set:
-
# ignore empty entries
if not entry:
continue
@@ -422,7 +413,6 @@ def _add_artifacts(cef_key, input_set, artifact_name, start_index, artifacts):
def _parse_email_headers_as_inline(file_data, parsed_mail, charset, email_id):
-
# remove the 'Forwarded Message' from the email text and parse it
p = re.compile(r"(?<=\r\n).*Forwarded Message.*\r\n", re.IGNORECASE)
email_text = p.sub("", file_data.strip())
@@ -440,10 +430,8 @@ def _parse_email_headers_as_inline(file_data, parsed_mail, charset, email_id):
def _add_email_header_artifacts(email_header_artifacts, start_index, artifacts):
-
added_artifacts = 0
for artifact in email_header_artifacts:
-
artifact["source_data_identifier"] = start_index + added_artifacts
artifacts.append(artifact)
added_artifacts += 1
@@ -452,7 +440,6 @@ def _add_email_header_artifacts(email_header_artifacts, start_index, artifacts):
def _create_artifacts(parsed_mail):
-
# get all the artifact data in their own list objects
ips = parsed_mail[PROC_EMAIL_JSON_IPS]
hashes = parsed_mail[PROC_EMAIL_JSON_HASHES]
@@ -486,7 +473,6 @@ def _create_artifacts(parsed_mail):
def _decode_uni_string(input_str, def_name):
-
# try to find all the decoded strings, we could have multiple decoded strings
# or a single decoded string between two normal strings separated by \r\n
# YEAH...it could get that messy
@@ -511,7 +497,6 @@ def _decode_uni_string(input_str, def_name):
new_str = ""
new_str_create_count = 0
for i, encoded_string in enumerate(encoded_strings):
-
decoded_string = decoded_strings.get(i)
if not decoded_string:
@@ -550,7 +535,6 @@ def _decode_uni_string(input_str, def_name):
def _get_container_name(parsed_mail, email_id):
-
# Create the default name
def_cont_name = "Email ID: {0}".format(email_id)
@@ -566,7 +550,16 @@ def _get_container_name(parsed_mail, email_id):
return _decode_uni_string(subject, def_cont_name)
-def _handle_if_body(content_disp, content_id, content_type, part, bodies, file_path, parsed_mail, file_name):
+def _handle_if_body(
+ content_disp,
+ content_id,
+ content_type,
+ part,
+ bodies,
+ file_path,
+ parsed_mail,
+ file_name,
+):
process_as_body = False
# if content disposition is None then assume that it is
@@ -597,7 +590,6 @@ def _handle_if_body(content_disp, content_id, content_type, part, bodies, file_p
def _handle_part(part, part_index, tmp_dir, extract_attach, parsed_mail):
-
bodies = parsed_mail[PROC_EMAIL_JSON_BODIES]
# get the file_name
@@ -632,11 +624,24 @@ def _handle_part(part, part_index, tmp_dir, extract_attach, parsed_mail):
try:
file_path = "{0}/{1}_{2}".format(tmp_dir, part_index, file_name.translate(None, "".join(["<", ">", " "])))
except TypeError: # py3
- file_path = "{0}/{1}_{2}".format(tmp_dir, part_index, file_name.translate(file_name.maketrans("", "", "".join(["<", ">", " "]))))
+ file_path = "{0}/{1}_{2}".format(
+ tmp_dir,
+ part_index,
+ file_name.translate(file_name.maketrans("", "", "".join(["<", ">", " "]))),
+ )
_debug_print("file_path: {0}".format(file_path))
# is the part representing the body of the email
- status, process_further = _handle_if_body(content_disp, content_id, content_type, part, bodies, file_path, parsed_mail, file_name)
+ status, process_further = _handle_if_body(
+ content_disp,
+ content_id,
+ content_type,
+ part,
+ bodies,
+ file_path,
+ parsed_mail,
+ file_name,
+ )
if not process_further:
return phantom.APP_SUCCESS
@@ -651,7 +656,6 @@ def _handle_part(part, part_index, tmp_dir, extract_attach, parsed_mail):
def _handle_attachment(part, file_name, file_path, parsed_mail):
-
files = parsed_mail[PROC_EMAIL_JSON_FILES]
if not _config[PROC_EMAIL_JSON_EXTRACT_ATTACHMENTS]:
return phantom.APP_SUCCESS
@@ -665,7 +669,6 @@ def _handle_attachment(part, file_name, file_path, parsed_mail):
attach_meta_info = {"headers": dict(headers)}
for curr_attach in _attachments:
-
if curr_attach.get("should_ignore", False):
continue
@@ -691,7 +694,8 @@ def _handle_attachment(part, file_name, file_path, parsed_mail):
if "File name too long" in error_message:
new_file_name = "ph_long_file_name_temp"
file_path = "{}{}".format(
- remove_child_info(file_path).rstrip(file_name.replace("<", "").replace(">", "").replace(" ", "")), new_file_name
+ remove_child_info(file_path).rstrip(file_name.replace("<", "").replace(">", "").replace(" ", "")),
+ new_file_name,
)
_debug_print("Original filename: {}".format(file_name))
_base_connector.debug_print("Modified filename: {}".format(new_file_name))
@@ -710,7 +714,14 @@ def _handle_attachment(part, file_name, file_path, parsed_mail):
return
file_hash = hashlib.sha1(part_payload).hexdigest() # nosemgrep
- files.append({"file_name": file_name, "file_path": file_path, "file_hash": file_hash, "meta_info": attach_meta_info})
+ files.append(
+ {
+ "file_name": file_name,
+ "file_path": file_path,
+ "file_hash": file_hash,
+ "meta_info": attach_meta_info,
+ }
+ )
def remove_child_info(file_path):
@@ -721,7 +732,6 @@ def remove_child_info(file_path):
def _get_email_headers_from_part(part, charset=None):
-
email_headers = list(part.items())
# TODO: the next 2 ifs can be condensed to use 'or'
@@ -766,7 +776,6 @@ def _get_email_headers_from_part(part, charset=None):
def _parse_email_headers(parsed_mail, part, charset=None, add_email_id=None):
-
global _email_id_contains
email_header_artifacts = parsed_mail[PROC_EMAIL_JSON_EMAIL_HEADERS]
@@ -892,7 +901,6 @@ def _add_body_in_email_headers(parsed_mail, file_path, charset, content_type, fi
def _handle_mail_object(mail, email_id, rfc822_email, tmp_dir, start_time_epoch):
-
parsed_mail = OrderedDict()
# Create a tmp directory for this email, will extract all files here
@@ -953,7 +961,13 @@ def _handle_mail_object(mail, email_id, rfc822_email, tmp_dir, start_time_epoch)
with open(file_path, "wb") as f:
f.write(mail.get_payload(decode=True))
- bodies.append({"file_path": file_path, "charset": mail.get_content_charset(), "content-type": "text/plain"})
+ bodies.append(
+ {
+ "file_path": file_path,
+ "charset": mail.get_content_charset(),
+ "content-type": "text/plain",
+ }
+ )
_add_body_in_email_headers(parsed_mail, file_path, mail.get_content_charset(), "text/plain", file_name)
# get the container name
@@ -1003,7 +1017,6 @@ def _handle_mail_object(mail, email_id, rfc822_email, tmp_dir, start_time_epoch)
def _init():
-
global _base_connector
global _config
global _container
@@ -1020,7 +1033,6 @@ def _init():
def _set_email_id_contains(email_id):
-
global _base_connector
global _email_id_contains
@@ -1055,7 +1067,6 @@ def _del_tmp_dirs():
def _int_process_email(rfc822_email, email_id, start_time_epoch):
-
global _base_connector
global _config
global _tmp_dirs
@@ -1083,13 +1094,19 @@ def _int_process_email(rfc822_email, email_id, start_time_epoch):
_dump_error_log(e)
return phantom.APP_ERROR, message, []
- results = [{"container": _container, "artifacts": _artifacts, "files": _attachments, "temp_directory": tmp_dir}]
+ results = [
+ {
+ "container": _container,
+ "artifacts": _artifacts,
+ "files": _attachments,
+ "temp_directory": tmp_dir,
+ }
+ ]
return ret_val, "Email Parsed", results
def process_email(base_connector, rfc822_email, email_id, config, label, container_id, epoch):
-
try:
_init()
except Exception as e:
@@ -1117,7 +1134,11 @@ def process_email(base_connector, rfc822_email, email_id, config, label, contain
try:
cid, artifacts, successful_artifacts = _parse_results(
- results, label, container_id, _config[PROC_EMAIL_JSON_RUN_AUTOMATION], _config["tags"]
+ results,
+ label,
+ container_id,
+ _config[PROC_EMAIL_JSON_RUN_AUTOMATION],
+ _config["tags"],
)
except Exception:
_del_tmp_dirs()
@@ -1135,7 +1156,6 @@ def process_email(base_connector, rfc822_email, email_id, config, label, contain
def _parse_results(results, label, update_container_id, run_automation=True, tags=[]):
-
global _base_connector
param = _base_connector.get_current_param()
@@ -1150,7 +1170,6 @@ def _parse_results(results, label, update_container_id, run_automation=True, tag
total_artifacts = []
successful_artifacts = []
for result in results:
-
if not update_container_id:
container = result.get("container")
@@ -1197,7 +1216,12 @@ def _parse_results(results, label, update_container_id, run_automation=True, tag
for curr_file in files:
# Generate a new Vault artifact for the file and save it to a container
ret_val, vault_artifact = _handle_file(
- curr_file, vault_ids, container_id, vault_artifacts_count, run_automation=run_automation, tags=tags
+ curr_file,
+ vault_ids,
+ container_id,
+ vault_artifacts_count,
+ run_automation=run_automation,
+ tags=tags,
)
vault_artifacts_count += 1
vault_artifacts.append(vault_artifact)
@@ -1211,7 +1235,6 @@ def _parse_results(results, label, update_container_id, run_automation=True, tag
_base_connector.debug_print(len_artifacts)
for j, artifact in enumerate(artifacts):
-
if not artifact:
continue
@@ -1243,7 +1266,6 @@ def _parse_results(results, label, update_container_id, run_automation=True, tag
def _add_vault_hashes_to_dictionary(cef_artifact, vault_id):
-
_, _, vault_info = ph_rules.vault_info(vault_id=vault_id)
vault_info = list(vault_info)
@@ -1277,7 +1299,6 @@ def _add_vault_hashes_to_dictionary(cef_artifact, vault_id):
def _handle_file(curr_file, vault_ids, container_id, artifact_id, run_automation=False, tags=[]):
-
file_name = curr_file.get("file_name")
local_file_path = curr_file["file_path"]
@@ -1300,7 +1321,10 @@ def _handle_file(curr_file, vault_ids, container_id, artifact_id, run_automation
try:
success, message, vault_id = ph_rules.vault_add(
- file_location=local_file_path, container=container_id, file_name=file_name, metadata=vault_attach_dict
+ file_location=local_file_path,
+ container=container_id,
+ file_name=file_name,
+ metadata=vault_attach_dict,
)
except Exception as e:
error_code, error_message = _get_error_message_from_exception(e)
@@ -1340,7 +1364,6 @@ def _handle_file(curr_file, vault_ids, container_id, artifact_id, run_automation
def _set_sdi(default_id, input_dict):
-
if "source_data_identifier" in input_dict:
del input_dict["source_data_identifier"]
dict_hash = None
diff --git a/parser_methods.py b/parser_methods.py
index 53802a5..46b88a2 100644
--- a/parser_methods.py
+++ b/parser_methods.py
@@ -15,29 +15,25 @@
import csv
import re
import struct
+import threading
+import time
import zipfile
from html import unescape
+from io import StringIO
+from typing import TYPE_CHECKING, Any, Optional, TypedDict, Union, cast
from urllib.parse import urlparse
-import pdfminer
-from bs4 import BeautifulSoup, UnicodeDammit
-from defusedxml import ElementTree
-from defusedxml.common import EntitiesForbidden
-from django.core.validators import URLValidator
-
-try:
- from cStringIO import StringIO
-except Exception:
- from io import StringIO
-
-import threading
-import time
-
+import docx
import phantom.app as phantom
import phantom.utils as ph_utils
+from bs4 import BeautifulSoup
+from bs4.dammit import UnicodeDammit
+from django.core.validators import URLValidator
+from docx.opc.constants import RELATIONSHIP_TYPE as REL_TYPE
+from docx.opc.part import Part as DocxPart
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
-from pdfminer.pdfdocument import PDFDocument
+from pdfminer.pdfdocument import PDFDocument, PDFEncryptionError, PDFPasswordIncorrect
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
@@ -45,6 +41,10 @@
from pdfminer.psparser import PSKeyword, PSLiteral
from pdfminer.utils import isnumber
+if TYPE_CHECKING:
+ from phantom.action_result import ActionResult
+ from phantom.base_connector import BaseConnector
+
_container_common = {"run_automation": False} # Don't run any playbooks, when this artifact is added
@@ -53,38 +53,53 @@
EMAIL_REGEX2 = r'".*"@[A-Z0-9.-]+\.[A-Z]{2,}\b'
HASH_REGEX = r"\b[0-9a-fA-F]{32}\b|\b[0-9a-fA-F]{40}\b|\b[0-9a-fA-F]{64}\b"
IP_REGEX = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
-IPV6_REGEX = r"\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|"
-IPV6_REGEX += r"(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))"
-IPV6_REGEX += r"|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})"
-IPV6_REGEX += r"|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|"
-IPV6_REGEX += r"(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})"
-IPV6_REGEX += r"|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
-IPV6_REGEX += r"(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})"
-IPV6_REGEX += r"|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
-IPV6_REGEX += r"(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})"
-IPV6_REGEX += r"|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
-IPV6_REGEX += r"(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})"
-IPV6_REGEX += r"|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
-IPV6_REGEX += r"(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
-IPV6_REGEX += r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*"
+IPV6_REGEX = (
+ r"\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|"
+ r"(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))"
+ r"|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})"
+ r"|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|"
+ r"(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})"
+ r"|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
+ r"(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})"
+ r"|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
+ r"(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})"
+ r"|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
+ r"(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})"
+ r"|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|"
+ r"(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
+ r"(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*"
+)
DOMAIN_REGEX = r"(?!:\/\/)((?:[a-zA-Z0-9-_]+\.)*[a-zA-Z0-9][a-zA-Z0-9-_]+\.[a-zA-Z]{2,11})"
ESCAPE = set(map(ord, '&<>"'))
-def _extract_domain_from_url(url):
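+# Typed shapes for the artifacts this module produces and the vault file metadata it consumes.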
+class Artifact(TypedDict):
+ source_data_identifier: int
+ cef: dict[str, Any]
+ name: str
+
+
+class FileInfo(TypedDict):
+ type: str
+ path: str
+ name: str
+ id: Optional[str]
+
+
+def _extract_domain_from_url(url: str) -> Optional[str]:
domain = phantom.get_host_from_url(url)
if domain and not _is_ip(domain):
return domain
return None
-def _is_ip(input_ip):
+def _is_ip(input_ip: str) -> bool:
if ph_utils.is_ip(input_ip):
return True
@@ -94,7 +109,7 @@ def _is_ip(input_ip):
return False
-def _is_url(input_url):
+def _is_url(input_url: str) -> bool:
validate_url = URLValidator(schemes=["http", "https"])
try:
validate_url(input_url)
@@ -103,11 +118,11 @@ def _is_url(input_url):
return False
-def is_ipv6(input_ip):
+def is_ipv6(input_ip: str) -> bool:
return bool(re.match(IPV6_REGEX, input_ip))
-def _refang_url(url):
+def _refang_url(url: str) -> str:
parsed = urlparse(url)
scheme = parsed.scheme
@@ -121,7 +136,7 @@ def _refang_url(url):
return refang_url
-def _clean_url(url):
+def _clean_url(url: str) -> str:
url = url.strip(">),.]\r\n")
# Check before splicing, find returns -1 if not found
@@ -136,7 +151,7 @@ def _clean_url(url):
return url
-def _get_error_message_from_exception(e):
+def _get_error_message_from_exception(e: Exception) -> tuple[Union[str, int], str]:
"""This method is used to get appropriate error message from the exception.
:param e: Exception object
:return: error message
@@ -172,7 +187,12 @@ class TextIOCParser:
"name": "IP Artifact", # Name of artifact
"validator": _is_ip, # Additional function to verify matched string (Should return true or false)
},
- {"cef": "sourceAddress", "pattern": IPV6_REGEX, "name": "IP Artifact", "validator": _is_ip},
+ {
+ "cef": "sourceAddress",
+ "pattern": IPV6_REGEX,
+ "name": "IP Artifact",
+ "validator": _is_ip,
+ },
{
"cef": "requestURL",
"pattern": URI_REGEX,
@@ -184,12 +204,20 @@ class TextIOCParser:
{"cef": "email", "pattern": EMAIL_REGEX, "name": "Email Artifact"},
{"cef": "email", "pattern": EMAIL_REGEX2, "name": "Email Artifact"},
]
- DOMAIN_PATTERN = {"cef": "destinationDnsDomain", "pattern": DOMAIN_REGEX, "name": "Domain Artifact"} # Name of CEF field # Regex to match
+    DOMAIN_PATTERN = {
+        "cef": "destinationDnsDomain",  # Name of CEF field
+        "pattern": DOMAIN_REGEX,  # Regex to match
+        "name": "Domain Artifact",
+    }
URL_DOMAIN_SUBTYPES_DICT = {
"subtypes": [ # Additional IOCs to find in a matched one
# If you really wanted to, you could also have subtypes in the subtypes
- {"cef": "destinationDnsDomain", "name": "Domain Artifact", "callback": _extract_domain_from_url} # Method to extract substring
+ {
+ "cef": "destinationDnsDomain",
+ "name": "Domain Artifact",
+ "callback": _extract_domain_from_url,
+ } # Method to extract substring
]
}
@@ -206,7 +234,7 @@ class TextIOCParser:
found_values = set()
- def __init__(self, parse_domains, patterns=None):
+ def __init__(self, parse_domains: bool, patterns: Optional[list[dict[str, Any]]] = None):
self.patterns = self.BASE_PATTERNS if patterns is None else patterns
if parse_domains:
@@ -222,22 +250,23 @@ def __init__(self, parse_domains, patterns=None):
self.patterns.append(self.DOMAIN_PATTERN)
self.added_artifacts = 0
- def _create_artifact(self, artifacts, value, cef, name):
- artifact = {}
- artifact["source_data_identifier"] = self.added_artifacts
- artifact["cef"] = {cef: value}
- artifact["name"] = name
+ def _create_artifact(self, artifacts: list[Artifact], value: Any, cef: str, name: str) -> None:
+ artifact = Artifact(
+ source_data_identifier=self.added_artifacts,
+ cef={cef: value},
+ name=name,
+ )
artifacts.append(artifact)
self.added_artifacts += 1
self.found_values.add(value)
- def _parse_ioc_subtype(self, artifacts, value, subtype):
+ def _parse_ioc_subtype(self, artifacts: list[Artifact], value: Any, subtype: dict[str, Any]) -> None:
callback = subtype.get("callback")
if callback:
sub_val = callback(value)
self._pass_over_value(artifacts, sub_val, subtype)
- def _pass_over_value(self, artifacts, value, ioc):
+ def _pass_over_value(self, artifacts: list[Artifact], value: Any, ioc: dict[str, Any]) -> None:
validator = ioc.get("validator")
clean = ioc.get("clean")
subtypes = ioc.get("subtypes", [])
@@ -253,11 +282,10 @@ def _pass_over_value(self, artifacts, value, ioc):
for st in subtypes:
self._parse_ioc_subtype(artifacts, value, st)
- def parse_to_artifacts(self, text):
+ def parse_to_artifacts(self, text: str) -> list[Artifact]:
artifacts = []
for ioc in self.patterns:
- regexp = re.compile(ioc["pattern"], re.IGNORECASE)
- found = regexp.findall(text)
+ found = re.findall(ioc["pattern"], text, flags=re.IGNORECASE)
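+            # re.findall caches compiled patterns internally, so precompiling is unnecessary here.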
for match in found:
if isinstance(match, tuple):
for x in match:
@@ -266,24 +294,24 @@ def parse_to_artifacts(self, text):
self._pass_over_value(artifacts, match, ioc)
return artifacts
- def add_artifact(self, text):
- artifact = {}
- artifact["source_data_identifier"] = self.added_artifacts
- artifact["cef"] = {"message": text}
- artifact["name"] = "Raw Text Artifact"
+ def add_artifact(self, text: str) -> Artifact:
+ artifact = Artifact(
+ source_data_identifier=self.added_artifacts,
+ cef={"message": text},
+ name="Raw Text Artifact",
+ )
self.added_artifacts += 1
self.found_values.add(text)
return artifact
-def _grab_raw_text(action_result, txt_file):
+def _grab_raw_text(action_result: "ActionResult", txt_file: str) -> tuple[bool, Optional[str]]:
"""This function will actually really work for any file which is basically raw text.
html, rtf, and the list could go on
"""
try:
- fp = open(txt_file, "rb")
- text = UnicodeDammit(fp.read()).unicode_markup
- fp.close()
+ with open(txt_file, "rb") as fp:
+ text = UnicodeDammit(fp.read()).unicode_markup
return phantom.APP_SUCCESS, text
except Exception as e:
error_code, error_message = _get_error_message_from_exception(e)
@@ -299,7 +327,7 @@ class PDFXrefObjectsToXML:
"""
@classmethod
- def encode(cls, data):
+ def encode(cls, data: bytes) -> str:
"""Encode characters of text"""
buf = StringIO()
for byte in data:
@@ -310,7 +338,7 @@ def encode(cls, data):
return buf.getvalue()
@classmethod
- def dump_xml(cls, text, obj):
+ def dump_xml(cls, text: str, obj: Any) -> str:
"""Convert PDF xref object to XML"""
if obj is None:
text += ""
@@ -364,16 +392,17 @@ def dump_xml(cls, text, obj):
raise TypeError("Unable to extract the object from PDF. Reason: {}".format(obj))
@classmethod
- def dump_trailers(cls, text, doc):
+ def dump_trailers(cls, text: str, doc: PDFDocument) -> str:
"""Iterate trough xrefs and convert trailer of xref to XML"""
for xref in doc.xrefs:
- text += "\n"
- cls.dump_xml(text, xref.trailer)
- text += "\n\n\n"
+ if trailer := getattr(xref, "trailer"):
+ text += "\n"
+ cls.dump_xml(text, trailer)
+ text += "\n\n\n"
return text
@classmethod
- def convert_objects_to_xml_text(cls, text, doc):
+ def convert_objects_to_xml_text(cls, text: str, doc: PDFDocument) -> str:
"""Iterate trough xrefs and convert objects of xref to XML"""
visited = set()
text += ""
@@ -396,7 +425,7 @@ def convert_objects_to_xml_text(cls, text, doc):
return text
@classmethod
- def pdf_xref_objects_to_xml(cls, pdf_file):
+ def pdf_xref_objects_to_xml(cls, pdf_file: str) -> str:
"""Converts PDF cross reference table(xref) objects to XML
The xref is the index by which all of the indirect objects, in the PDF file are located.
https://labs.appligent.com/pdfblog/pdf_cross_reference_table/
@@ -409,32 +438,37 @@ def pdf_xref_objects_to_xml(cls, pdf_file):
return text
-def _pdf_to_text(action_result, pdf_file):
+def _pdf_to_text(action_result: "ActionResult", pdf_file: str) -> tuple[bool, Optional[str]]:
try:
pagenums = set()
output = StringIO()
manager = PDFResourceManager()
converter = TextConverter(manager, output, laparams=LAParams())
interpreter = PDFPageInterpreter(manager, converter)
- # if sys.version_info[0] == 3:
- infile = open(pdf_file, "rb")
- # elif sys.version_info[0] < 3:
- # infile = file(pdf_file, 'rb')
- for page in PDFPage.get_pages(infile, pagenums):
- interpreter.process_page(page)
- infile.close()
+ with open(pdf_file, "rb") as infile:
+ for page in PDFPage.get_pages(infile, pagenums):
+ interpreter.process_page(page)
converter.close()
text = output.getvalue()
output.close()
text += PDFXrefObjectsToXML.pdf_xref_objects_to_xml(pdf_file)
return phantom.APP_SUCCESS, text
- except pdfminer.pdfdocument.PDFPasswordIncorrect:
- return action_result.set_status(phantom.APP_ERROR, "Failed to parse pdf: The provided pdf is password protected"), None
- except pdfminer.pdfdocument.PDFEncryptionError:
+ except PDFPasswordIncorrect:
+ return (
+ action_result.set_status(
+ phantom.APP_ERROR,
+ "Failed to parse pdf: The provided pdf is password protected",
+ ),
+ None,
+ )
+ except PDFEncryptionError:
return action_result.set_status(phantom.APP_ERROR, "Failed to parse pdf: The provided pdf is encrypted"), None
except struct.error:
return (
- action_result.set_status(phantom.APP_ERROR, "Failed to parse pdf: The provided pdf is password protected or is in different format"),
+ action_result.set_status(
+ phantom.APP_ERROR,
+ "Failed to parse pdf: The provided pdf is password protected or is in different format",
+ ),
None,
)
except Exception as e:
@@ -443,56 +477,57 @@ def _pdf_to_text(action_result, pdf_file):
return action_result.set_status(phantom.APP_ERROR, "Failed to parse pdf: {0}".format(error_text)), None
-def _docx_to_text(action_result, docx_file):
- """docx is literally a zip file, and all the words in the document are in one xml document
- doc does not work this way at all
- """
- WORD_NAMESPACE = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
- PARA = WORD_NAMESPACE + "p"
- TEXT = WORD_NAMESPACE + "t"
-
+def _docx_to_text(action_result: "ActionResult", docx_file: str) -> tuple[bool, Optional[str]]:
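+    """Extract text with python-docx (replacing the manual zip/XML parse), including hyperlink and altChunk targets."""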
try:
- zf = zipfile.ZipFile(docx_file)
- fp = zf.open("word/document.xml")
- txt = fp.read()
- fp.close()
- root = ElementTree.fromstring(txt)
- paragraphs = []
- for paragraph in root.iter(PARA):
- texts = [node.text for node in paragraph.iter(TEXT) if node.text]
- if texts:
- paragraphs.append("".join(texts))
-
- return phantom.APP_SUCCESS, "\n\n".join(paragraphs)
+ doc = docx.Document(docx_file)
except zipfile.BadZipfile:
return (
action_result.set_status(
- phantom.APP_ERROR, "Failed to parse docx: The file might be corrupted or password protected or not a docx file"
+ phantom.APP_ERROR,
+ "Failed to parse docx: The file might be corrupted or password protected or not a docx file",
),
None,
)
- except EntitiesForbidden as e:
- error_message = e
- return action_result.set_status(phantom.APP_ERROR, error_message), None
except Exception as e:
error_code, error_message = _get_error_message_from_exception(e)
error_text = "Error Code: {0}. Error Message: {1}".format(error_code, error_message)
return action_result.set_status(phantom.APP_ERROR, "Failed to parse docx: {0}".format(error_text)), None
+ full_text = []
-def _csv_to_text(action_result, csv_file):
+ # First, render the text in the doc into a string
+ for paragraph in doc.paragraphs:
+ para_text = "".join((run.text.strip() for run in paragraph.runs)).strip()
+ # Add the processed paragraph to the full text
+ if para_text:
+ full_text.append(para_text)
+
+ # Next, expand and append relationship targets present in the document, for searching later
+ for rel in doc.part.rels.values():
+ # Simple hyperlink, make sure its target is present in the text
+ if rel.reltype == REL_TYPE.HYPERLINK:
+ full_text.append(rel._target)
+ # This is like an embedded HTML within a docx file, stored as bytes
+ elif rel.reltype == REL_TYPE.A_F_CHUNK:
+ target = cast(DocxPart, rel._target)
+ full_text.extend(target.blob.decode(errors="replace").splitlines())
+
+ return phantom.APP_SUCCESS, "\n".join(full_text)
+
+
+def _csv_to_text(action_result: "ActionResult", csv_file: str) -> tuple[bool, Optional[str]]:
"""This function really only exists due to a misunderstanding on how word boundaries (\b) work
As it turns out, only word characters can invalidate word boundaries. So stuff like commas,
brackets, gt and lt signs, etc. do not
"""
text = ""
try:
- fp = open(csv_file, "rt")
- reader = csv.reader(fp)
- for row in reader:
- text += " ".join(row)
- text += " " # The humanity of always having a trailing space
- fp.close()
+ with open(csv_file, "rt") as fp:
+ reader = csv.reader(fp)
+ for row in reader:
+ text += " ".join(row)
+ text += " " # The humanity of always having a trailing space
+
return phantom.APP_SUCCESS, text
except Exception as e:
error_code, error_message = _get_error_message_from_exception(e)
@@ -500,18 +535,21 @@ def _csv_to_text(action_result, csv_file):
return action_result.set_status(phantom.APP_ERROR, "Failed to parse csv: {0}".format(error_text)), None
-def _html_to_text(action_result, html_file, text_val=None):
+def _html_to_text(
+ action_result: "ActionResult",
+ html_file: Optional[str],
+ text_val: Optional[str] = None,
+) -> tuple[bool, Optional[str]]:
"""Similar to CSV, this is also unnecessary. It will trim /some/ of that fat from a normal HTML, however"""
try:
- if text_val is None:
- fp = open(html_file, "rb")
- html_text = UnicodeDammit(fp.read()).unicode_markup
- fp.close()
+ if text_val is None and html_file is not None:
+ with open(html_file, "rb") as fp:
+ html_text = UnicodeDammit(fp.read()).unicode_markup
else:
html_text = text_val
# To unescape html escaped body
- html_text = unescape(html_text)
+ html_text = unescape(html_text or "")
soup = BeautifulSoup(html_text, "html.parser")
read_text = soup.findAll(text=True)
@@ -525,14 +563,14 @@ def _html_to_text(action_result, html_file, text_val=None):
return action_result.set_status(phantom.APP_ERROR, "Failed to parse html: {0}".format(error_text)), None
-def _join_thread(base_connector, thread):
+def _join_thread(base_connector: "BaseConnector", thread: threading.Thread) -> None:
base_connector._lock.acquire()
base_connector._done = True
base_connector._lock.release()
thread.join()
-def _wait_for_parse(base_connector):
+def _wait_for_parse(base_connector: "BaseConnector") -> None:
i = 0
base_msg = "Parsing PDF document"
while True:
@@ -547,7 +585,13 @@ def _wait_for_parse(base_connector):
return
-def parse_file(base_connector, action_result, file_info, parse_domains=True, keep_raw=False):
+def parse_file(
+ base_connector: "BaseConnector",
+ action_result: "ActionResult",
+ file_info: FileInfo,
+ parse_domains: bool = True,
+ keep_raw: bool = False,
+) -> tuple[bool, Optional[dict[str, list[Artifact]]]]:
"""Parse a non-email file"""
try:
@@ -577,7 +621,8 @@ def parse_file(base_connector, action_result, file_info, parse_domains=True, kee
ret_val, raw_text = _html_to_text(action_result, file_info["path"])
else:
return action_result.set_status(phantom.APP_ERROR, "Unexpected file type"), None
- if phantom.is_fail(ret_val):
+
+ if phantom.is_fail(ret_val) or raw_text is None:
return ret_val, None
base_connector.save_progress("Parsing for IOCs")
@@ -593,28 +638,43 @@ def parse_file(base_connector, action_result, file_info, parse_domains=True, kee
return phantom.APP_SUCCESS, {"artifacts": artifacts}
-def parse_structured_file(action_result, file_info):
-
+def parse_structured_file(action_result: "ActionResult", file_info: FileInfo) -> tuple[bool, Optional[dict[str, list[Artifact]]]]:
if file_info["type"] == "csv":
csv_file = file_info["path"]
artifacts = []
try:
- fp = open(csv_file, "rt")
- reader = csv.DictReader(fp, restkey="other") # need to handle lines terminated in commas
- for row in reader:
- row["source_file"] = file_info["name"]
- artifacts.append({"name": "CSV entry", "cef": {k: v for k, v in list(row.items())}}) # make CSV entry artifact
- fp.close()
+ with open(csv_file, "rt") as fp:
+ reader = csv.DictReader(fp, restkey="other") # need to handle lines terminated in commas
+ for row in reader:
+ row["source_file"] = file_info["name"]
+ artifacts.append(
+ {
+ "name": "CSV entry",
+ "cef": {k: v for k, v in list(row.items())},
+ }
+ ) # make CSV entry artifact
except Exception as e:
error_code, error_message = _get_error_message_from_exception(e)
error_text = "Error Code: {0}. Error Message: {1}".format(error_code, error_message)
- return action_result.set_status(phantom.APP_ERROR, "Failed to parse structured CSV: {0}".format(error_text)), None
+ return (
+ action_result.set_status(
+ phantom.APP_ERROR,
+ "Failed to parse structured CSV: {0}".format(error_text),
+ ),
+ None,
+ )
else:
return action_result.set_status(phantom.APP_ERROR, "Structured extraction only supported for CSV files"), None
return phantom.APP_SUCCESS, {"artifacts": artifacts}
-def parse_text(base_connector, action_result, file_type, text_val, parse_domains=True):
+def parse_text(
+ base_connector: "BaseConnector",
+ action_result: "ActionResult",
+ file_type: Optional[str],
+ text_val: str,
+ parse_domains: bool = True,
+) -> tuple[bool, Optional[dict[str, list[Artifact]]]]:
"""Parse a non-email file"""
try:
@@ -629,7 +689,8 @@ def parse_text(base_connector, action_result, file_type, text_val, parse_domains
ret_val, raw_text = phantom.APP_SUCCESS, text_val
else:
return action_result.set_status(phantom.APP_ERROR, "Unexpected file type"), None
- if phantom.is_fail(ret_val):
+
+ if phantom.is_fail(ret_val) or raw_text is None:
return ret_val, None
base_connector.save_progress("Parsing for IOCs")
diff --git a/requirements.txt b/requirements.txt
index d7acedf..4cd32e4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,5 +2,5 @@ beautifulsoup4==4.9.1
defusedxml==0.7.1
git+https://github.com/phantomcyber/pdfminer.six.git@20211012-fips
pyOpenSSL==24.1.0
+python-docx==1.1.2
python-magic==0.4.18
-simplejson==3.17.2