diff --git a/photonpump/connection.py b/photonpump/connection.py index 67faf6b..83a4106 100644 --- a/photonpump/connection.py +++ b/photonpump/connection.py @@ -826,7 +826,9 @@ async def connect_subscription( return await future - async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1): + async def subscribe_to( + self, stream, start_from=-1, resolve_link_tos=True, batch_size: int = 100 + ): """ Subscribe to receive notifications when a new event is published @@ -834,7 +836,7 @@ async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1): Args: stream: The name of the stream. - start_from: The first event to read. + start_from (optional): The first event to read. This parameter defaults to the magic value -1 which is treated as meaning "from the end of the stream". IF this value is used, no historical events will be returned. @@ -849,6 +851,8 @@ async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1): sent direct to the master node, otherwise False. correlation_id (optional): A unique identifer for this command. + batch_size (optioal): The number of events to pull down from + eventstore in one go. Returns: A VolatileSubscription. @@ -872,7 +876,7 @@ async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1): if start_from == -1: cmd = convo.SubscribeToStream(stream, resolve_link_tos) else: - cmd = convo.CatchupSubscription(stream, start_from) + cmd = convo.CatchupSubscription(stream, start_from, batch_size) future = await self.dispatcher.start_conversation(cmd) diff --git a/photonpump/conversations.py b/photonpump/conversations.py index a572f8e..5de9138 100644 --- a/photonpump/conversations.py +++ b/photonpump/conversations.py @@ -941,7 +941,14 @@ async def __aexit__(self, exc_type, exc, tb): class CatchupSubscription(ReadStreamEventsBehaviour, PageStreamEventsBehaviour): - def __init__(self, stream, start_from=0, credential=None, conversation_id=None): + def __init__( + self, + stream, + start_from=0, + batch_size=100, + credential=None, + conversation_id=None, + ): self.stream = stream self.iterator = StreamingIterator() self.conversation_id = conversation_id or uuid4() @@ -950,7 +957,7 @@ def __init__(self, stream, start_from=0, credential=None, conversation_id=None): ) self.from_event = start_from self.direction = StreamDirection.Forward - self.batch_size = 100 + self.batch_size = batch_size self.has_first_page = False self.require_master = False self.resolve_link_tos = True diff --git a/photonpump/messages.py b/photonpump/messages.py index fdb4ac5..c171464 100644 --- a/photonpump/messages.py +++ b/photonpump/messages.py @@ -65,7 +65,7 @@ class TcpCommand(IntEnum): UpdatePersistentSubscription = 0xCE UpdatePersistentSubscriptionCompleted = 0xCF - BadRequest = 0xf0 + BadRequest = 0xF0 NotHandled = 0xF1 Authenticate = 0xF2 Authenticated = 0xF3 diff --git a/test/connection_test.py b/test/connection_test.py index cbdad61..dc68d34 100644 --- a/test/connection_test.py +++ b/test/connection_test.py @@ -27,3 +27,18 @@ async def test_connect_subscription(event_loop): event = await subscription.events.anext() assert event.original_event_id == event_id + + +@pytest.mark.asyncio +async def test_subscribe_to(event_loop): + + async with connect(username="admin", password="changeit", loop=event_loop) as conn: + stream_name = str(uuid.uuid4()) + event_id = uuid.uuid4() + + await conn.publish_event(stream_name, "my-event-type", id=event_id) + + subscription = await conn.subscribe_to(stream_name, start_from=0) + + event = await subscription.events.anext() + assert event.original_event_id == event_id