From a58f8da2693d2c46dd6480ef59e382447935a1b4 Mon Sep 17 00:00:00 2001 From: lvoloshyn-sekoia Date: Thu, 26 Sep 2024 17:35:13 +0300 Subject: [PATCH 1/3] Azure EventHub - fix epoch error --- Azure/connectors/azure_eventhub.py | 73 ++++++++++++++++++------------ 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/Azure/connectors/azure_eventhub.py b/Azure/connectors/azure_eventhub.py index 88848b04e..1f1a5c960 100644 --- a/Azure/connectors/azure_eventhub.py +++ b/Azure/connectors/azure_eventhub.py @@ -10,17 +10,10 @@ from azure.eventhub import EventData from azure.eventhub.aio import EventHubConsumerClient, PartitionContext from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore -from dateutil.parser import isoparse from sekoia_automation.aio.connector import AsyncConnector from sekoia_automation.connector import DefaultConnectorConfiguration -from .metrics import ( - EVENTS_LAG, - FORWARD_EVENTS_DURATION, - INCOMING_MESSAGES, - OUTCOMING_EVENTS, - MESSAGES_AGE, -) +from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, MESSAGES_AGE, OUTCOMING_EVENTS class AzureEventsHubConfiguration(DefaultConnectorConfiguration): @@ -167,25 +160,49 @@ async def handle_exception(self, partition_context: PartitionContext, exception: message="Error raised when consuming messages", ) - def run(self) -> None: # pragma: no cover - self.log(message="Azure EventHub Trigger has started", level="info") + async def receive_events(self): + try: + await self.client.receive_batch( + on_event_batch=self.handle_messages, + on_error=self.handle_exception, + max_wait_time=self._consumption_max_wait_time, + ) + + except asyncio.CancelledError: + # Handle the cancellation properly and ensure the client is closed. + await self.client.close() + raise + + except Exception as ex: + self.log_exception(ex, message="Failed to consume messages") + raise ex + + async def async_run(self) -> None: while self.running: + task = asyncio.create_task(self.receive_events()) + try: - loop = asyncio.get_event_loop() - - while self.running: - try: - loop.run_until_complete( - self.client.receive_batch( - on_event_batch=self.handle_messages, - on_error=self.handle_exception, - max_wait_time=self._consumption_max_wait_time, - ) - ) - - except Exception as ex: - self.log_exception(ex, message="Failed to consume messages") - raise ex - - except Exception as error: - self.log_exception(error, message="Failed to forward events") + # Allow the task to run for the specified duration (10 minutes) + await asyncio.sleep(600) + + # Cancel the receiving task after the duration + task.cancel() + + # Wait for the task to handle the cancellation + await task + + except asyncio.CancelledError: + self.log(message="Receiving task was cancelled", level="info") + + finally: + # Ensure the client is closed properly + await self.client.close() + + # Sleep for a short period before starting the next cycle + await asyncio.sleep(5) # Adjust if necessary + + def run(self) -> None: # pragma: no cover + self.log(message="Azure EventHub Trigger has started", level="info") + + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_run()) From 752ceab5d2ccf7e22886c6455f7d5a19544f7af2 Mon Sep 17 00:00:00 2001 From: lvoloshyn-sekoia Date: Thu, 26 Sep 2024 17:41:18 +0300 Subject: [PATCH 2/3] fix type annotation --- Azure/connectors/azure_eventhub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Azure/connectors/azure_eventhub.py b/Azure/connectors/azure_eventhub.py index 1f1a5c960..fae776cc1 100644 --- a/Azure/connectors/azure_eventhub.py +++ b/Azure/connectors/azure_eventhub.py @@ -160,7 +160,7 @@ async def handle_exception(self, partition_context: PartitionContext, exception: message="Error raised when consuming messages", ) - async def receive_events(self): + async def receive_events(self) -> None: try: await self.client.receive_batch( on_event_batch=self.handle_messages, From c05db6114ff5f5106344beb9e446794f3c0bac5f Mon Sep 17 00:00:00 2001 From: lvoloshyn-sekoia Date: Mon, 30 Sep 2024 16:40:16 +0300 Subject: [PATCH 3/3] manifest and changelog --- Azure/CHANGELOG.md | 6 ++++++ Azure/manifest.json | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Azure/CHANGELOG.md b/Azure/CHANGELOG.md index 68a1a862e..007feb4b7 100644 --- a/Azure/CHANGELOG.md +++ b/Azure/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## 2024-09-30 - 2.5.5 + +### Changed + +- Change the way we run async workflow + ## 2024-09-20 - 2.5.4 ### Changed diff --git a/Azure/manifest.json b/Azure/manifest.json index a2437ff51..06228a8e9 100644 --- a/Azure/manifest.json +++ b/Azure/manifest.json @@ -8,7 +8,7 @@ "name": "Microsoft Azure", "uuid": "525eecc0-9eee-484d-92bd-039117cf4dac", "slug": "azure", - "version": "2.5.4", + "version": "2.5.5", "categories": [ "Cloud Providers" ]