diff --git a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.py b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.py index ce2f0db0792b..40ff62f199e8 100644 --- a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.py +++ b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.py @@ -1,10 +1,11 @@ """ DomainTools Iris Detect XSOAR Integration """ + from hashlib import sha256 from hmac import new from math import ceil -from typing import Callable, Tuple +from collections.abc import Callable from urllib.parse import urlencode, urlunparse from urllib3 import disable_warnings from CommonServerPython import * # noqa # pylint: disable=unused-wildcard-import @@ -12,7 +13,7 @@ # Disable insecure warnings disable_warnings() # pylint: disable=no-member -''' CONSTANTS ''' +""" CONSTANTS """ INTEGRATION_CONTEXT_NAME = "DomainToolsIrisDetect" DOMAINTOOLS_PARAMS: Dict[str, Any] = { @@ -21,18 +22,33 @@ "app_version": "1", } -DEFAULT_HEADERS: Dict[str, str] = {"accept": "application/json", "Content-Type": "application/json"} +DEFAULT_HEADERS: Dict[str, str] = { + "accept": "application/json", + "Content-Type": "application/json", +} TIMEOUT = 60.0 RETRY = 3 DOMAINTOOLS_API_BASE_URL = "api.domaintools.com" DOMAINTOOLS_API_VERSION = "v1" -DOMAINTOOLS_MANAGE_WATCHLIST_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/" -DOMAINTOOLS_NEW_DOMAINS_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/new/" -DOMAINTOOLS_WATCHED_DOMAINS_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/watched/" -DOMAINTOOLS_IGNORED_DOMAINS_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/ignored/" -DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/monitors/" -DOMAINTOOLS_ESCALATE_DOMAINS_ENDPOINT = f"/{DOMAINTOOLS_API_VERSION}/iris-detect/escalations/" +DOMAINTOOLS_MANAGE_WATCHLIST_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/" +) +DOMAINTOOLS_NEW_DOMAINS_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/new/" +) +DOMAINTOOLS_WATCHED_DOMAINS_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/watched/" +) +DOMAINTOOLS_IGNORED_DOMAINS_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/domains/ignored/" +) +DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/monitors/" +) +DOMAINTOOLS_ESCALATE_DOMAINS_ENDPOINT = ( + f"/{DOMAINTOOLS_API_VERSION}/iris-detect/escalations/" +) DOMAINTOOLS_ESCALATE_DOMAINS_HEADER = "Escalated Domains" DOMAINTOOLS_WATCHED_DOMAINS_HEADER = "Watched Domains" @@ -71,7 +87,7 @@ DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # ISO8601 format with UTC, default in XSOAR NO_DOMAINS_FOUND = "No Domains Found." LIMIT_ERROR_MSG = "Invalid Input Error: limit should be greater than zero." -DEFAULT_DAYS_BACK = '3 days' +DEFAULT_DAYS_BACK = "3 days" MAX_DAYS_BACK = 30 DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f" DEFAULT_PAGE_SIZE = 50 @@ -150,18 +166,18 @@ class Client(BaseClient): """ def __init__( - self, - username: str, - api_key: str, - new_domains: str, - changed_domains: str, - blocked_domains: str, - risk_score_ranges: List, - include_domain_data: Optional[bool] = None, - first_fetch: str = '3 days', - fetch_limit: Optional[int] = 50, - verify=None, - proxy=None, + self, + username: str, + api_key: str, + new_domains: str, + changed_domains: str, + blocked_domains: str, + risk_score_ranges: List, + include_domain_data: Optional[bool] = None, + first_fetch: str = "3 days", + fetch_limit: Optional[int] = 50, + verify=None, + proxy=None, ): super().__init__( DOMAINTOOLS_API_BASE_URL, @@ -199,10 +215,11 @@ def query_dt_api(self, end_point: str, method: str, **kwargs): query = { "api_username": self.username, "signature": signer.sign(timestamp, end_point), - "timestamp": timestamp + "timestamp": timestamp, } - full_url = urlunparse(("https", DOMAINTOOLS_API_BASE_URL, end_point, "", urlencode(query), None)) - + full_url = urlunparse( + ("https", DOMAINTOOLS_API_BASE_URL, end_point, "", urlencode(query), None) + ) return self._http_request( method=method, full_url=full_url, @@ -214,7 +231,9 @@ def query_dt_api(self, end_point: str, method: str, **kwargs): error_handler=dt_error_handler, ) - def create_indicator_from_detect_domain(self, item: Dict, term: Dict) -> Dict[str, Any]: + def create_indicator_from_detect_domain( + self, item: Dict, term: Dict + ) -> Dict[str, Any]: """Return the indicator object for the given DomainTools Iris Detect domain object. Args: @@ -224,7 +243,9 @@ def create_indicator_from_detect_domain(self, item: Dict, term: Dict) -> Dict[st Returns: Dict: The indicator object containing various fields and values. """ - risk_score_components = flatten_nested_dict(item.get("risk_score_components", {})) + risk_score_components = flatten_nested_dict( + item.get("risk_score_components", {}) + ) return { "name": "DomainTools Iris Detect", @@ -241,9 +262,14 @@ def create_indicator_from_detect_domain(self, item: Dict, term: Dict) -> Dict[st "irisdetectdiscovereddate": item.get("discovered_date", ""), "irisdetectchangeddate": item.get("changed_date", ""), "irisdetectdomainstatus": item.get("status", ""), - "irisdetectdomainstate": "blocked" if any(result.get("escalation_type", "") == "blocked" for result in - item.get("escalations", [])) else item.get("state", ""), - + "irisdetectdomainstate": ( + "blocked" + if any( + result.get("escalation_type", "") == "blocked" + for result in item.get("escalations", []) + ) + else item.get("state", "") + ), "domaintoolsriskscore": item.get("risk_score", ""), "domaintoolsriskscorestatus": item.get("risk_score_status", ""), "irisdetectdomainid": item.get("id", ""), @@ -282,8 +308,14 @@ def create_indicator_from_detect_domain(self, item: Dict, term: Dict) -> Dict[st }, } - def process_dt_domains_into_xsoar(self, domains_list: List[Dict[str, Any]], incident_name: str, last_run: str, - term: Dict[str, Any], enable_incidents: bool = True) -> List[Any]: + def process_dt_domains_into_xsoar( + self, + domains_list: List[Dict[str, Any]], + incident_name: str, + last_run: str, + term: Dict[str, Any], + enable_incidents: bool = True, + ) -> List[Any]: """ Create indicators and, optionally, an incident in XSOAR for a list of DomainTools Iris Detect domains. @@ -300,8 +332,13 @@ def process_dt_domains_into_xsoar(self, domains_list: List[Dict[str, Any]], inci otherwise an empty list. """ for domain in domains_list: - domain['monitor_term'] = join_dict_values_for_keys(domain.get("monitor_ids", []), term) - indicators = [self.create_indicator_from_detect_domain(item, term) for item in domains_list] + domain["monitor_term"] = join_dict_values_for_keys( + domain.get("monitor_ids", []), term + ) + indicators = [ + self.create_indicator_from_detect_domain(item, term) + for item in domains_list + ] if not indicators: return [] @@ -310,13 +347,19 @@ def process_dt_domains_into_xsoar(self, domains_list: List[Dict[str, Any]], inci demisto.info(f"Added {len(indicators)} indicators to demisto") if enable_incidents: - last_run_dt_without_ms = datetime.strptime(get_last_run(last_run), DATE_FORMAT).replace( - microsecond=0) if get_last_run(last_run) else None - first_run_dt_without_ms = (datetime.now() - timedelta(days=validate_first_fetch(self.first_fetch))).replace( - microsecond=0) + last_run_dt_without_ms = ( + datetime.strptime(get_last_run(last_run), DATE_FORMAT).replace( + microsecond=0 + ) + if get_last_run(last_run) + else None + ) + first_run_dt_without_ms = ( + datetime.now() - timedelta(days=validate_first_fetch(self.first_fetch)) + ).replace(microsecond=0) incident = { "name": f"{incident_name} " - f"{last_run_dt_without_ms or first_run_dt_without_ms}", + f"{last_run_dt_without_ms or first_run_dt_without_ms}", "details": json.dumps(domains_list), "rawJSON": json.dumps({"incidents": domains_list}), "type": INCIDENT_TYPE[incident_name], @@ -325,7 +368,9 @@ def process_dt_domains_into_xsoar(self, domains_list: List[Dict[str, Any]], inci return [] - def fetch_dt_domains_from_api(self, end_point: str, last_run: str) -> Tuple[List[Dict], str]: + def fetch_dt_domains_from_api( + self, end_point: str, last_run: str + ) -> tuple[List[Dict], str]: """ Makes an API call to the Domain Tools API endpoint and retrieves domain data based on the provided parameters. @@ -344,16 +389,20 @@ def fetch_dt_domains_from_api(self, end_point: str, last_run: str) -> Tuple[List if last_run_value: params = DOMAINTOOLS_PARAMS | { DT_TIMESTAMP_DICT[last_run]: last_run_value, - "include_domain_data": INCLUDE_DOMAIN_DATA_VALUE if self.include_domain_data else 0, + "include_domain_data": ( + INCLUDE_DOMAIN_DATA_VALUE if self.include_domain_data else 0 + ), } - demisto.info(f'Found last run, fetching domains from {last_run_value}') + demisto.info(f"Found last run, fetching domains from {last_run_value}") else: days_back = validate_first_fetch(self.first_fetch) params = DOMAINTOOLS_PARAMS | { DT_TIMESTAMP_DICT[last_run]: datetime.now() - timedelta(days=days_back), - "include_domain_data": INCLUDE_DOMAIN_DATA_VALUE if self.include_domain_data else 0, + "include_domain_data": ( + INCLUDE_DOMAIN_DATA_VALUE if self.include_domain_data else 0 + ), } - demisto.info(f'First run, fetching domains from last {days_back} days') + demisto.info(f"First run, fetching domains from last {days_back} days") if self.risk_score_ranges: params["risk_score_ranges[]"] = self.risk_score_ranges @@ -371,11 +420,13 @@ def fetch_and_process_domains(self) -> None: """Fetches DomainTools domain information and creates incidents in XSOAR.""" def process_domains( - process_endpoint: str, - process_timestamp_key: str, - process_incident_name: str, - import_only: bool, - process_filter_func: Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]] = None, + process_endpoint: str, + process_timestamp_key: str, + process_incident_name: str, + import_only: bool, + process_filter_func: Optional[ + Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]] + ] = None, ) -> str: """ Process domains by calling DomainTools API, filtering results, and converting them into XSOAR incidents. @@ -407,7 +458,9 @@ def process_domains( ) return last_run - def filter_blocked_domains(domains: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def filter_blocked_domains( + domains: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: """ Filters the list of domains to return only the blocked domains. @@ -420,12 +473,15 @@ def filter_blocked_domains(domains: List[Dict[str, Any]]) -> List[Dict[str, Any] return [ domain for domain in domains - if domain.get("escalations") and any( - escalation.get("escalation_type") == "blocked" for escalation in domain["escalations"]) + if domain.get("escalations") + and any( + escalation.get("escalation_type") == "blocked" + for escalation in domain["escalations"] + ) ] monitor_result = self.query_dt_api( - DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT, "GET" + DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT, "GET", params=DOMAINTOOLS_PARAMS ) term = { results.get("id"): results.get("term") @@ -459,11 +515,21 @@ def filter_blocked_domains(domains: List[Dict[str, Any]]) -> List[Dict[str, Any] last_runs = {CHANGED_DOMAIN_TIMESTAMP: "", NEW_DOMAIN_TIMESTAMP: ""} - for endpoint, timestamp_key, incident_name, domain_setting, filter_func in domains_to_process: + for ( + endpoint, + timestamp_key, + incident_name, + domain_setting, + filter_func, + ) in domains_to_process: if domain_setting: - last_runs[timestamp_key] = process_domains(endpoint, timestamp_key, incident_name, - domain_setting == "Import Indicators Only", filter_func, - ) + last_runs[timestamp_key] = process_domains( + endpoint, + timestamp_key, + incident_name, + domain_setting == "Import Indicators Only", + filter_func, + ) demisto.setIntegrationContext(last_runs) demisto.info(f"Adding {len(incidents)} incidents to demisto") @@ -561,17 +627,23 @@ def dt_error_handler(response: requests.Response) -> None: 403: "Forbidden: The request is understood, but it has been refused or access is not allowed.", 404: "Not Found: The requested resource could not be found.", 500: "Internal Server Error: An error occurred on the server side.", - 206: "Partial Content: The requested resource has been partially returned." + 206: "Partial Content: The requested resource has been partially returned.", } if response.status_code in {206} | set(range(400, 600)): try: error_json = response.json().get("error", {}) - error_message = (error_json.get("message") or " ".join( - error_json.get("messages", [])) or specific_error_messages.get(response.status_code, - "An unknown error occurred.")) + error_message = ( + error_json.get("message") + or " ".join(error_json.get("messages", [])) + or specific_error_messages.get( + response.status_code, "An unknown error occurred." + ) + ) except ValueError: - error_message = specific_error_messages.get(response.status_code, "An unknown error occurred.") + error_message = specific_error_messages.get( + response.status_code, "An unknown error occurred." + ) raise DemistoException(error_message, res=response) @@ -671,10 +743,10 @@ def format_watchlist_fields(result: Dict[Any, Any]) -> Dict[str, Any]: def format_data( - result: Dict[str, List[Dict[str, Any]]], - field: str, - output_prefix: str, - data_key: str, + result: Dict[str, List[Dict[str, Any]]], + field: str, + output_prefix: str, + data_key: str, ) -> Dict[str, Any]: """ Extracts and formats data. @@ -763,17 +835,22 @@ def create_common_api_arguments(args: Dict[str, Any]) -> Dict[str, Any]: return { "monitor_id": args.get("monitor_id"), "tlds[]": argToList(args.get("tlds")), - "include_domain_data": argToBoolean(args.get("include_domain_data")) if args.get( - "include_domain_data") else None, + "include_domain_data": ( + argToBoolean(args.get("include_domain_data")) + if args.get("include_domain_data") + else None + ), "risk_score_ranges[]": argToList(args.get("risk_score_ranges")), "sort[]": argToList(args.get("sort")), "order": args.get("order"), - "mx_exists": argToBoolean(args.get("mx_exists")) if args.get("mx_exists") else None, + "mx_exists": ( + argToBoolean(args.get("mx_exists")) if args.get("mx_exists") else None + ), "preview": argToBoolean(args.get("preview")) if args.get("preview") else None, "search": args.get("search"), "limit": arg_to_number(args.get("limit")), "page": arg_to_number(args.get("page")), - "page_size": arg_to_number(args.get("page_size")) + "page_size": arg_to_number(args.get("page_size")), } @@ -794,21 +871,23 @@ def create_escalated_api_arguments(args: Dict[str, Any]) -> Dict[str, Any]: return { "escalated_since": args.get("escalated_since"), "escalation_types[]": args.get("escalation_types"), - "changed_since": args.get("changed_since") + "changed_since": args.get("changed_since"), } -def pagination(page: Optional[int], page_size: Optional[int], limit: Optional[int]) -> Tuple[int, int]: +def pagination( + page: Optional[int], page_size: Optional[int], limit: Optional[int] +) -> tuple[int, int]: + """ + Define pagination. + Args: + limit: Records per page. + page: The page number. + page_size: The number of requested results per page. + Returns: + limit (int): Records per page. + offset (int): The number of records to be skipped. """ - Define pagination. - Args: - limit: Records per page. - page: The page number. - page_size: The number of requested results per page. - Returns: - limit (int): Records per page. - offset (int): The number of records to be skipped. - """ if page is not None and page <= 0: raise DemistoException(PAGE_NUMBER_ERROR_MSG) @@ -818,11 +897,14 @@ def pagination(page: Optional[int], page_size: Optional[int], limit: Optional[in raise DemistoException(LIMIT_ERROR_MSG) if page_size and limit: limit = page_size - return limit or page_size or DEFAULT_PAGE_SIZE, (page - 1 if page else DEFAULT_OFFSET) * (page_size or DEFAULT_PAGE_SIZE) + return limit or page_size or DEFAULT_PAGE_SIZE, ( + page - 1 if page else DEFAULT_OFFSET + ) * (page_size or DEFAULT_PAGE_SIZE) -def get_command_title_string(sub_context: str, page: Optional[int], page_size: Optional[int], - hits: Optional[int]) -> str: +def get_command_title_string( + sub_context: str, page: Optional[int], page_size: Optional[int], hits: Optional[int] +) -> str: """ Generates a command title string based on the provided context and pagination information. @@ -837,8 +919,10 @@ def get_command_title_string(sub_context: str, page: Optional[int], page_size: O """ if page and page_size and hits is not None and (page > 0 and page_size > 0): total_page = ceil(hits / page_size) if hits > 0 else 1 - return f'{sub_context} \nCurrent page size: {page_size}\n' \ - f'Showing page {page} out of {total_page}' + return ( + f"{sub_context} \nCurrent page size: {page_size}\n" + f"Showing page {page} out of {total_page}" + ) return f"{sub_context}" @@ -867,16 +951,23 @@ def get_max_limit(end_point: str, dt_args: Dict[str, Any]) -> int: include_domain_data = dt_args.get("include_domain_data", False) return ( - MONITOR_DOMAINS_LIMIT if end_point == DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT and not include_counts else - INCLUDE_COUNTS_LIMIT if include_counts else - INCLUDE_DOMAIN_DATA_LIMIT if include_domain_data else - DEFAULT_LIMIT + MONITOR_DOMAINS_LIMIT + if end_point == DOMAINTOOLS_MONITOR_DOMAINS_ENDPOINT and not include_counts + else ( + INCLUDE_COUNTS_LIMIT + if include_counts + else INCLUDE_DOMAIN_DATA_LIMIT if include_domain_data else DEFAULT_LIMIT + ) ) def get_results_helper( - client: Client, end_point: str, dt_args: Dict[str, Any], result_key: str, tb_header_name: str -) -> Tuple[List[Any], str]: + client: Client, + end_point: str, + dt_args: Dict[str, Any], + result_key: str, + tb_header_name: str, +) -> tuple[List[Any], str]: """ Helper function to get results for the given endpoint and result_key. @@ -902,12 +993,17 @@ def get_results_helper( total_count = 0 while True: - fetch_size = min(limit - len(results), max_limit) if limit is not None else max_limit + fetch_size = ( + min(limit - len(results), max_limit) if limit is not None else max_limit + ) if fetch_size <= 0: break dt_args.update({"offset": offset, "limit": fetch_size}) - response = client.query_dt_api(end_point, "GET", params=DOMAINTOOLS_PARAMS | dt_args) + + response = client.query_dt_api( + end_point, "GET", params=DOMAINTOOLS_PARAMS | dt_args + ) total_count = response.get("total_count", 0) new_results = response.get(result_key, []) @@ -920,11 +1016,13 @@ def get_results_helper( if len(new_results) < fetch_size: break - return results, get_command_title_string(tb_header_name, page, page_size, total_count) + return results, get_command_title_string( + tb_header_name, page, page_size, total_count + ) def fetch_domain_tools_api_results( - client: Client, end_point: str, tb_header_name: str, dt_args: Dict[str, Any] + client: Client, end_point: str, tb_header_name: str, dt_args: Dict[str, Any] ) -> CommandResults: """ Gets the results for a DomainTools API endpoint. @@ -939,7 +1037,10 @@ def fetch_domain_tools_api_results( CommandResults: The results of the command. """ - results, title = get_results_helper(client, end_point, dt_args, "watchlist_domains", tb_header_name) + + results, title = get_results_helper( + client, end_point, dt_args, "watchlist_domains", tb_header_name + ) indicator_list: List[Dict] = [] if results: @@ -964,14 +1065,16 @@ def fetch_domain_tools_api_results( outputs=results, outputs_prefix=f"{INTEGRATION_CONTEXT_NAME}.{CONTEXT_PATH_KEY[tb_header_name]}", outputs_key_field="domain", - readable_output=tableToMarkdown(name=title, t=indicator_list) - if indicator_list - else NO_DOMAINS_FOUND, + readable_output=( + tableToMarkdown(name=title, t=indicator_list) + if indicator_list + else NO_DOMAINS_FOUND + ), ) def domaintools_iris_detect_get_watched_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ domaintools_iris_detect_get_watched_domains_command: Get the watched domains list. @@ -992,7 +1095,7 @@ def domaintools_iris_detect_get_watched_domains_command( def domaintools_iris_detect_get_new_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ domaintools_iris_detect_get_new_domains_command: Get the new domains list. @@ -1009,14 +1112,12 @@ def domaintools_iris_detect_get_new_domains_command( DOMAINTOOLS_NEW_DOMAINS_ENDPOINT, DOMAINTOOLS_NEW_DOMAINS_HEADER, create_common_api_arguments(args) - | { - "discovered_since": args.get("discovered_since") - }, + | {"discovered_since": args.get("discovered_since")}, ) def domaintools_iris_detect_get_ignored_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ domaintools_iris_detect_get_ignored_domains_command: Get the ignored domains list. @@ -1037,7 +1138,7 @@ def domaintools_iris_detect_get_ignored_domains_command( def domaintools_iris_detect_get_blocklist_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ domaintools_iris_detect_get_blocklist_domains_command: Get the blocked domains list. @@ -1053,12 +1154,14 @@ def domaintools_iris_detect_get_blocklist_domains_command( client, DOMAINTOOLS_WATCHED_DOMAINS_ENDPOINT, DOMAINTOOLS_BLOCKED_DOMAINS_HEADER, - create_common_api_arguments(args) | create_escalated_api_arguments(args) | {"escalation_types[]": "blocked"}, + create_common_api_arguments(args) + | create_escalated_api_arguments(args) + | {"escalation_types[]": "blocked"}, ) def domaintools_iris_detect_get_escalated_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ domaintools_iris_detect_get_escalated_domains_command: Get the escalated domains @@ -1075,13 +1178,14 @@ def domaintools_iris_detect_get_escalated_domains_command( client, DOMAINTOOLS_WATCHED_DOMAINS_ENDPOINT, DOMAINTOOLS_ESCALATE_DOMAINS_HEADER, - create_common_api_arguments(args) | create_escalated_api_arguments(args) | { - "escalation_types[]": "google_safe"}, + create_common_api_arguments(args) + | create_escalated_api_arguments(args) + | {"escalation_types[]": "google_safe"}, ) def domaintools_iris_detect_get_monitors_list_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ Get the monitor domains list. @@ -1103,13 +1207,16 @@ def domaintools_iris_detect_get_monitors_list_command( } | create_common_api_arguments(args) | create_escalated_api_arguments(args), - "monitors", DOMAINTOOLS_MONITORS_HEADER + "monitors", + DOMAINTOOLS_MONITORS_HEADER, ) + if results: monitor_data = [format_monitor_fields(result) for result in results] headers = list(monitor_data[0].keys()) - readable_output = tableToMarkdown(name=title, t=monitor_data, - removeNull=True, headers=headers) + readable_output = tableToMarkdown( + name=title, t=monitor_data, removeNull=True, headers=headers + ) else: readable_output = NO_DOMAINS_FOUND return CommandResults( @@ -1121,7 +1228,7 @@ def domaintools_iris_detect_get_monitors_list_command( def handle_domain_action( - client: Client, args: Dict[str, Any], action: str + client: Client, args: Dict[str, Any], action: str ) -> CommandResults: """ Performs the specified action on one or more watchlist domains. @@ -1141,33 +1248,35 @@ def handle_domain_action( DOMAINTOOLS_MANAGE_WATCHLIST_ENDPOINT, DOMAINTOOLS_WATCHED_DOMAINS_HEADER, format_watchlist_fields, - "WatchedDomain" + "WatchedDomain", ), "ignored": ( "PATCH", DOMAINTOOLS_MANAGE_WATCHLIST_ENDPOINT, DOMAINTOOLS_IGNORE_DOMAINS_HEADER, format_watchlist_fields, - "IgnoredDomain" + "IgnoredDomain", ), "google_safe": ( "POST", DOMAINTOOLS_ESCALATE_DOMAINS_ENDPOINT, DOMAINTOOLS_ESCALATE_DOMAINS_HEADER, format_blocklist_fields, - "EscalatedDomain" + "EscalatedDomain", ), "blocked": ( "POST", DOMAINTOOLS_ESCALATE_DOMAINS_ENDPOINT, DOMAINTOOLS_BLOCKED_DOMAINS_HEADER, format_blocklist_fields, - "BlockedDomain" + "BlockedDomain", ), } method, endpoint, header, format_func, context_output_string = action_params[action] - data = {"watchlist_domain_ids": argToList(args.get("watchlist_domain_ids"))} | DOMAINTOOLS_PARAMS + data = { + "watchlist_domain_ids": argToList(args.get("watchlist_domain_ids")) + } | DOMAINTOOLS_PARAMS if action in ["watched", "ignored"]: data |= {"state": action} @@ -1175,24 +1284,28 @@ def handle_domain_action( data |= {"escalation_type": action} indicators_list = [ - dict(format_func(result)) for result in - client.query_dt_api(endpoint, method, json_data=data).get( - "watchlist_domains" if action in ["watched", "ignored"] else "escalations", []) + dict(format_func(result)) + for result in client.query_dt_api(endpoint, method, json_data=data).get( + "watchlist_domains" if action in ["watched", "ignored"] else "escalations", + [], + ) ] return CommandResults( outputs=indicators_list, outputs_prefix=f"{INTEGRATION_CONTEXT_NAME}.{context_output_string}", outputs_key_field="", - readable_output=tableToMarkdown(name=header, t=indicators_list) - if indicators_list - else NO_DOMAINS_FOUND, + readable_output=( + tableToMarkdown(name=header, t=indicators_list) + if indicators_list + else NO_DOMAINS_FOUND + ), raw_response=indicators_list, ) def domaintools_iris_detect_watch_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ Watch domains for changes using DomainTools Iris API. @@ -1209,7 +1322,7 @@ def domaintools_iris_detect_watch_domains_command( def domaintools_iris_detect_ignore_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ Ignore domains using DomainTools Iris API. @@ -1226,7 +1339,7 @@ def domaintools_iris_detect_ignore_domains_command( def domaintools_iris_detect_escalate_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ Escalate domains to Google Safe Browsing using DomainTools Iris API. @@ -1243,7 +1356,7 @@ def domaintools_iris_detect_escalate_domains_command( def domaintools_iris_detect_blocklist_domains_command( - client: Client, args: Dict[str, Any] + client: Client, args: Dict[str, Any] ) -> CommandResults: """ Blocklist domains using DomainTools Iris API. @@ -1272,15 +1385,15 @@ def main() -> None: command = demisto.command() args = demisto.args() params = demisto.params() - username = params.get('credentials', {}).get('identifier') - api_key = params.get('credentials', {}).get('password') + username = params.get("credentials", {}).get("identifier") + api_key = params.get("credentials", {}).get("password") verify_certificate = not params.get("insecure", False) proxy = params.get("proxy", False) handle_proxy() risk_score_ranges = argToList(params.get("risk_score_ranges")) include_domain_data = params.get("include_domain_data") - first_fetch_time = params.get('first_fetch', DEFAULT_DAYS_BACK).strip() - fetch_limit = arg_to_number(params.get('max_fetch', 50)) + first_fetch_time = params.get("first_fetch", DEFAULT_DAYS_BACK).strip() + fetch_limit = arg_to_number(params.get("max_fetch", 50)) new_domains = params.get("new_domains") changed_domains = params.get("changed_domains") blocked_domains = params.get("blocked_domains") diff --git a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.yml b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.yml index 493ca164efb6..fac966d9cb89 100644 --- a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.yml +++ b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect.yml @@ -104,7 +104,7 @@ script: commands: - arguments: - description: |- - List of Iris Detect domain IDs to escalate. The domain ID can be + List of Iris Detect domain IDs to escalate. The domain ID can be found using 'domaintools-iris-detect-get-new-domains' command. isArray: true name: watchlist_domain_ids @@ -132,8 +132,8 @@ script: type: String - arguments: - description: |- - List of Iris Detect domain IDs to escalate. The domain ID can be - found using 'domaintools-iris-detect-get-new-domains, + List of Iris Detect domain IDs to escalate. The domain ID can be + found using 'domaintools-iris-detect-get-new-domains, domaintools-iris-detect-get-watched-domains' commands. isArray: true name: watchlist_domain_ids @@ -161,7 +161,7 @@ script: type: String - arguments: - description: |- - List of Iris Detect domain IDs to escalate. The domain ID can be + List of Iris Detect domain IDs to escalate. The domain ID can be found using 'domaintools-iris-detect-get-new-domains' command. isArray: true name: watchlist_domain_ids @@ -192,7 +192,7 @@ script: type: String - arguments: - description: |- - List of Iris Detect domain IDs to escalate. The domain ID can be + List of Iris Detect domain IDs to escalate. The domain ID can be found using 'domaintools-iris-detect-get-new-domains, domaintools-iris-detect-get-watched-domains' command. isArray: true name: watchlist_domain_ids @@ -282,8 +282,8 @@ script: - description: Filter domains by when they were discovered. Provide a datetime in ISO 8601 format, for example 2022-05-18T12:19:51.685496. name: discovered_since - description: |- - Monitor ID is used when requesting domains for a specific monitor. - The monitor ID can be found using the + Monitor ID is used when requesting domains for a specific monitor. + The monitor ID can be found using the 'domaintools-iris-detect-get-monitors-list' command. name: monitor_id - auto: PREDEFINED @@ -436,8 +436,8 @@ script: - blocked - google_safe - description: |- - Monitor ID is used when requesting domains for a specific monitor. - The monitor ID can be found using the + Monitor ID is used when requesting domains for a specific monitor. + The monitor ID can be found using the 'domaintools-iris-detect-get-monitors-list' command. name: monitor_id - auto: PREDEFINED @@ -597,8 +597,8 @@ script: - 70-99 - 100-100 - description: |- - Monitor ID is used when requesting domains for a specific monitor. - The monitor ID can be found using the + Monitor ID is used when requesting domains for a specific monitor. + The monitor ID can be found using the 'domaintools-iris-detect-get-monitors-list' command. name: monitor_id - auto: PREDEFINED @@ -749,8 +749,8 @@ script: - 70-99 - 100-100 - description: |- - Monitor ID is used when requesting domains for a specific monitor. - The monitor ID can be found using the + Monitor ID is used when requesting domains for a specific monitor. + The monitor ID can be found using the 'domaintools-iris-detect-get-monitors-list' command. name: monitor_id - auto: PREDEFINED @@ -892,8 +892,8 @@ script: type: String - arguments: - description: |- - Monitor ID is used when requesting domains for a specific monitor. - The monitor ID can be found using the + Monitor ID is used when requesting domains for a specific monitor. + The monitor ID can be found using the 'domaintools-iris-detect-get-monitors-list' command. name: monitor_id - auto: PREDEFINED diff --git a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect_test.py b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect_test.py index 50f2abee06b7..695db4be34b1 100644 --- a/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect_test.py +++ b/Packs/DomainToolsIrisDetect/Integrations/DomainToolsIrisDetect/DomainToolsIrisDetect_test.py @@ -2,11 +2,10 @@ DomainTools Iris Detect Test Cases """ import hmac -import io import json import time from hashlib import sha256 -from typing import Any, Dict, Optional, Tuple +from typing import Any import pytest import requests @@ -213,7 +212,7 @@ def load_json(path): JSONDecodeError: If the file at the specified path contains invalid JSON. """ - with io.open(path, mode="r", encoding="utf-8") as file: + with open(path, encoding="utf-8") as file: return json.loads(file.read()) @@ -1012,7 +1011,7 @@ def test_validate_first_fetch_parametrized(value, expected): (1, 10, 10, (10, 0)), ], ) -def test_pagination(page: Optional[int], page_size: Optional[int], limit: Optional[int], expected: Tuple[int, int]): +def test_pagination(page: int | None, page_size: int | None, limit: int | None, expected: tuple[int, int]): """ Test the pagination function with various input cases, including when page, page_size, and limit are None, when only page is provided, when page and page_size are provided, and when all parameters are provided. @@ -1038,7 +1037,7 @@ def test_pagination(page: Optional[int], page_size: Optional[int], limit: Option (1, 10, 0, LIMIT_ERROR_MSG), ], ) -def test_pagination_errors(page: Optional[int], page_size: Optional[int], limit: Optional[int], error_msg: str): +def test_pagination_errors(page: int | None, page_size: int | None, limit: int | None, error_msg: str): """ Test the pagination function with invalid input cases that should raise exceptions. @@ -1061,7 +1060,7 @@ def test_pagination_errors(page: Optional[int], page_size: Optional[int], limit: ("Test Context", 1, 10, 0, "Test Context \nCurrent page size: 10\nShowing page 1 out of 1"), ], ) -def test_get_command_title_string(sub_context: str, page: Optional[int], page_size: Optional[int], hits: Optional[int], +def test_get_command_title_string(sub_context: str, page: int | None, page_size: int | None, hits: int | None, expected_output: str): """ Test the get_command_title_string function with various input cases. @@ -1086,7 +1085,7 @@ def test_get_command_title_string(sub_context: str, page: Optional[int], page_si ("some_other_endpoint", {"include_counts": False, "include_domain_data": False}, DEFAULT_LIMIT), ], ) -def test_get_max_limit(end_point: str, dt_args: Dict[str, Any], expected_max_limit: int): +def test_get_max_limit(end_point: str, dt_args: dict[str, Any], expected_max_limit: int): """ Test the get_max_limit function with various input cases, including different endpoints and argument combinations. diff --git a/Packs/DomainToolsIrisDetect/ReleaseNotes/1_0_14.md b/Packs/DomainToolsIrisDetect/ReleaseNotes/1_0_14.md new file mode 100644 index 000000000000..ba26b37f9ef9 --- /dev/null +++ b/Packs/DomainToolsIrisDetect/ReleaseNotes/1_0_14.md @@ -0,0 +1,9 @@ + +#### Integrations + +##### DomainTools Iris Detect + +- Fixed an issue in **fetch_incident** command that calls monitor domains API without the default following api parameters: + - *app_partner* + - *app_name* + - *app_version* diff --git a/Packs/DomainToolsIrisDetect/pack_metadata.json b/Packs/DomainToolsIrisDetect/pack_metadata.json index 2ef388318da3..aa32d0a80909 100644 --- a/Packs/DomainToolsIrisDetect/pack_metadata.json +++ b/Packs/DomainToolsIrisDetect/pack_metadata.json @@ -2,7 +2,7 @@ "name": "DomainTools Iris Detect", "description": "Iris Detect protects against malicious domains impersonating your brands and supply chain.", "support": "partner", - "currentVersion": "1.0.13", + "currentVersion": "1.0.14", "author": "DomainTools Integrations", "url": "http://www.domaintools.com", "email": "enterprisesupport@domaintools.com",