Skip to content

Commit

Permalink
Merge pull request #1183 from SEKOIA-IO/fix/azure_stop_method
Browse files Browse the repository at this point in the history
Fix: Azure stop method
  • Loading branch information
squioc authored Nov 20, 2024
2 parents 847aa0d + 7a49ceb commit 0062bff
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
8 changes: 8 additions & 0 deletions Azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2024-11-20 - 2.6.3

### Fixed

- Remove shutdown to simplify the workflow
- Fix the way to handle stop


## 2024-11-15 - 2.6.2

### Fixed
Expand Down
12 changes: 10 additions & 2 deletions Azure/connectors/azure_eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from azure.eventhub.aio import EventHubConsumerClient, PartitionContext
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from sekoia_automation.aio.connector import AsyncConnector
from sekoia_automation.connector import DefaultConnectorConfiguration
from sekoia_automation.connector import DefaultConnectorConfiguration, Connector

from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, MESSAGES_AGE, OUTCOMING_EVENTS

Expand Down Expand Up @@ -165,6 +165,12 @@ async def receive_events(self) -> None:
max_wait_time=self._consumption_max_wait_time,
)

def stop(self, *args: Any, **kwargs: Optional[Any]) -> None: # pragma: no cover
"""
Stop the connector
"""
super(Connector, self).stop(*args, **kwargs)

async def async_run(self) -> None: # pragma: no cover
while self.running:
try:
Expand All @@ -174,10 +180,12 @@ async def async_run(self) -> None: # pragma: no cover
self.log_exception(ex, message="Failed to consume messages")
self._has_more_events = False

await self.client.close()

if not self._has_more_events:
await asyncio.sleep(self._frequency)

await self.client.close()
await self._session.close()

def run(self) -> None: # pragma: no cover
self.log("Azure EventHub Trigger has started")
Expand Down
2 changes: 1 addition & 1 deletion Azure/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"name": "Microsoft Azure",
"uuid": "525eecc0-9eee-484d-92bd-039117cf4dac",
"slug": "azure",
"version": "2.6.2",
"version": "2.6.3",
"categories": [
"Cloud Providers"
]
Expand Down

0 comments on commit 0062bff

Please sign in to comment.