Skip to content

Commit

Permalink
fix svc2svc request cleanup issue and E2E test
Browse files Browse the repository at this point in the history
Signed-off-by: Lance Drane <[email protected]>
  • Loading branch information
Lance-Drane committed Aug 21, 2024
1 parent 2ca43b7 commit ccca786
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
2 changes: 2 additions & 0 deletions examples/4_service_to_service/example_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def event_callback(
payload: Value of the response from the Service.
"""
print(payload)
# break out of pubsub loop
raise Exception


if __name__ == '__main__':
Expand Down
28 changes: 15 additions & 13 deletions src/intersect_sdk/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def __init__(
self.got_valid_response: bool = False
self.response_fn = response_handler
self.waiting: bool = False
self.cleanup_req = False
"""When this flag is set to True, mark this request for GC deletion."""

def __init__(
self,
Expand Down Expand Up @@ -507,14 +509,6 @@ def create_external_request(
self._external_requests_lock.release_lock()
return request_uuid

def _delete_external_request(self, req_id: UUID) -> None:
req_id_str = str(req_id)
if req_id_str in self._external_requests:
self._external_requests_lock.acquire_lock(blocking=True)
req: IntersectService._ExternalRequest = self._external_requests.pop(req_id_str)
del req
self._external_requests_lock.release_lock()

def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalRequest | None:
req_id_str = str(req_id)
if req_id_str in self._external_requests:
Expand All @@ -524,14 +518,25 @@ def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalReque

def _process_external_requests(self) -> None:
self._external_requests_lock.acquire_lock(blocking=True)

# process requests
for extreq in self._external_requests.values():
if not extreq.processed:
self._process_external_request(extreq)
# delete requests
cleanup_list = [
str(extreq.request_id)
for extreq in self._external_requests.values()
if extreq.cleanup_req
]
for extreq_id in cleanup_list:
extreq = self._external_requests.pop(extreq_id)
del extreq

self._external_requests_lock.release_lock()

def _process_external_request(self, extreq: IntersectService._ExternalRequest) -> None:
response = None
cleanup_req = False

now = datetime.now(timezone.utc)
logger.debug(f'Processing external request {extreq.request_id} @ {now}')
Expand All @@ -555,7 +560,7 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) -
logger.warning(
f'External service request encountered an error: {error_msg}'
)
cleanup_req = True
extreq.cleanup_req = True
else:
logger.debug('Request wait timed-out!')
extreq.waiting = False
Expand All @@ -570,9 +575,6 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) -
):
extreq.response_fn(response)

if cleanup_req:
self._delete_external_request(extreq.request_id)

def _handle_service_message_raw(self, raw: bytes) -> None:
"""Main broker callback function.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,5 @@ def test_example_3_ping_pong_events_amqp():
def test_example_4_service_to_service():
assert (
run_example_test('4_service_to_service')
== 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!'
== 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n'
)

0 comments on commit ccca786

Please sign in to comment.