Skip to content

Commit

Permalink
docs: update nats kv and os documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 22, 2024
1 parent 46212d9 commit 609abac
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 101 deletions.
44 changes: 18 additions & 26 deletions docs/docs/en/nats/jetstream/key-value.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,34 @@ This interface provides you with rich abilities to use it like a regular *KV* st

## FastStream Details

**FastStream** has no native interfaces to this *NatsJS* functionality (yet), but it allows you to get access into the inner `JetStream` object to create it manually.
**FastStream** has some useful methods to help you with **Key-Value NATS** feature interacting.

First of all, you need to create a *Key-Value* storage object and pass it into the context:
First of all, you need to create a *Key-Value* storage object and put some value to it:

```python linenums="1" hl_lines="12-13"
{! docs_src/nats/js/key_value.py [ln:5-8,11-13,22-27] !}
```python linenums="1" hl_lines="9-10"
{! docs_src/nats/js/key_value.py [ln:1-5,12-16] !}
```

!!! tip
We placed this code in `#!python @app.on_startup` hook because `#!python @app.after_startup` will be triggered **AFTER** your handlers start consuming messages. So, if you need to have access to any custom context objects, you should set them up in the `#!python @app.on_startup` hook.

Also, we call `#!python await broker.connect()` method manually to establish the connection to be able to create a storage.
`#!python broker.key_value(bucket="bucket")` is an idempotent method. It means that it stores all already created storages in memory and do not make new request to **NATS** if your are trying to call it for the same bucket.

---

Next, we are ready to use this object right in our handlers.

Let's create an annotated object to shorten context object access:

```python linenums="1" hl_lines="4"
{! docs_src/nats/js/key_value.py [ln:1-3,9] !}
```

And just use it in a handler:
Then we are able to use returned `key_value` object as a regular NATS one. But, if you want to watch by any changes by some key in the bucket, **FastStream** allows you to make it via regular `@broker.subscriber` interface:

```python linenums="1" hl_lines="4 6-8"
{! docs_src/nats/js/key_value.py [ln:4,14-19] !}
```python linenums="1" hl_lines="1"
{! docs_src/nats/js/key_value.py [ln:8-10] !}
```

Finally, let's test our code behavior by putting something into the KV storage and sending a message:
Also, if you want more detail settings for you **Key Value Storage**, we have `KvWatch` object for it:

```python linenums="1" hl_lines="3-4"
{! docs_src/nats/js/key_value.py [ln:30-33] !}
```
```python linenums="1" hl_lines="5"
from faststream.nats import NatsBroker, KvWatch

??? example "Full listing"
```python linenums="1"
{!> docs_src/nats/js/key_value.py !}
```
@broker.subscriber(
"key",
kv_watch=KvWatch("bucket", declare=False),
)
async def handler(msg: str):
...
```
50 changes: 26 additions & 24 deletions docs/docs/en/nats/jetstream/object.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,47 @@ The main difference between *KV* and *Object* storages is that in the *Object* s

## FastStream Details

**FastStream** has no native interfaces to this *NatsJS* functionality (yet), but it allows you to access the inner `JetStream` object to create in manually.
**FastStream** has some useful methods to help you with **Object Storage NATS** feature interacting.

First of all, you need to create an *Object* storage object and pass in to the context:
First of all, you need to create a *Object Storage* object and put some value to it:

```python linenums="1" hl_lines="12-13"
{! docs_src/nats/js/object.py [ln:7-10,13-15,24-29] !}
```python linenums="1" hl_lines="11-12"
{! docs_src/nats/js/object.py [ln:1-2,3,5,7-10,23-26] !}
```

!!! tip
We placed this code in the `#!python @app.on_startup` hook because `#!python @app.after_startup` will be triggered **AFTER** your handlers start consuming messages. So, if you need to have access to any custom context objects, you should set them up in the `#!python @app.on_startup` hook.
* [`BytesIO`](https://docs.python.org/3/library/io.html#binary-i-o){.external-link target="_blank"} - is a *Readable* object used to emulate a file opened for reading.

Also, we call `#!python await broker.connect()` method manually to establish the connection to be able to create a storage.
* `#!python broker.object_storage(bucket="example-bucket")` is an idempotent method. It means that it stores all already created storages in memory and do not make new request to **NATS** if your are trying to call it for the same bucket.

---

Next, we are ready to use this object right in the our handlers.
Then we are able to use returned `object_storage` object as a regular NATS one. But, if you want to watch by any new files in the bucket, **FastStream** allows you to make it via regular `@broker.subscriber` interface:

Let's create an Annotated object to shorten `Context` object access:

```python linenums="1" hl_lines="4"
{! docs_src/nats/js/object.py [ln:3-5,11] !}
```python linenums="1" hl_lines="1"
@broker.subscriber("example-bucket", obj_watch=True)
async def handler(filename: str):
assert filename == "file.txt"
```

And just use it in a handler:
**NATS** deliveres you just a filename (and some more metainformation you can get access via `message.raw_message`) because files can be any size. The framework should protect your service from memory overflow, so we can't upload whole file content right to the memo. By you can make it manually the following way:

```python linenums="1" hl_lines="6 8-9"
{! docs_src/nats/js/object.py [ln:1-2,6,16-21] !}
```python linenums="1" hl_lines="1 6 10-11"
{! docs_src/nats/js/object.py [ln:6-7,12-20] !}
```

Finally, let's test our code behavior by putting something into the *Object storage* and sending a message:
!!! note
`faststream.nats.annotations.ObjectStorage` is a your current bucket, so you need no to put it to context manually.

```python linenums="1" hl_lines="3-4"
{! docs_src/nats/js/object.py [ln:32-35] !}
```
Also, if you want more detail settings for you **Object Storage**, we have `ObjWatch` object for it:

!!! tip
[`BytesIO`](https://docs.python.org/3/library/io.html#binary-i-o){.external-link target="_blank"} - is a *Readable* object used to emulate a file opened for reading.
```python linenums="1" hl_lines="5"
from faststream.nats import NatsBroker, ObjWatch

??? example "Full listing"
```python linenums="1"
{!> docs_src/nats/js/object.py !}
```
@broker.subscriber(
"example-bucket",
obj_watch=ObjWatch(declare=False),
)
async def handler(filename: str):
...
```
31 changes: 7 additions & 24 deletions docs/docs_src/nats/js/key_value.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,16 @@
from nats.js.kv import KeyValue as KV
from typing_extensions import Annotated

from faststream import Logger
from faststream import Context, FastStream, Logger
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import ContextRepo

KeyValue = Annotated[KV, Context("kv")]

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber("subject")
async def handler(msg: str, kv: KeyValue, logger: Logger):
logger.info(msg)
kv_data = await kv.get("key")
assert kv_data.value == b"Hello!"


@app.on_startup
async def setup_broker(context: ContextRepo):
await broker.connect()

kv = await broker.stream.create_key_value(bucket="bucket")
context.set_global("kv", kv)
@broker.subscriber("key", kv_watch="bucket")
async def handler(msg: str):
assert msg == "Hello!"


@app.after_startup
async def test_send(kv: KeyValue):
await kv.put("key", b"Hello!")
await broker.publish("Hi!", "subject")
async def setup_broker():
key_value = await broker.key_value(bucket="bucket")
await key_value.put("key", b"Hello!")
37 changes: 14 additions & 23 deletions docs/docs_src/nats/js/object.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,26 @@
from io import BytesIO

from nats.js.object_store import ObjectStore as OS
from typing_extensions import Annotated

from faststream import FastStream
from faststream import Logger
from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import ContextRepo

ObjectStorage = Annotated[OS, Context("OS")]
from faststream.nats.annotations import ObjectStorage

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber("subject")
async def handler(msg: str, os: ObjectStorage, logger: Logger):
logger.info(msg)
obj = await os.get("file")
assert obj.data == b"File mock"


@app.on_startup
async def setup_broker(context: ContextRepo):
await broker.connect()

os = await broker.stream.create_object_store("bucket")
context.set_global("OS", os)
@broker.subscriber("example-bucket", obj_watch=True)
async def handler(
filename: str,
storage: ObjectStorage,
logger: Logger,
):
assert filename == "file.txt"
file = await storage.get(filename)
logger.info(file.data)


@app.after_startup
async def test_send(os: ObjectStorage):
await os.put("file", BytesIO(b"File mock"))
await broker.publish("Hi!", "subject")
async def test_send():
object_storage = await broker.object_storage("example-bucket")
await object_storage.put("file.txt", BytesIO(b"File mock"))
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.7"
__version__ = "0.5.8"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
2 changes: 1 addition & 1 deletion tests/docs/nats/js/test_kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ async def test_basic():

async with TestNatsBroker(broker, with_real=True), TestApp(app):
await handler.wait_call(3.0)
handler.mock.assert_called_once_with("Hi!")
handler.mock.assert_called_once_with(b"Hello!")
21 changes: 19 additions & 2 deletions tests/docs/nats/js/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@
async def test_basic():
from docs.docs_src.nats.js.object import app, broker, handler

async with TestNatsBroker(broker, with_real=True), TestApp(app):
async with TestNatsBroker(broker, with_real=True):
await broker.start()

os = await broker.object_storage("example-bucket")
try:
existed_files = await os.list()
except Exception:
existed_files = ()

call = True
for file in existed_files:
if file.name == "file.txt":
call = False

if call:
async with TestApp(app):
pass

await handler.wait_call(3.0)
handler.mock.assert_called_once_with("Hi!")
handler.mock.assert_called_once_with("file.txt")

0 comments on commit 609abac

Please sign in to comment.