diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py index dd3834b..25e2573 100644 --- a/examples/4_service_to_service/example_client.py +++ b/examples/4_service_to_service/example_client.py @@ -35,6 +35,8 @@ def event_callback( payload: Value of the response from the Service. """ print(payload) + # break out of pubsub loop + raise Exception if __name__ == '__main__': diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index 32b768c..5bdd9ae 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -125,6 +125,8 @@ def __init__( self.got_valid_response: bool = False self.response_fn = response_handler self.waiting: bool = False + self.cleanup_req = False + """When this flag is set to True, mark this request for GC deletion.""" def __init__( self, @@ -507,14 +509,6 @@ def create_external_request( self._external_requests_lock.release_lock() return request_uuid - def _delete_external_request(self, req_id: UUID) -> None: - req_id_str = str(req_id) - if req_id_str in self._external_requests: - self._external_requests_lock.acquire_lock(blocking=True) - req: IntersectService._ExternalRequest = self._external_requests.pop(req_id_str) - del req - self._external_requests_lock.release_lock() - def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalRequest | None: req_id_str = str(req_id) if req_id_str in self._external_requests: @@ -524,14 +518,25 @@ def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalReque def _process_external_requests(self) -> None: self._external_requests_lock.acquire_lock(blocking=True) + + # process requests for extreq in self._external_requests.values(): if not extreq.processed: self._process_external_request(extreq) + # delete requests + cleanup_list = [ + str(extreq.request_id) + for extreq in self._external_requests.values() + if extreq.cleanup_req + ] + for extreq_id in cleanup_list: + extreq = self._external_requests.pop(extreq_id) + del extreq + self._external_requests_lock.release_lock() def _process_external_request(self, extreq: IntersectService._ExternalRequest) -> None: response = None - cleanup_req = False now = datetime.now(timezone.utc) logger.debug(f'Processing external request {extreq.request_id} @ {now}') @@ -555,7 +560,7 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - logger.warning( f'External service request encountered an error: {error_msg}' ) - cleanup_req = True + extreq.cleanup_req = True else: logger.debug('Request wait timed-out!') extreq.waiting = False @@ -570,9 +575,6 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) - ): extreq.response_fn(response) - if cleanup_req: - self._delete_external_request(extreq.request_id) - def _handle_service_message_raw(self, raw: bytes) -> None: """Main broker callback function. diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index c65071f..f1990df 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -137,5 +137,5 @@ def test_example_3_ping_pong_events_amqp(): def test_example_4_service_to_service(): assert ( run_example_test('4_service_to_service') - == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!' + == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n' )