diff --git a/README.md b/README.md index 2f8d210..1cf986b 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,68 @@ from memphis.types import Retention, Storage import asyncio ``` +### Quickstart - Producing and Consuming + +The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages. + +> The Memphis.py SDK uses asyncio for many functions. Make sure to call the following code in an async function: + +```python +async def main(): + ... + +if __name__ == '__main__': + asyncio.run(main()) +``` + +First, a connection to Memphis must be made: + +```python +from memphis import Memphis + +# Connecting to the broker +memphis = Memphis() + +await memphis.connect( + host = "", + username = "", + password = "", + account_id = # For cloud users, at the top of the overview page +) +``` + +Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function: + +```python +await memphis.produce( + station_name="", + producer_name="", + message={ + "id": i, + "chocolates_to_eat": 3 + } +) +``` + +Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function: + +```python +from memphis.message import Message + +messages: list[Message] = await memphis.fetch_messages( + station_name="", + consumer_name="", +) # Type-hint the return here for LSP integration + +for consumed_message in messages: + msg_data = json.loads(consumed_message.get_data()) + print(f"Ate {msg_data['chocolates_to_eat']} chocolates... Yum") + + await consumed_message.ack() +``` + +> Remember to call `memphis.close()` to close the connection. + ### Connecting to Memphis First, we need to create Memphis `object` and then connect with Memphis by using `memphis.connect`. diff --git a/examples/consumer.py b/examples/consumer.py index 39d0c9f..92a4b9e 100644 --- a/examples/consumer.py +++ b/examples/consumer.py @@ -1,46 +1,53 @@ -from __future__ import annotations +""" +An example consumer for the Memphis.dev python SDK. +""" import asyncio - -from memphis import Memphis, MemphisConnectError, MemphisError, MemphisHeaderError +import json +from memphis import Memphis +from memphis.message import Message async def main(): - async def msg_handler(msgs, error, _): - try: - for msg in msgs: - print("message: ", msg.get_data()) - await msg.ack() - if error: - print(error) - except (MemphisError, MemphisConnectError, MemphisHeaderError) as e: - print(e) - return - + """ + Async main function used for the asyncio runtime. + """ try: + # Connecting to the broker memphis = Memphis() + await memphis.connect( host="", - username="", - connection_token="", + username="", + password="", + # account_id=, # For cloud users on, at the top of the overview page ) consumer = await memphis.consumer( station_name="", consumer_name="", - consumer_group="", ) - consumer.set_context({"key": "value"}) - consumer.consume(msg_handler) - # Keep your main thread alive so the consumer will keep receiving data - await asyncio.Event().wait() + while True: + messages: list[ + Message + ] = await consumer.fetch() # Type-hint the return here for LSP integration - except (MemphisError, MemphisConnectError) as e: - print(e) + if len(messages) == 0: + continue + for consumed_message in messages: + msg_data = json.loads(consumed_message.get_data()) + + # Do something with the message data + print(msg_data) + await consumed_message.ack() + + except Exception as e: + print(e) finally: - await memphis.close() + if memphis != None: + await memphis.close() if __name__ == "__main__": diff --git a/examples/producer.py b/examples/producer.py index 01dd381..9b0b2b9 100644 --- a/examples/producer.py +++ b/examples/producer.py @@ -1,45 +1,38 @@ -from __future__ import annotations +""" +An example producer for the Memphis.dev python SDK. +""" import asyncio - -from memphis import ( - Headers, - Memphis, - MemphisConnectError, - MemphisError, - MemphisHeaderError, - MemphisSchemaError, -) +from memphis import Memphis, MemphisConnectError, MemphisError async def main(): + """ + Async main function used for the asyncio runtime. + """ try: + # Connecting to the broker memphis = Memphis() + await memphis.connect( host="", - username="", - connection_token="", + username="", + password="", + # account_id= , # For cloud users on, at the top of the overview page ) + # Creating a producer and producing a message. + # You can also use the memphis.producer function producer = await memphis.producer( - station_name="", producer_name="" + station_name="", # Matches the station name in memphis cloud + producer_name="", ) - headers = Headers() - headers.add("key", "value") - for i in range(5): - await producer.produce( - bytearray("Message #" + str(i) + ": Hello world", "utf-8"), - headers=headers, - ) # you can send the message parameter as dict as well - - except ( - MemphisError, - MemphisConnectError, - MemphisHeaderError, - MemphisSchemaError, - ) as e: - print(e) + for i in range(10): + await producer.produce(message={"id": i, "chocolates_to_eat": 3}) + + except (MemphisError, MemphisConnectError) as e: + print(e) finally: await memphis.close()