Skip to content

Commit

Permalink
Merge pull request #1112 from SEKOIA-IO/lv/fix_eventhub_epoch_bug
Browse files Browse the repository at this point in the history
Azure EventHub - fix epoch error
  • Loading branch information
squioc authored Sep 30, 2024
2 parents 4649aed + c05db61 commit 259b0b5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
6 changes: 6 additions & 0 deletions Azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2024-09-30 - 2.5.5

### Changed

- Change the way we run async workflow

## 2024-09-20 - 2.5.4

### Changed
Expand Down
73 changes: 45 additions & 28 deletions Azure/connectors/azure_eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,10 @@
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, PartitionContext
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from dateutil.parser import isoparse
from sekoia_automation.aio.connector import AsyncConnector
from sekoia_automation.connector import DefaultConnectorConfiguration

from .metrics import (
EVENTS_LAG,
FORWARD_EVENTS_DURATION,
INCOMING_MESSAGES,
OUTCOMING_EVENTS,
MESSAGES_AGE,
)
from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, MESSAGES_AGE, OUTCOMING_EVENTS


class AzureEventsHubConfiguration(DefaultConnectorConfiguration):
Expand Down Expand Up @@ -167,25 +160,49 @@ async def handle_exception(self, partition_context: PartitionContext, exception:
message="Error raised when consuming messages",
)

def run(self) -> None: # pragma: no cover
self.log(message="Azure EventHub Trigger has started", level="info")
async def receive_events(self) -> None:
try:
await self.client.receive_batch(
on_event_batch=self.handle_messages,
on_error=self.handle_exception,
max_wait_time=self._consumption_max_wait_time,
)

except asyncio.CancelledError:
# Handle the cancellation properly and ensure the client is closed.
await self.client.close()
raise

except Exception as ex:
self.log_exception(ex, message="Failed to consume messages")
raise ex

async def async_run(self) -> None:
while self.running:
task = asyncio.create_task(self.receive_events())

try:
loop = asyncio.get_event_loop()

while self.running:
try:
loop.run_until_complete(
self.client.receive_batch(
on_event_batch=self.handle_messages,
on_error=self.handle_exception,
max_wait_time=self._consumption_max_wait_time,
)
)

except Exception as ex:
self.log_exception(ex, message="Failed to consume messages")
raise ex

except Exception as error:
self.log_exception(error, message="Failed to forward events")
# Allow the task to run for the specified duration (10 minutes)
await asyncio.sleep(600)

# Cancel the receiving task after the duration
task.cancel()

# Wait for the task to handle the cancellation
await task

except asyncio.CancelledError:
self.log(message="Receiving task was cancelled", level="info")

finally:
# Ensure the client is closed properly
await self.client.close()

# Sleep for a short period before starting the next cycle
await asyncio.sleep(5) # Adjust if necessary

def run(self) -> None: # pragma: no cover
self.log(message="Azure EventHub Trigger has started", level="info")

loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_run())
2 changes: 1 addition & 1 deletion Azure/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"name": "Microsoft Azure",
"uuid": "525eecc0-9eee-484d-92bd-039117cf4dac",
"slug": "azure",
"version": "2.5.4",
"version": "2.5.5",
"categories": [
"Cloud Providers"
]
Expand Down

0 comments on commit 259b0b5

Please sign in to comment.