Skip to content

Commit

Permalink
Merge pull request #1059 from SEKOIA-IO/refactor/MessageQueueEventsLa…
Browse files Browse the repository at this point in the history
…gMetrics

Azure & AWS: add two new metrics to monitor events lag
  • Loading branch information
squioc authored Aug 5, 2024
2 parents d4e3652 + 354256b commit d7f0b6e
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 10 deletions.
6 changes: 6 additions & 0 deletions AWS/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## 2024-08-01 - 1.31.4

### Added

- Add two new metrics to follow the messages age

## 2024-07-22 - 1.31.3

### Added
Expand Down
14 changes: 11 additions & 3 deletions AWS/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sekoia_automation.connector import DefaultConnectorConfiguration
from sekoia_automation.module import Module

from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, OUTCOMING_EVENTS
from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, OUTCOMING_EVENTS, AVERAGE_MESSAGES_AGE, OLDER_MESSAGE_AGE


class AwsModuleConfiguration(BaseModel):
Expand Down Expand Up @@ -78,6 +78,8 @@ def run(self) -> None: # pragma: no cover
while self.running:
processing_start = time.time()
current_lag: int = 0
avg_lag: int = 0
max_lag: int = 0

batch_result: tuple[list[str], list[int]] = loop.run_until_complete(self.next_batch())
message_ids, messages_timestamp = batch_result
Expand All @@ -96,13 +98,19 @@ def run(self) -> None: # pragma: no cover

# Identify delay between message timestamp ( when it was pushed to sqs )
# and current timestamp ( when it was processed )
max_message_timestamp = max(messages_timestamp)
current_lag = int(processing_end - max_message_timestamp / 1000)
messages_age = [
int(processing_end - message_timestamp / 1000) for message_timestamp in messages_timestamp
]
current_lag = min(messages_age)
avg_lag = int(sum(messages_age) / len(messages_age))
max_lag = max(messages_age)
else:
self.log(message="No records to forward", level="info")

# report the current lag
EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(current_lag)
AVERAGE_MESSAGES_AGE.labels(intake_key=self.configuration.intake_key).set(avg_lag)
OLDER_MESSAGE_AGE.labels(intake_key=self.configuration.intake_key).set(max_lag)

# compute the remaining sleeping time. If greater than 0 and no messages were fetched, sleep
delta_sleep = self.configuration.frequency - batch_duration
Expand Down
14 changes: 14 additions & 0 deletions AWS/connectors/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,17 @@
namespace=prom_namespace,
labelnames=["intake_key"],
)

AVERAGE_MESSAGES_AGE = Gauge(
name="average_messages_age",
documentation="The average age of messages seen",
namespace=prom_namespace,
labelnames=["intake_key"],
)

OLDER_MESSAGE_AGE = Gauge(
name="older_message_age",
documentation="The age of the older message seen",
namespace=prom_namespace,
labelnames=["intake_key"],
)
4 changes: 2 additions & 2 deletions AWS/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
"name": "AWS",
"uuid": "b4462429-6f0f-42b5-87b8-430111697d28",
"slug": "aws",
"version": "1.31.3"
}
"version": "1.31.4"
}
6 changes: 6 additions & 0 deletions Azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## 2024-08-01 - 2.5.3

### Added

- Add two new metrics to follow the messages age

## 2024-07-24 - 2.5.2

### Fixed
Expand Down
29 changes: 25 additions & 4 deletions Azure/connectors/azure_eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
from sekoia_automation.aio.connector import AsyncConnector
from sekoia_automation.connector import DefaultConnectorConfiguration

from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, OUTCOMING_EVENTS
from .metrics import (
EVENTS_LAG,
FORWARD_EVENTS_DURATION,
INCOMING_MESSAGES,
OUTCOMING_EVENTS,
OLDER_MESSAGE_AGE,
AVERAGE_MESSAGES_AGE,
)


class AzureEventsHubConfiguration(DefaultConnectorConfiguration):
Expand Down Expand Up @@ -93,7 +100,11 @@ async def handle_messages(self, partition_context: PartitionContext, messages: l
),
level="info",
)

# reset the metrics
EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(0)
AVERAGE_MESSAGES_AGE.labels(intake_key=self.configuration.intake_key).set(0)
OLDER_MESSAGE_AGE.labels(intake_key=self.configuration.intake_key).set(0)
await self.client.close()

# acknowledge the messages
Expand Down Expand Up @@ -142,9 +153,19 @@ async def forward_events(self, messages: list[EventData]) -> None:
enqueued_times = [message.enqueued_time for message in messages if message.enqueued_time is not None]
if len(enqueued_times) > 0:
now = datetime.now(timezone.utc)
most_recent_enqueued_time = max(enqueued_times)
current_lag = now - most_recent_enqueued_time
EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(int(current_lag.total_seconds()))
messages_age = [int((now - enqueued_time).total_seconds()) for enqueued_time in enqueued_times]

# Compute the distance from the most recent message consumed
current_lag = min(messages_age)
EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(current_lag)

# Compute the distance from the older message consumed
max_lag = max(messages_age)
OLDER_MESSAGE_AGE.labels(intake_key=self.configuration.intake_key).set(max_lag)

# Compute the distance from the older message consumed
avg_lag = int(sum(messages_age) / len(messages_age))
AVERAGE_MESSAGES_AGE.labels(intake_key=self.configuration.intake_key).set(avg_lag)

async def handle_exception(self, partition_context: PartitionContext, exception: Exception) -> None:
self.log_exception(
Expand Down
14 changes: 14 additions & 0 deletions Azure/connectors/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,17 @@
namespace=prom_namespace,
labelnames=["intake_key"],
)

AVERAGE_MESSAGES_AGE = Gauge(
name="average_messages_age",
documentation="The average age of messages seen",
namespace=prom_namespace,
labelnames=["intake_key"],
)

OLDER_MESSAGE_AGE = Gauge(
name="older_message_age",
documentation="The age of the older message seen",
namespace=prom_namespace,
labelnames=["intake_key"],
)
2 changes: 1 addition & 1 deletion Azure/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
"name": "Microsoft Azure",
"uuid": "525eecc0-9eee-484d-92bd-039117cf4dac",
"slug": "azure",
"version": "2.5.2"
"version": "2.5.3"
}

0 comments on commit d7f0b6e

Please sign in to comment.