diff --git a/LICENSE b/LICENSE index 271de55..c0cc138 100644 --- a/LICENSE +++ b/LICENSE @@ -198,4 +198,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/README.md b/README.md index b50455c..8ad9c36 100644 --- a/README.md +++ b/README.md @@ -93,10 +93,11 @@ error. ## App ID - Optionally, you can specify an **App ID** to be used with the Crowdstrike OAuth API used in the - on poll action. If one isn't set, it will default to the asset ID. + on poll action. If one isn't set, it will default to the App ID with its last letters replaced by the Asset ID. - It is recommended to have a unique **App ID** for each connection to the Crowdstrike OAuth API. That is to say, if you are planning on having multiple assets using the Crowdstrike OAuth API at once, you should give them unique App IDs. +- Max length of an **APP ID** should be 32 characters ## On Poll diff --git a/crowdstrikeoauthapi.json b/crowdstrikeoauthapi.json index b3a74ce..3458a3d 100644 --- a/crowdstrikeoauthapi.json +++ b/crowdstrikeoauthapi.json @@ -22799,4 +22799,4 @@ } ] } -} +} \ No newline at end of file diff --git a/crowdstrikeoauthapi_connector.py b/crowdstrikeoauthapi_connector.py index 872fb9c..548e6e3 100644 --- a/crowdstrikeoauthapi_connector.py +++ b/crowdstrikeoauthapi_connector.py @@ -65,6 +65,11 @@ def __init__(self): self._poll_interval = None self._required_detonation = False self._stream_file_data = False + self._refresh_token_url = None + self._start_time = time.time() + self._interval_poll = False + self._refresh_token_timeout = None + self._total_events = 0 def initialize(self): """Automatically called by the BaseConnector before the calls to the handle_action function""" @@ -3020,7 +3025,9 @@ def _get_stream(self, action_result): if not self._data_feed_url: return action_result.set_status(phantom.APP_ERROR, CROWDSTRIKE_DATAFEED_EMPTY_ERROR) - session_token = resources[0].get("sessionToken") + session_token = resources[0].get('sessionToken') + self._refresh_token_url = resources[0].get('refreshActiveSessionURL') + self._refresh_token_timeout = resources[0].get('refreshActiveSessionInterval', 1800) if not session_token: return action_result.set_status(phantom.APP_ERROR, CROWDSTRIKE_SESSION_TOKEN_NOT_FOUND_ERROR) @@ -3070,11 +3077,11 @@ def _validate_integers(self, action_result, parameter, key, allow_zero=False): def _validate_on_poll_config_params(self, action_result, config): self.debug_print("Validating 'max_crlf' asset configuration parameter") - max_crlf = self._validate_integers( - action_result, - config.get("max_crlf", DEFAULT_BLANK_LINES_ALLOWABLE_LIMIT), - "max_crlf", - ) + + if "max_crlf" in config: + max_crlf = self._validate_integers(action_result, config.get("max_crlf"), "max_crlf") + else: + max_crlf = None self.debug_print("Validating 'merge_time_interval' asset configuration parameter") merge_time_interval = self._validate_integers( @@ -3113,23 +3120,16 @@ def _validate_on_poll_config_params(self, action_result, config): return max_crlf, merge_time_interval, max_events - def _on_poll(self, param): + def _on_poll(self, param): # noqa: C901 self.save_progress("In action handler for: {0}".format(self.get_action_identifier())) action_result = self.add_action_result(ActionResult(dict(param))) - # Connect to the server - if phantom.is_fail(self._get_stream(action_result)): - return action_result.get_status() - - if self._data_feed_url is None: - return action_result.set_status(phantom.APP_SUCCESS, CROWDSTRIKE_NO_MORE_FEEDS_AVAILABLE) - config = self.get_config() max_crlf, merge_time_interval, max_events = self._validate_on_poll_config_params(action_result, config) - if max_crlf is None or merge_time_interval is None or max_events is None: + if merge_time_interval is None or max_events is None: return action_result.get_status() lower_id = 0 @@ -3153,6 +3153,17 @@ def _on_poll(self, param): self.save_progress(CROWDSTRIKE_GETTING_EVENTS_MESSAGE.format(lower_id=lower_id, max_events=max_events)) + return self._start_data_feed(param, action_result, max_crlf, max_events, config, lower_id) + + def _start_data_feed(self, param, action_result, max_crlf, max_events, config, lower_id): + + # Connect to the server + if phantom.is_fail(self._get_stream(action_result)): + return action_result.get_status() + + if self._data_feed_url is None: + return action_result.set_status(phantom.APP_SUCCESS, CROWDSTRIKE_NO_MORE_FEEDS_AVAILABLE) + # Query for the events try: # Need to check both event types @@ -3187,11 +3198,48 @@ def _on_poll(self, param): ) # Parse the events - counter = 0 # counter for continuous blank lines - total_blank_lines_count = 0 # counter for total number of blank lines + counter = 0 # counter for continuous blank lines + total_blank_lines_count = 0 # counter for total number of blank lines + is_error_occurred = False + restart_process = False try: for stream_data in r.iter_lines(chunk_size=None): + # Check if it is time to refresh the stream connection and creating new bearer token [after 29 Min] + if int(time.time() - self._start_time) > (self._refresh_token_timeout - 60): + header = { + 'Authorization': 'Bearer {0}'.format(self._oauth_access_token), + 'Connection': 'Keep-Alive', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + ret_val, resp = self._make_rest_call_helper_oauth2( + action_result, self._refresh_token_url, headers=header, method="post", append=False) + + if phantom.is_fail(ret_val): + err_message = action_result.get_message() + self.debug_print(f"{CROWDSTRIKE_REFRESH_TOKEN_ERROR}: {err_message}") + if "no active stream session found" in err_message: + restart_process = True + if self._events: + self.save_progress(f"{CROWDSTRIKE_REFRESH_TOKEN_ERROR}. Saving the events...") + action_result.set_status(phantom.APP_ERROR, + f"{CROWDSTRIKE_REFRESH_TOKEN_ERROR}: {action_result.get_message()}") + is_error_occurred = True + break + else: + if restart_process: + self.save_progress("Restarting feed...") + self._start_time = time.time() + return self._start_data_feed(param, action_result, max_crlf, max_events, + config, lower_id) + return action_result.get_status() + elif max_crlf is None: + # Save events after refreshing the token + self._save_events_on_poll(config, total_blank_lines_count, param) + + self._start_time = time.time() + if stream_data is None: # Done with all the event data for now self.debug_print(CROWDSTRIKE_NO_DATA_MESSAGE) @@ -3203,7 +3251,7 @@ def _on_poll(self, param): counter += 1 total_blank_lines_count += 1 - if counter > max_crlf: + if max_crlf and counter > max_crlf: self.debug_print(CROWDSTRIKE_REACHED_CR_LF_COUNT_MESSAGE.format(counter)) self.save_progress(CROWDSTRIKE_REACHED_CR_LF_COUNT_MESSAGE.format(counter)) break @@ -3227,12 +3275,10 @@ def _on_poll(self, param): # Check for both event types if stream_data and stream_data.get("metadata", {}).get("eventType") in CROWDSTRIKE_EVENT_TYPES: self._events.append(stream_data) - counter = 0 # reset the continuous blank lines counter as we received a valid data in between + self._total_events += 1 + counter = 0 # reset the continuous blank lines counter as we received a valid data in between - # Calculate length of DetectionSummaryEvents until now - len_events = len(self._events) - - if max_events and len_events >= max_events: + if max_events and self._total_events >= max_events: self._events = self._events[:max_events] break @@ -3241,11 +3287,28 @@ def _on_poll(self, param): except Exception as e: err_message = self._get_error_message_from_exception(e) - return action_result.set_status( - phantom.APP_ERROR, - "{}. Error response from server: {}".format(CROWDSTRIKE_EVENTS_FETCH_ERROR, err_message), - ) + self.debug_print(f"{CROWDSTRIKE_EVENTS_FETCH_ERROR}. Error response from server: {err_message}") + if self._events: + self.save_progress(f"{CROWDSTRIKE_EVENTS_FETCH_ERROR}. Saving the events...") + action_result.set_status(phantom.APP_ERROR, "{}. Error response from server: {}".format( + CROWDSTRIKE_EVENTS_FETCH_ERROR, err_message)) + is_error_occurred = True + else: + return action_result.set_status(phantom.APP_ERROR, "{}. Error response from server: {}".format( + CROWDSTRIKE_EVENTS_FETCH_ERROR, err_message)) + + self._save_events_on_poll(config, total_blank_lines_count, param) + + if is_error_occurred: + if restart_process: + self.save_progress("Restarting feed...") + self._start_time = time.time() + return self._start_data_feed(param, action_result, max_crlf, max_events, config, lower_id) + return action_result.get_status() + return action_result.set_status(phantom.APP_SUCCESS) + + def _save_events_on_poll(self, config, total_blank_lines_count, param): # Check if to collate the data or not collate = config.get("collate", True) @@ -3260,19 +3323,21 @@ def _on_poll(self, param): # Update messages to reference both event types self.send_progress("Parsing the fetched Detection Events...") results = events_parser.parse_events(self._events, self, collate) - self.save_progress("Created {0} relevant results from the fetched Detection Events".format(len(results))) + self.save_progress( + "Created {0} relevant results from the fetched DetectionSummaryEvents".format(len(results))) if results: self.save_progress( "Adding {0} event artifact{1}. Empty containers will be skipped.".format(len(results), "s" if len(results) > 1 else "") ) self._save_results(results, param) - self.send_progress("Done") + self.send_progress("Events have been stored") if not self.is_poll_now(): last_event = self._events[-1] - last_offset_id = last_event["metadata"]["offset"] - self._state["last_offset_id"] = last_offset_id + 1 + last_offset_id = last_event['metadata']['offset'] + self._state['last_offset_id'] = last_offset_id + 1 + self.save_state(self._state) - return action_result.set_status(phantom.APP_SUCCESS) + self._events = [] def _handle_list_processes(self, param): @@ -4708,16 +4773,8 @@ def _make_rest_call_oauth2( return self._process_response(r, action_result, is_download) def _make_rest_call_helper_oauth2( - self, - action_result, - endpoint, - headers=None, - params=None, - data=None, - json_data=None, - method="get", - ): - """Function that helps setting REST call to the app. + self, action_result, endpoint, headers=None, params=None, data=None, json_data=None, method="get", append=True): + """ Function that helps setting REST call to the app. :param endpoint: REST endpoint that needs to appended to the service address :param action_result: object of ActionResult class @@ -4729,7 +4786,7 @@ def _make_rest_call_helper_oauth2( :return: status phantom.APP_ERROR/phantom.APP_SUCCESS(along with appropriate message), response obtained by making an API call """ - url = "{0}{1}".format(self._base_url_oauth, endpoint) + url = "{0}{1}".format(self._base_url_oauth, endpoint) if append else endpoint if headers is None: headers = {} @@ -4818,6 +4875,7 @@ def handle_action(self, param): self.debug_print("action_id ", self.get_action_identifier()) if self.get_action_identifier() == phantom.ACTION_ID_INGEST_ON_POLL: + self._interval_poll = True start_time = time.time() result = self._on_poll(param) end_time = time.time() diff --git a/crowdstrikeoauthapi_consts.py b/crowdstrikeoauthapi_consts.py index 55cbe35..e505612 100644 --- a/crowdstrikeoauthapi_consts.py +++ b/crowdstrikeoauthapi_consts.py @@ -95,6 +95,8 @@ CROWDSTRIKE_NO_MORE_FEEDS_AVAILABLE = "No more feeds available" CROWDSTRIKE_GETTING_EVENTS_MESSAGE = "Getting maximum {max_events} events from id {lower_id} onwards (ids might not be contiguous)" CROWDSTRIKE_CONNECTIVITY_ERROR = "Error connecting to server" +CROWDSTRIKE_REFRESH_TOKEN_ERROR = "Error while refreshing token" +CROWDSTRIKE_FROM_SERVER_ERROR = "Error from Server, Status Code: {status}, Message: {message}" CROWDSTRIKE_USING_BASE_URL_ERROR = "Using base url: {base_url}" CROWDSTRIKE_META_KEY_EMPTY_ERROR = "Meta key empty or not present" CROWDSTRIKE_RESOURCES_KEY_EMPTY_ERROR = "Resources key empty or not present. Please try after sometime" diff --git a/manual_readme_content.md b/manual_readme_content.md index f6fdb73..e7cc16b 100644 --- a/manual_readme_content.md +++ b/manual_readme_content.md @@ -81,10 +81,11 @@ error. ## App ID - Optionally, you can specify an **App ID** to be used with the Crowdstrike OAuth API used in the - on poll action. If one isn't set, it will default to the asset ID. + on poll action. If one isn't set, it will default to the App ID with its last letters replaced by the Asset ID. - It is recommended to have a unique **App ID** for each connection to the Crowdstrike OAuth API. That is to say, if you are planning on having multiple assets using the Crowdstrike OAuth API at once, you should give them unique App IDs. +- Max length of an **APP ID** should be 32 characters ## On Poll diff --git a/release_notes/unreleased.md b/release_notes/unreleased.md index fbcb2fd..adf9531 100644 --- a/release_notes/unreleased.md +++ b/release_notes/unreleased.md @@ -1 +1,2 @@ **Unreleased** +* Add support for refreshing session token in 'on poll' to ingest data for more than 29minutes [PAPP-32493]