diff --git a/Azure/CHANGELOG.md b/Azure/CHANGELOG.md index 45cba8832..a9c34137a 100644 --- a/Azure/CHANGELOG.md +++ b/Azure/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 2024-09-30 - 2.5.5 + +### Changed + +- Change the way we run async workflow + ## 2024-09-20 - 2.5.4 ### Changed diff --git a/Azure/connectors/azure_eventhub.py b/Azure/connectors/azure_eventhub.py index 88848b04e..fae776cc1 100644 --- a/Azure/connectors/azure_eventhub.py +++ b/Azure/connectors/azure_eventhub.py @@ -10,17 +10,10 @@ from azure.eventhub import EventData from azure.eventhub.aio import EventHubConsumerClient, PartitionContext from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore -from dateutil.parser import isoparse from sekoia_automation.aio.connector import AsyncConnector from sekoia_automation.connector import DefaultConnectorConfiguration -from .metrics import ( - EVENTS_LAG, - FORWARD_EVENTS_DURATION, - INCOMING_MESSAGES, - OUTCOMING_EVENTS, - MESSAGES_AGE, -) +from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, MESSAGES_AGE, OUTCOMING_EVENTS class AzureEventsHubConfiguration(DefaultConnectorConfiguration): @@ -167,25 +160,49 @@ async def handle_exception(self, partition_context: PartitionContext, exception: message="Error raised when consuming messages", ) - def run(self) -> None: # pragma: no cover - self.log(message="Azure EventHub Trigger has started", level="info") + async def receive_events(self) -> None: + try: + await self.client.receive_batch( + on_event_batch=self.handle_messages, + on_error=self.handle_exception, + max_wait_time=self._consumption_max_wait_time, + ) + + except asyncio.CancelledError: + # Handle the cancellation properly and ensure the client is closed. + await self.client.close() + raise + + except Exception as ex: + self.log_exception(ex, message="Failed to consume messages") + raise ex + + async def async_run(self) -> None: while self.running: + task = asyncio.create_task(self.receive_events()) + try: - loop = asyncio.get_event_loop() - - while self.running: - try: - loop.run_until_complete( - self.client.receive_batch( - on_event_batch=self.handle_messages, - on_error=self.handle_exception, - max_wait_time=self._consumption_max_wait_time, - ) - ) - - except Exception as ex: - self.log_exception(ex, message="Failed to consume messages") - raise ex - - except Exception as error: - self.log_exception(error, message="Failed to forward events") + # Allow the task to run for the specified duration (10 minutes) + await asyncio.sleep(600) + + # Cancel the receiving task after the duration + task.cancel() + + # Wait for the task to handle the cancellation + await task + + except asyncio.CancelledError: + self.log(message="Receiving task was cancelled", level="info") + + finally: + # Ensure the client is closed properly + await self.client.close() + + # Sleep for a short period before starting the next cycle + await asyncio.sleep(5) # Adjust if necessary + + def run(self) -> None: # pragma: no cover + self.log(message="Azure EventHub Trigger has started", level="info") + + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_run()) diff --git a/Azure/manifest.json b/Azure/manifest.json index a2437ff51..06228a8e9 100644 --- a/Azure/manifest.json +++ b/Azure/manifest.json @@ -8,7 +8,7 @@ "name": "Microsoft Azure", "uuid": "525eecc0-9eee-484d-92bd-039117cf4dac", "slug": "azure", - "version": "2.5.4", + "version": "2.5.5", "categories": [ "Cloud Providers" ]