Skip to content

Commit

Permalink
Revert "strip out >>> which were making it hard to copypaste the docs (
Browse files Browse the repository at this point in the history
…#69)"

This reverts commit dcbdca4.
  • Loading branch information
Bob Gregory committed Nov 9, 2018
1 parent b186404 commit 2298958
Showing 1 changed file with 91 additions and 91 deletions.
182 changes: 91 additions & 91 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,46 @@ Usually you will want to interact with photon pump via the :class:`~photonpump.C

First you will need to create a connection:

import asyncio
from photonpump import connect

loop = asyncio.get_event_loop()

async with connect(loop=loop) as c:
await c.ping()
>>> import asyncio
>>> from photonpump import connect
>>>
>>> loop = asyncio.get_event_loop()
>>>
>>> async with connect(loop=loop) as c:
>>> await c.ping()


The :func:`photonpump.connect` function returns an async context manager so that the connection will be automatically closed when you are finished. Alternatively you can create a client and manage its lifetime yourself.

import asyncio
from photonpump import connect

loop = asyncio.get_event_loop()

client = connect(loop=loop)
await client.connect()
await client.ping()
await client.close()
>>> import asyncio
>>> from photonpump import connect
>>>
>>> loop = asyncio.get_event_loop()
>>>
>>> client = connect(loop=loop)
>>> await client.connect()
>>> await client.ping()
>>> await client.close()

Reading and Writing single events
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A connection can be used for both reading and writing events. You can publish a single event with the :meth:`~photonpump.Client.publish_event` method:

# When publishing events, you must provide the stream name.
stream = 'ponies'
event_type = 'PonyJumped'

result = await conn.publish_event(stream, event_type, body={
'Pony': 'Derpy Hooves',
'Height': 10,
'Distance': 13
})
>>> # When publishing events, you must provide the stream name.
>>> stream = 'ponies'
>>> event_type = 'PonyJumped'
>>>
>>> result = await conn.publish_event(stream, event_type, body={
>>> 'Pony': 'Derpy Hooves',
>>> 'Height': 10,
>>> 'Distance': 13
>>> })

We can fetch a single event with the complementary :meth:`~photonpump.Client.get_event` method if we know its `event number` and the stream where it was published:

event_number = result.last_event_number
event = await conn.get_event(stream, event_number)
>>> event_number = result.last_event_number
>>> event = await conn.get_event(stream, event_number)

Assuming that your event was published as json, you can load the body with the :meth:`~photonpump.messages.Event.json` method:

Expand Down Expand Up @@ -129,75 +129,75 @@ Assuming that your event was published as json, you can load the body with the :
print(event)
data = event.json()
assert data['Pony'] == 'Derpy Hooves'
>>> data = event.json()
>>> assert data['Pony'] == 'Derpy Hooves'
Reading and Writing in Batches
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We can read and write several events in a request using the :meth:`~photonpump.Client.get` and :meth:`~photonpump.Client.publish` methods of our :class:`~photonpump.Client`. the :func:`photonpump.message.NewEvent` function is a helper for constructing events.
stream = 'more_ponies'
events = [
NewEvent('PonyJumped',
data={
'Pony': 'Peculiar Hooves',
'Height': 9,
'Distance': 13
}),
NewEvent('PonyJumped',
data={
'Pony': 'Sparkly Hooves',
'Height': 12,
'Distance': 12
}),
NewEvent('PonyJumped',
data={
'Pony': 'Sparkly Hooves',
'Height': 11,
'Distance': 14
})]
await conn.publish(stream, events)
>>> stream = 'more_ponies'
>>> events = [
>>> NewEvent('PonyJumped',
>>> data={
>>> 'Pony': 'Peculiar Hooves',
>>> 'Height': 9,
>>> 'Distance': 13
>>> }),
>>> NewEvent('PonyJumped',
>>> data={
>>> 'Pony': 'Sparkly Hooves',
>>> 'Height': 12,
>>> 'Distance': 12
>>> }),
>>> NewEvent('PonyJumped',
>>> data={
>>> 'Pony': 'Sparkly Hooves',
>>> 'Height': 11,
>>> 'Distance': 14
>>> })]
>>>
>>> await conn.publish(stream, events)
We can get events from a stream in slices by setting the `from_event_number` and `max_count` arguments. We can read events from either the front or back of the stream.
import StreamDirection from photonpump.messages
all_events = await conn.get(stream)
assert len(all_events) == 3
first_event = await conn.get(stream, max_count=1)[0].json()
assert first_event['Pony'] == 'Peculiar Hooves'
second_event = await conn.get(stream, max_count=1, from_event_number=1)[0].json()
assert second_event['Pony'] == 'Sparkly Hooves'
reversed_events = await conn.get(stream, direction=StreamDirection.backward)
assert len(reversed_events) == 3
assert reversed_events[2] == first_event
>>> import StreamDirection from photonpump.messages
>>>
>>> all_events = await conn.get(stream)
>>> assert len(all_events) == 3
>>>
>>> first_event = await conn.get(stream, max_count=1)[0].json()
>>> assert first_event['Pony'] == 'Peculiar Hooves'
>>>
>>> second_event = await conn.get(stream, max_count=1, from_event_number=1)[0].json()
>>> assert second_event['Pony'] == 'Sparkly Hooves'
>>>
>>> reversed_events = await conn.get(stream, direction=StreamDirection.backward)
>>> assert len(reversed_events) == 3
>>> assert reversed_events[2] == first_event
Reading with Asynchronous Generators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We can page through a stream manually by using the `from_event_number` argument of :meth:`~photonpump.Client.get`, but it's simpler to use the :meth:`~photonpump.Client.iter` method, which returns an asynchronous generator. By default, `iter` will read from the beginning to the end of a stream, and then stop. As with `get`, you can set the :class:`~photon.messages.StreamDirection`, or use `from_event` to control the result:
async for event in conn.iter(stream):
print (event)
>>> async for event in conn.iter(stream):
>>> print (event)
This extends to asynchronous comprehensions:
async def feet_to_metres(jumps):
async for jump in jumps:
data = jump.json()
data['Height'] = data * 0.3048
data['Distance'] = data * 0.3048
yield data
jumps = (event async for event in conn.iter('ponies')
if event.type == 'PonyJumped')
async for jump in feet_to_metres(jumps):
print (event)
>>> async def feet_to_metres(jumps):
>>> async for jump in jumps:
>>> data = jump.json()
>>> data['Height'] = data * 0.3048
>>> data['Distance'] = data * 0.3048
>>> yield data
>>>
>>> jumps = (event async for event in conn.iter('ponies')
>>> if event.type == 'PonyJumped')
>>> async for jump in feet_to_metres(jumps):
>>> print (event)
Persistent Subscriptions
Expand All @@ -209,16 +209,16 @@ A persistent subscription stores its state on the server. When your application
Firstly, we need to :meth:`create the subscription <photonpump.connection.Client.create_subscription>`.
async def create_subscription(subscription_name, stream_name, conn):
await conn.create_subscription(subscription_name, stream_name)
>>> async def create_subscription(subscription_name, stream_name, conn):
>>> await conn.create_subscription(subscription_name, stream_name)
Once we have a subscription, we can :meth:`connect to it <photonpump.connection.Client.connect_subscription>` to begin receiving events. A persistent subscription exposes an `events` property, which acts like an asynchronous iterator.
async def read_events_from_subscription(subscription_name, stream_name, conn):
subscription = await conn.connect_subscription(subscription_name, stream_name)
async for event in subscription.events:
print(event)
await subscription.ack(event)
>>> async def read_events_from_subscription(subscription_name, stream_name, conn):
>>> subscription = await conn.connect_subscription(subscription_name, stream_name)
>>> async for event in subscription.events:
>>> print(event)
>>> await subscription.ack(event)
Eventstore will send each event to one consumer at a time. When you have handled the event, you must acknowledge receipt. Eventstore will resend messages that are unacknowledged.
Expand All @@ -230,9 +230,9 @@ In a Volatile Subscription, state is stored by the client. When your application
Volatile subsciptions do not support event acknowledgement.
async def subscribe_to_stream(stream, conn):
async for event in conn.subscribe_to(stream):
print(event)
>>> async def subscribe_to_stream(stream, conn):
>>> async for event in conn.subscribe_to(stream):
>>> print(event)
High-Availability Scenarios
Expand All @@ -242,9 +242,9 @@ Eventstore supports an HA-cluster deployment topology. In this scenario, Eventst
The cluster discovery interrogates eventstore gossip to find the active master. You can provide the IP of a maching in the cluster, or a DNS name that resolves to some members of the cluster, and photonpump will discover the others.
async def connect_to_cluster(hostname_or_ip, port=2113):
with connect(discovery_host=hostname_or_ip, discovery_port=2113) as c:
await c.ping()
>>> async def connect_to_cluster(hostname_or_ip, port=2113):
>>> with connect(discovery_host=hostname_or_ip, discovery_port=2113) as c:
>>> await c.ping()
If you provide both a `host` and `discovery_host`, photonpump will prefer discovery.
Expand Down

0 comments on commit 2298958

Please sign in to comment.