Skip to content

Commit

Permalink
more logging, make domain event processing async
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Nov 16, 2023
1 parent f95aca5 commit db82c81
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions fixbackend/domain_events/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.


from datetime import timedelta
from datetime import datetime, timedelta
from logging import getLogger
from typing import Any, Awaitable, Callable, Dict, Generic, Tuple, Type, TypeVar

Expand All @@ -26,6 +26,7 @@
from fixbackend.config import Config
from fixbackend.domain_events import DomainEventsStreamName
from fixbackend.domain_events.events import Event
import asyncio

Kind = str

Expand All @@ -44,6 +45,9 @@ def with_callback(self, callback: Callable[[Evt], Awaitable[None]]) -> "HandlerD
return HandlerDescriptor(callbacks=self.callbacks + (callback,), event_cls=self.event_cls)


T = TypeVar("T")


class DomainEventSubscriber(Service):
def __init__(self, redis: Redis, config: Config) -> None:
self.redis = redis
Expand Down Expand Up @@ -72,12 +76,18 @@ def subscribe(self, event_cls: Type[Evt], handler: Callable[[Evt], Awaitable[Non
self.subscribers[event_cls.kind] = new_descriptor
log.info(f"Added domain event handler for {event_cls.kind}")

async def timed(self, callback: Callable[[Evt], Awaitable[None]], event: Evt) -> None:
before = datetime.utcnow()
await callback(event)
after = datetime.utcnow()
elapsed = after - before
log.info(f"Processed domain event {event} in {elapsed}")

async def process_domain_event(self, message: Json, context: MessageContext) -> None:
log.info(f"Processing domain event: {message} {context}")
handler = self.subscribers.get(context.kind)
if not handler:
return
event = handler.event_cls.from_json(message)
for callback in handler.callbacks:
await callback(event)
log.info(f"Processed domain event: {event} {context}")
asyncio.create_task(self.timed(callback, event))

0 comments on commit db82c81

Please sign in to comment.