Skip to content

Commit

Permalink
Allow subscribe_to to accept batch_size (#76)
Browse files Browse the repository at this point in the history
This commit adds an optional "batch_size" parameter to the subscribe_to method.
  • Loading branch information
bram2000 authored and bobthemighty committed Dec 21, 2018
1 parent 8717514 commit 34b69fb
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
10 changes: 7 additions & 3 deletions photonpump/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,15 +826,17 @@ async def connect_subscription(

return await future

async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1):
async def subscribe_to(
self, stream, start_from=-1, resolve_link_tos=True, batch_size: int = 100
):

"""
Subscribe to receive notifications when a new event is published
to a stream.
Args:
stream: The name of the stream.
start_from: The first event to read.
start_from (optional): The first event to read.
This parameter defaults to the magic value -1 which is treated
as meaning "from the end of the stream". IF this value is used,
no historical events will be returned.
Expand All @@ -849,6 +851,8 @@ async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1):
sent direct to the master node, otherwise False.
correlation_id (optional): A unique identifer for this
command.
batch_size (optioal): The number of events to pull down from
eventstore in one go.
Returns:
A VolatileSubscription.
Expand All @@ -872,7 +876,7 @@ async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1):
if start_from == -1:
cmd = convo.SubscribeToStream(stream, resolve_link_tos)
else:
cmd = convo.CatchupSubscription(stream, start_from)
cmd = convo.CatchupSubscription(stream, start_from, batch_size)

future = await self.dispatcher.start_conversation(cmd)

Expand Down
11 changes: 9 additions & 2 deletions photonpump/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,14 @@ async def __aexit__(self, exc_type, exc, tb):


class CatchupSubscription(ReadStreamEventsBehaviour, PageStreamEventsBehaviour):
def __init__(self, stream, start_from=0, credential=None, conversation_id=None):
def __init__(
self,
stream,
start_from=0,
batch_size=100,
credential=None,
conversation_id=None,
):
self.stream = stream
self.iterator = StreamingIterator()
self.conversation_id = conversation_id or uuid4()
Expand All @@ -950,7 +957,7 @@ def __init__(self, stream, start_from=0, credential=None, conversation_id=None):
)
self.from_event = start_from
self.direction = StreamDirection.Forward
self.batch_size = 100
self.batch_size = batch_size
self.has_first_page = False
self.require_master = False
self.resolve_link_tos = True
Expand Down
2 changes: 1 addition & 1 deletion photonpump/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TcpCommand(IntEnum):
UpdatePersistentSubscription = 0xCE
UpdatePersistentSubscriptionCompleted = 0xCF

BadRequest = 0xf0
BadRequest = 0xF0
NotHandled = 0xF1
Authenticate = 0xF2
Authenticated = 0xF3
Expand Down
15 changes: 15 additions & 0 deletions test/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,18 @@ async def test_connect_subscription(event_loop):

event = await subscription.events.anext()
assert event.original_event_id == event_id


@pytest.mark.asyncio
async def test_subscribe_to(event_loop):

async with connect(username="admin", password="changeit", loop=event_loop) as conn:
stream_name = str(uuid.uuid4())
event_id = uuid.uuid4()

await conn.publish_event(stream_name, "my-event-type", id=event_id)

subscription = await conn.subscribe_to(stream_name, start_from=0)

event = await subscription.events.anext()
assert event.original_event_id == event_id

0 comments on commit 34b69fb

Please sign in to comment.