diff --git a/fixbackend/domain_events/subscriber.py b/fixbackend/domain_events/subscriber.py index eb0ce48d..d1bd2108 100644 --- a/fixbackend/domain_events/subscriber.py +++ b/fixbackend/domain_events/subscriber.py @@ -13,7 +13,7 @@ # along with this program. If not, see . -from datetime import timedelta +from datetime import datetime, timedelta from logging import getLogger from typing import Any, Awaitable, Callable, Dict, Generic, Tuple, Type, TypeVar @@ -26,6 +26,7 @@ from fixbackend.config import Config from fixbackend.domain_events import DomainEventsStreamName from fixbackend.domain_events.events import Event +import asyncio Kind = str @@ -44,6 +45,9 @@ def with_callback(self, callback: Callable[[Evt], Awaitable[None]]) -> "HandlerD return HandlerDescriptor(callbacks=self.callbacks + (callback,), event_cls=self.event_cls) +T = TypeVar("T") + + class DomainEventSubscriber(Service): def __init__(self, redis: Redis, config: Config) -> None: self.redis = redis @@ -72,6 +76,13 @@ def subscribe(self, event_cls: Type[Evt], handler: Callable[[Evt], Awaitable[Non self.subscribers[event_cls.kind] = new_descriptor log.info(f"Added domain event handler for {event_cls.kind}") + async def timed(self, callback: Callable[[Evt], Awaitable[None]], event: Evt) -> None: + before = datetime.utcnow() + await callback(event) + after = datetime.utcnow() + elapsed = after - before + log.info(f"Processed domain event {event} in {elapsed}") + async def process_domain_event(self, message: Json, context: MessageContext) -> None: log.info(f"Processing domain event: {message} {context}") handler = self.subscribers.get(context.kind) @@ -79,5 +90,4 @@ async def process_domain_event(self, message: Json, context: MessageContext) -> return event = handler.event_cls.from_json(message) for callback in handler.callbacks: - await callback(event) - log.info(f"Processed domain event: {event} {context}") + asyncio.create_task(self.timed(callback, event))