Skip to content

Commit

Permalink
feat(AWS): Async
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkheir committed Feb 26, 2024
1 parent bd923c8 commit a11ab9b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
73 changes: 46 additions & 27 deletions AWS/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from abc import ABCMeta
from functools import cached_property
from typing import Any, Callable

from pydantic import BaseModel, Field
from sekoia_automation.aio.connector import AsyncConnector
Expand Down Expand Up @@ -74,34 +75,52 @@ async def next_batch(self) -> tuple[list[str], list[int]]:

def run(self) -> None:  # pragma: no cover
    """Synchronous entry point: drive the async connector loop to completion.

    Uses ``asyncio.run`` instead of the deprecated
    ``asyncio.get_event_loop()`` / ``run_until_complete`` pair
    (deprecated for this use since Python 3.10): it creates a fresh
    event loop and guarantees the loop is closed on exit.
    """
    asyncio.run(self.async_run())

async def async_run(self) -> None:
    """Poll record batches forever and push them to the intakes in background tasks.

    Each iteration fetches the next batch via ``next_batch()``. Non-empty
    batches are pushed by a background ``asyncio`` task whose done-callback
    (built by ``push_data_to_intakes_callback``) emits the lag/throughput
    metrics. The loop only sleeps for ``configuration.frequency`` seconds
    when a poll returned nothing, or after an error. On shutdown
    (``self.running`` becomes False) every still-pending push task is
    awaited so no collected events are lost.
    """
    # Keep strong references to in-flight push tasks: the event loop only
    # holds weak references, so without this set a task could be
    # garbage-collected mid-flight.
    background_tasks: set[asyncio.Task[Any]] = set()
    while self.running:
        try:
            processing_start = time.time()
            records, messages_timestamp = await self.next_batch()

            if records:
                task = asyncio.create_task(self.push_data_to_intakes(events=records))
                background_tasks.add(task)
                # Drop the strong reference once the task completes so the
                # final gather() only waits on genuinely pending pushes.
                task.add_done_callback(background_tasks.discard)
                task.add_done_callback(
                    self.push_data_to_intakes_callback(processing_start, messages_timestamp)
                )
            else:
                self.log(message="No records to forward", level="info")
                await asyncio.sleep(self.configuration.frequency)
        except Exception as e:
            self.log_exception(e)
            # Back off before retrying so a persistently failing
            # next_batch() does not turn this loop into a hot spin.
            await asyncio.sleep(self.configuration.frequency)

    # Wait for all pending pushes to finish before exiting; tolerate
    # individual task failures so shutdown always completes.
    await asyncio.gather(*background_tasks, return_exceptions=True)
def push_data_to_intakes_callback(
    self, processing_start: float, messages_timestamp: list[int]
) -> Callable[[asyncio.Task[Any]], None]:
    """Build a done-callback that reports metrics for one push task.

    Args:
        processing_start: Epoch seconds at which the batch poll started.
        messages_timestamp: Per-message SQS push timestamps, in
            milliseconds since the epoch.

    Returns:
        A callback suitable for ``asyncio.Task.add_done_callback``.
    """

    def callback(task: asyncio.Task[Any]) -> None:
        """Record lag and throughput metrics once the push task finishes."""
        # task.result() re-raises if the task failed or was cancelled,
        # which would crash this event-loop callback and silently skip
        # the metrics — check the outcome explicitly instead.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self.log_exception(exc)
            return

        message_ids = task.result()
        processing_end = time.time()
        # Lag between the moment a message was pushed to SQS
        # (milliseconds epoch) and the moment it was processed.
        for message_timestamp in messages_timestamp:
            EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(
                processing_end - (message_timestamp / 1000)
            )

        OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(len(message_ids))
        FORWARD_EVENTS_DURATION.labels(intake_key=self.configuration.intake_key).observe(
            processing_end - processing_start
        )
        if len(message_ids) > 0:
            self.log(message="Pushed {0} records".format(len(message_ids)), level="info")

    return callback
5 changes: 2 additions & 3 deletions AWS/connectors/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from abc import ABCMeta
from functools import cached_property
from gzip import decompress
from typing import Any

import orjson

Expand Down Expand Up @@ -147,6 +148,4 @@ async def next_batch(self, previous_processing_end: float | None = None) -> tupl
if len(records) >= self.configuration.records_in_queue_per_batch or not records:
continue_receiving = False

result = await self.push_data_to_intakes(events=records)

return result, timestamps_to_log
return records, timestamps_to_log

0 comments on commit a11ab9b

Please sign in to comment.