diff --git a/docs/docs/en/contributing/2_contributing-index.md b/docs/docs/en/contributing/2_contributing-index.md index 18f8b41d..db970fb0 100644 --- a/docs/docs/en/contributing/2_contributing-index.md +++ b/docs/docs/en/contributing/2_contributing-index.md @@ -20,7 +20,8 @@ Activate the new environment with: source ./venv/bin/activate ``` -Make sure you have the latest pip version on your virtual environment to +Make sure you have the latest pip version on your virtual environment to + ```bash python -m pip install --upgrade pip ``` diff --git a/docs/docs/en/getting_started/4_broker/2_routing.md b/docs/docs/en/getting_started/4_broker/2_routing.md index 93db5a9e..faf765c3 100644 --- a/docs/docs/en/getting_started/4_broker/2_routing.md +++ b/docs/docs/en/getting_started/4_broker/2_routing.md @@ -14,8 +14,9 @@ This behavior is similar for all brokers, however, the parameters passed to `@br To learn more about the behavior of specialized brokers, go to the following sections: -* [RabbitBroker](../../../rabbit/1_routing) -* [NatsBroker](../../../nats/2_routing) +* [RabbitBroker](../../../rabbit/1_routing/#routing-rules) +* [NatsBroker](../../../nats/1_nats-index/#routing-rules) +* [RedisBroker](../../../redis/1_redis-index/#routing-rules) ## Error handling diff --git a/docs/docs/en/nats/1_nats-index.md b/docs/docs/en/nats/1_nats-index.md index 98f143c8..db111ab5 100644 --- a/docs/docs/en/nats/1_nats-index.md +++ b/docs/docs/en/nats/1_nats-index.md @@ -1,3 +1,26 @@ # NATS -{! docs/en/helpful/in-progress.md !} +## Advantages and disadvantages + +**NATS** is an easy-to-use, high-performance message broker written in *Golang*. If your application does not require complex routing logic, should cope with high load, scale and do not require large hardware costs, *NATS* will be an excellent choice for you. + +!!! note + More information about *NATS* can be found on the [official website](https://nats.io ){.external-link target="_blank"} + +However, *NATS* has disadvantages that you should be aware of: + +* Messages are not persistent. If the message is published while your consumer is disconnected, it will be lost. +* There are no complex routing mechanisms. +* There are no mechanisms for confirming receipt and processing of messages from the consumer. + +These shortcomings are corrected by using the persistent level - *JetStream*. If you need strict guarantees for the delivery and processing of messages to the small detriment of the speed and resources consumed, you can use *NatsJS*. + +## Routing rules + +*NATS* does not have the ability to configure complex routing rules. The only entity in *NATS* is `subject`, which can be subscribed to either directly by name or by regular expression pattern. + +Both examples are discussed [a little further](../3_examples/1_direct). + +In order to support the ability to scale consumers horizontally, *NATS* supports the `queue group` functionality: +a message sent to `subject` will be processed by a random consumer from the `queue group` subscribed to this `subject`. +This approach allows you to increase the processing speed of `subject` by *N* times when starting *N* consumers with one group. diff --git a/docs/docs/en/nats/2_publishing.md b/docs/docs/en/nats/2_publishing.md new file mode 100644 index 00000000..65c87f49 --- /dev/null +++ b/docs/docs/en/nats/2_publishing.md @@ -0,0 +1,36 @@ +# NATS Publishing + +To send messages, `NatsBroker` uses the unified `publish` method. + +```python +import asyncio +from propan import NatsBroker + +async def pub(): + async with NatsBroker() as broker: + await broker.publish("Hi!", subject="test") + +asyncio.run(pub()) +``` + +## Basic arguments + +The `publish` method accepts the following arguments: + +* `message`: bytes | str | dict | Sequence[Any] | pydatic.BaseModel - message to send +* `subject`: str - *subject*, where the message will be sent. + +## Message Parameters + +* `headers`: dict[str, Any] | None = None - headers of the message being sent (used by consumers) + +## RPC arguments + +Also `publish` supports common arguments for creating [*RPC* queries](../../getting_started/4_broker/5_rpc/#_3): + +* `reply_to`: str = "" - which *channel* to send the response to (used for asynchronous RPC requests) +* `callback`: bool = False - whether to expect a response to the message +* `callback_timeout`: float | None = 30.0 - timeout waiting for a response. In the case of `None` - waits indefinitely +* `raise_timeout`: bool = False + * `False` - return None in case of timeout + * `True` - error `asyncio.TimeoutError` in case of timeout diff --git a/docs/docs/en/nats/2_routing.md b/docs/docs/en/nats/2_routing.md deleted file mode 100644 index 286ee419..00000000 --- a/docs/docs/en/nats/2_routing.md +++ /dev/null @@ -1,3 +0,0 @@ -# NATS Routing - -{! docs/en/helpful/in-progress.md !} diff --git a/docs/docs/en/nats/3_examples/1_direct.md b/docs/docs/en/nats/3_examples/1_direct.md new file mode 100644 index 00000000..92e277b5 --- /dev/null +++ b/docs/docs/en/nats/3_examples/1_direct.md @@ -0,0 +1,64 @@ +# Direct + +**Direct** Subject is the basic way to route messages in *NATS*. Its essence is very simple: +`subject` sends messages to all consumers subscribed to it. + +## Scaling + +If one `subject` is listening by several consumers with the same `queue group`, the message will go to a random consumer each time. + +Thus, *NATS* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: *NATS* will take care of how to distribute messages between your services. + +## Example + +**Direct** Subject is the type used in **Propan** by default: you can simply declare it as follows + +```python +@broker.handler("test_subject") +async def handler(): +... +``` + +Full example: + +```python linenums="1" +{!> docs_src/nats/direct.py !} +``` + +### Consumer Announcement + +To begin with, we have declared several consumers for two `subjects`: `test-subj-1` and `test-subj-2`: + +```python linenums="7" hl_lines="1 5 9" +{!> docs_src/nats/direct.py [ln:7-17]!} +``` + +!!! note + Note that all consumers are subscribed using the same `queue_group`: within the same service, this does not make sense, since messages will come to these handlers in turn. + Here we emulate the work of several consumers and load balancing between them. + +### Message distribution + +Now the distribution of messages between these consumers will look like this: + +```python +{!> docs_src/nats/direct.py [ln:21]!} +``` + +The message `1` will be sent to `handler1` or `handler2`, because they are listening to one `subject` within one `queue group` + +--- + +```python +{!> docs_src/nats/direct.py [ln:22]!} +``` + +Message `2` will be sent similarly to message `1` + +--- + +```python +{!> docs_src/nats/direct.py [ln:23]!} +``` + +The message `3` will be sent to `handler3`, because he is the only one listening to `test-subj-2` diff --git a/docs/docs/en/nats/3_examples/2_pattern.md b/docs/docs/en/nats/3_examples/2_pattern.md new file mode 100644 index 00000000..6826e744 --- /dev/null +++ b/docs/docs/en/nats/3_examples/2_pattern.md @@ -0,0 +1,55 @@ +# Pattern + +**Pattern** Subject is a powerful *NATS* routing engine. This type of `subject` messages to consumers by the *pattern* specified when they connect to `subject` and a message key. + +## Scaling + +If one `subject` is listening by several consumers with the same `queue group`, the message will go to a random consumer each time. + +Thus, *NATS* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: *NATS* will take care of how to distribute messages between your services. + +## Example + +```python linenums="1" +{!> docs_src/nats/pattern.py !} +``` + +### Consumer Announcement + +To begin with, we have announced several consumers for two `subjects`: `*.info` and `*.error`: + +```python linenums="7" hl_lines="1 5 9" +{!> docs_src/nats/pattern.py [ln:7-17]!} +``` + +At the same time, in the `subject` of our consumers, we specify the *pattern* that will be processed by these consumers. + +!!! note + Note that all consumers are subscribed using the same `queue_group`: within the same service, this does not make sense, since messages will come to these handlers in turn. + Here we emulate the work of several consumers and load balancing between them. + +### Message distribution + +Now the distribution of messages between these consumers will look like this: + +```python +{!> docs_src/nats/pattern.py [ln:21]!} +``` + +The message `1` will be sent to `handler1` or `handler2`, because they listen to the same `subject` template within the same `queue group` + +--- + +```python +{!> docs_src/nats/pattern.py [ln:22]!} +``` + +Message `2` will be sent similarly to message `1` + +--- + +```python +{!> docs_src/nats/pattern.py [ln:23]!} +``` + +The message `3` will be sent to `handler3`, because it is the only one listening to the pattern `*.error*` diff --git a/docs/docs/en/nats/3_publishing.md b/docs/docs/en/nats/3_publishing.md deleted file mode 100644 index d3245745..00000000 --- a/docs/docs/en/nats/3_publishing.md +++ /dev/null @@ -1,3 +0,0 @@ -# NATS Publishing - -{! docs/en/helpful/in-progress.md !} diff --git a/docs/docs/en/redis/1_redis-index.md b/docs/docs/en/redis/1_redis-index.md index a6324f50..6695c43f 100644 --- a/docs/docs/en/redis/1_redis-index.md +++ b/docs/docs/en/redis/1_redis-index.md @@ -8,7 +8,7 @@ to include a new heavy dependency (*Kafka*, *RabbitMQ*, *Nats*, etc.) to your in *Redis* works fast, does not degrade with a large number of messages, and most importantly - you already have it! !!! note -More information about the documentation of *Redis Pub/Sub* can be found at [official site](https://redis.io/docs/manual/pubsub/#messages-matching-both-a-pattern-and-a-channel-subscription){.external- link target="_blank"} + More information about *Redis Pub/Sub* can be found at [official website](https://redis.io/docs/manual/pubsub/#messages-matching-both-a-pattern-and-a-channel-subscription){.external- link target="_blank"} However, *Redis* as a message broker has some important disadvantages: @@ -47,4 +47,4 @@ This is necessary for the correct recognition of the *content-type* of the incom If **Propan** receives a message sent using another library or framework (or just a message in a different format), the entire body of this message will be perceived as the `data` field of the received message, and the `content-type` will be recognized automatically. -At the same time, **RPC** requests will not work, since there is no `reply_to` field in the incoming message. \ No newline at end of file +At the same time, **RPC** requests will not work, since there is no `reply_to` field in the incoming message. diff --git a/docs/docs/en/redis/3_examples/1_direct.md b/docs/docs/en/redis/3_examples/1_direct.md index 771af738..510cd84f 100644 --- a/docs/docs/en/redis/3_examples/1_direct.md +++ b/docs/docs/en/redis/3_examples/1_direct.md @@ -4,7 +4,7 @@ ## Scaling -If one channel is listened by several consumers, the message will be received by **all** consumers of this channel. +If one channel is listening by several consumers, the message will be received by **all** consumers of this channel. Thus, horizontal scaling by increasing the number of consumer services is not possible only using *Redis Pub/Sub*. If you need similar functionality, look for *Redis Streams* or other brokers (for example, *Nats* or *RabbitMQ*). diff --git a/docs/docs/en/redis/3_examples/2_pattern.md b/docs/docs/en/redis/3_examples/2_pattern.md index 85629ce3..d564291d 100644 --- a/docs/docs/en/redis/3_examples/2_pattern.md +++ b/docs/docs/en/redis/3_examples/2_pattern.md @@ -1,7 +1,7 @@ # Pattern **Pattern** Channel is a powerful *Redis* routing engine. This type of `channel` sends messages to consumers by the *pattern* -specified when they connect to the `channel` and the message key. +specified when they connect to the `channel` and a message key. ## Scaling diff --git a/docs/docs/ru/getting_started/4_broker/2_routing.md b/docs/docs/ru/getting_started/4_broker/2_routing.md index 8de061ae..8302627a 100644 --- a/docs/docs/ru/getting_started/4_broker/2_routing.md +++ b/docs/docs/ru/getting_started/4_broker/2_routing.md @@ -14,8 +14,9 @@ async def base_handler(body: str): Чтобы узнать подробнее о поведении специализированных брокеров перейдите в следующие разделы: -* [RabbitBroker](../../../rabbit/1_routing) -* [NatsBroker](../../../nats/2_routing) +* [RabbitBroker](../../../rabbit/1_routing/#routing-rules) +* [NatsBroker](../../../nats/1_nats-index/#routing-rules) +* [RedisBroker](../../../redis/1_redis-index/#routing-rules) ## Обработка ошибок diff --git a/docs/docs/ru/nats/1_nats-index.md b/docs/docs/ru/nats/1_nats-index.md index 1ba9afec..6b53dcf6 100644 --- a/docs/docs/ru/nats/1_nats-index.md +++ b/docs/docs/ru/nats/1_nats-index.md @@ -1,3 +1,26 @@ # NATS -{! docs/ru/helpful/in-progress.md !} +## Преимущества и недостатки + +**NATS** - простой в использовании высокопроизводительный брокер сообщений, написанный на *Golang*. Если ваше приложение не требует сложной логики маршрутизации, должно справляться с высокой нагрузкой, просто масштабироваться и не требовать больших затрат на железо, *NATS* станет для вас отличным выбором. + +!!! note + Подробнее с *NATS* вы можете ознакомиться на [официальном сайте](https://nats.io){.external-link target="_blank"} + +Однако, у *NATS* есть и недостатки, о которых вам стоит знать: + +* Сообщения не персистентны. Если сообщение будет опубликовано пока ваш потребитель отключен - оно будет потеряно. +* Отстутствуют механизмы сложной маршрутизации. +* Отстутствуют механизмы подтверждения получения и обработки сообщений со стороны потребителя. + +Однако, эти недостатки исправляются использованием персистентного уровня - *JetStream*. Если вам нужны строгие гарантии доставки и обработки сообщений в небольшой ущерб скорости работы и потребляемым ресурсам, вы можете воспользоваться *NatsJS*. + +## Правила маршрутизации + +*NATS* не обладает возможностью настраивать сложные правила маршрутизации. Единственной сущностью в *NATS* является `subject`, на который можно подписаться либо напрямую по имени, либо по паттерну регулярного выражения. + +Оба примера рассмотрены [чуть далее](../3_examples/1_direct). + +С целью поддержки возможностей горизонтально масштабировать потребителей, *NATS* поддерживает функционал `queue group`: +сообщение, отправленное в `subject` будет обработано случайным потребителем из `queue group`, подписанной на этот `subject`. +Такой подход позволяет увеличить скорость обработки `subject` в *N* раз при запуске *N* потребителей с одной группой. diff --git a/docs/docs/ru/nats/2_publishing.md b/docs/docs/ru/nats/2_publishing.md new file mode 100644 index 00000000..6dd2a1d9 --- /dev/null +++ b/docs/docs/ru/nats/2_publishing.md @@ -0,0 +1,36 @@ +# NATS Publishing + +Для отправки сообщений `NatsBroker` использует унифицированный метод `publish`. + +```python +import asyncio +from propan import NatsBroker + +async def pub(): + async with NatsBroker() as broker: + await broker.publish("Hi!", subject="test") + +asyncio.run(pub()) +``` + +## Базовые аргументы + +Метод `publish` принимает следующие аргументы: + +* `message`: bytes | str | dict | Sequence[Any] | pydatic.BaseModel - сообщение для отправки +* `subject`: str - *subject*, куда будет отправлено сообщение. + +## Параметры сообщения + +* `headers`: dict[str, Any] | None = None - заголовки отправляемого сообщения (используются потребителями) + +## RPC аргументы + +Также `publish` поддерживает общие аргументы для создания [*RPC* запросов](../../getting_started/4_broker/5_rpc/#_3): + +* `reply_to`: str = "" - в какой *channel* отправить ответ (используется для асинхронных RPC запросов) +* `callback`: bool = False - ожидать ли ответ на сообщение +* `callback_timeout`: float | None = 30.0 - таймаут ожидания ответа. В случае `None` - ждет бесконечно +* `raise_timeout`: bool = False + * `False` - возвращать None в случае таймаута + * `True` - ошибка `asyncio.TimeoutError` в случае таймаута diff --git a/docs/docs/ru/nats/2_routing.md b/docs/docs/ru/nats/2_routing.md deleted file mode 100644 index c2bda84c..00000000 --- a/docs/docs/ru/nats/2_routing.md +++ /dev/null @@ -1,3 +0,0 @@ -# NATS Routing - -{! docs/ru/helpful/in-progress.md !} diff --git a/docs/docs/ru/nats/3_examples/1_direct.md b/docs/docs/ru/nats/3_examples/1_direct.md new file mode 100644 index 00000000..600841da --- /dev/null +++ b/docs/docs/ru/nats/3_examples/1_direct.md @@ -0,0 +1,64 @@ +# Direct + +**Direct** Subject - базовый способ маршрутизации сообщений в *NATS*. Его суть очень проста: +`subject` отправляет сообщения всем потребителям, подписанным на него. + +## Масштабирование + +Если один `subject` слушает несколько потребителей с одинаковой `queue group`, сообщение будет уходить каждый раз случайному потребителю. + +Таким образом, *NATS* может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: *NATS* сам позаботится о том, как распределить сообщения между вашими сервисами. + +## Пример + +**Direct** Subject - тип, используемый в **Propan** по умолчанию: вы можете просто объявить его следующим образом + +```python +@broker.handler("test_subject") +async def handler(): + ... +``` + +Полный пример: + +```python linenums="1" +{!> docs_src/nats/direct.py !} +``` + +### Объявление потребителей + +Для начала мы объявили несколько потребителей для двух `subject`: `test-subj-1` и `test-subj-2`: + +```python linenums="7" hl_lines="1 5 9" +{!> docs_src/nats/direct.py [ln:7-17]!} +``` + +!!! note + Обратите внимание, что все потребители подписаны с использованием одной `queue_group`: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. + Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними. + +### Распределение сообщений + +Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом: + +```python +{!> docs_src/nats/direct.py [ln:21]!} +``` + +Сообщение `1` будет отправлено в `handler1` или `handler2`, т.к. они слушают один `subject` в рамках одной `queue group` + +--- + +```python +{!> docs_src/nats/direct.py [ln:22]!} +``` + +Сообщение `2` будет отправлено аналогично сообщению `1` + +--- + +```python +{!> docs_src/nats/direct.py [ln:23]!} +``` + +Сообщение `3` будет отправлено в `handler3`, т.к. он единственный слушает `test-subj-2` \ No newline at end of file diff --git a/docs/docs/ru/nats/3_examples/2_pattern.md b/docs/docs/ru/nats/3_examples/2_pattern.md new file mode 100644 index 00000000..d3de51a1 --- /dev/null +++ b/docs/docs/ru/nats/3_examples/2_pattern.md @@ -0,0 +1,56 @@ +# Pattern + +**Pattern** Subject - мощный механизм маршрутизации *NATS*. Данный тип `subject` отправляет сообщения потребителям в соответсвии с *паттерном*, +указанном при их подключении к `subject` и ключом самого сообщения. + +## Масштабирование + +Если один `subject` слушает несколько потребителей с одинаковой `queue group`, сообщение будет уходить каждый раз случайному потребителю. + +Таким образом, *NATS* может самостоятельно балансировать нагрузку на потребителей очереди. Вы можете увеличить скорость обработки потока сообщений из очереди просто запустив дополнительные инстансы сервиса-потребителя. Вам не нужно вносить изменений в текущую конфигурацию инфраструктуры: *NATS* сам позаботится о том, как распределить сообщения между вашими сервисами. + +## Пример + +```python linenums="1" +{!> docs_src/nats/pattern.py !} +``` + +### Объявление потребителей + +Для начала мы объявили несколько потребителей для двух `subject`: `*.info` и `*.error`: + +```python linenums="7" hl_lines="1 5 9" +{!> docs_src/nats/pattern.py [ln:7-17]!} +``` + +При этом в `subject` наших потребителей мы указываем *паттерн*, который будут обрабатываться этими потребителемя. + +!!! note + Обратите внимание, что все потребители подписаны с использованием одной `queue_group`: в рамках одного сервиса это не имеет смысла, так как сообщения будут приходить в эти обработчики поочередно. + Здесь мы эмулируем работу несколько потребителей и балансировку нагрузки между ними. + +### Распределение сообщений + +Теперь распределение сообщений между этими потребителями будет выглядеть следующим образом: + +```python +{!> docs_src/nats/pattern.py [ln:21]!} +``` + +Сообщение `1` будет отправлено в `handler1` или `handler2`, т.к. они слушают один шаблон `subject` в рамках одной `queue group` + +--- + +```python +{!> docs_src/nats/pattern.py [ln:22]!} +``` + +Сообщение `2` будет отправлено аналогично сообщению `1` + +--- + +```python +{!> docs_src/nats/pattern.py [ln:23]!} +``` + +Сообщение `3` будет отправлено в `handler3`, т.к. он единственный слушает шаблон `*.error*` \ No newline at end of file diff --git a/docs/docs/ru/nats/3_publishing.md b/docs/docs/ru/nats/3_publishing.md deleted file mode 100644 index 5e09709c..00000000 --- a/docs/docs/ru/nats/3_publishing.md +++ /dev/null @@ -1,3 +0,0 @@ -# NATS Publishing - -{! docs/ru/helpful/in-progress.md !} diff --git a/docs/docs_src/nats/direct.py b/docs/docs_src/nats/direct.py new file mode 100644 index 00000000..4dd5f692 --- /dev/null +++ b/docs/docs_src/nats/direct.py @@ -0,0 +1,23 @@ +from propan import PropanApp, NatsBroker +from propan.annotations import Logger + +broker = NatsBroker() +app = PropanApp(broker) + +@broker.handle("test-subj-1", "workers") +async def base_handler1(logger: Logger): + logger.info("base_handler1") + +@broker.handle("test-subj-1", "workers") +async def base_handler2(logger: Logger): + logger.info("base_handler2") + +@broker.handle("test-subj-2", "workers") +async def base_handler3(logger: Logger): + logger.info("base_handler3") + +@app.after_startup +async def send_messages(): + await broker.publish("", "test-subj-1") # handlers: 1 or 2 + await broker.publish("", "test-subj-1") # handlers: 1 or 2 + await broker.publish("", "test-subj-2") # handlers: 3 diff --git a/docs/docs_src/nats/pattern.py b/docs/docs_src/nats/pattern.py new file mode 100644 index 00000000..e00c4b02 --- /dev/null +++ b/docs/docs_src/nats/pattern.py @@ -0,0 +1,23 @@ +from propan import PropanApp, NatsBroker +from propan.annotations import Logger + +broker = NatsBroker() +app = PropanApp(broker) + +@broker.handle("*.info", "workers") +async def base_handler1(logger: Logger): + logger.info("base_handler1") + +@broker.handle("*.info", "workers") +async def base_handler2(logger: Logger): + logger.info("base_handler2") + +@broker.handle("*.error", "workers") +async def base_handler3(logger: Logger): + logger.info("base_handler3") + +@app.after_startup +async def send_messages(): + await broker.publish("", "logs.info") # handlers: 1 or 2 + await broker.publish("", "logs.info") # handlers: 1 or 2 + await broker.publish("", "logs.error") # handlers: 3 diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index c0998a0a..e798912b 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -150,8 +150,10 @@ nav: - SQS Basics: sqs/1_sqs-index.md - Nats: - Nats Basics: nats/1_nats-index.md - - Routing: nats/2_routing.md - - Nats Publishing: nats/3_publishing.md + - Nats Publishing: nats/2_publishing.md + - Examples: + - Direct: nats/3_examples/1_direct.md + - Pattern: nats/3_examples/2_pattern.md - Integrations: integrations/1_integrations-index.md - FastAPI Plugin: integrations/2_fastapi-plugin.md - Contributing: diff --git a/propan/__about__.py b/propan/__about__.py index 69dfaa6d..239bbed7 100644 --- a/propan/__about__.py +++ b/propan/__about__.py @@ -2,7 +2,7 @@ from unittest.mock import Mock -__version__ = "0.1.2.10" +__version__ = "0.1.2.12" INSTALL_MESSAGE = ( diff --git a/propan/brokers/_model/broker_usecase.py b/propan/brokers/_model/broker_usecase.py index f4b29fb3..9f469bb3 100644 --- a/propan/brokers/_model/broker_usecase.py +++ b/propan/brokers/_model/broker_usecase.py @@ -18,6 +18,7 @@ from fast_depends.construct import get_dependant from fast_depends.model import Dependant +from fast_depends.utils import args_to_kwargs from pydantic.fields import ModelField from typing_extensions import Self @@ -47,7 +48,7 @@ Wrapper, ) from propan.utils import apply_types, context -from propan.utils.functions import to_async +from propan.utils.functions import get_function_arguments, to_async T = TypeVar("T") @@ -90,13 +91,19 @@ def __init__( async def connect(self, *args: Any, **kwargs: Any) -> Any: if self._connection is None: - _args = args or self._connection_args - _kwargs = kwargs or self._connection_kwargs - self._connection = await self._connect(*_args, **_kwargs) + arguments = get_function_arguments(self.__init__) # type: ignore + init_kwargs = args_to_kwargs( + arguments, + *self._connection_args, + **self._connection_kwargs, + ) + connect_kwargs = args_to_kwargs(arguments, *args, **kwargs) + _kwargs = {**init_kwargs, **connect_kwargs} + self._connection = await self._connect(**_kwargs) return self._connection @abstractmethod - async def _connect(self, *args: Any, **kwargs: Any) -> Any: + async def _connect(self, **kwargs: Any) -> Any: raise NotImplementedError() @abstractmethod diff --git a/propan/brokers/kafka/kafka_broker.py b/propan/brokers/kafka/kafka_broker.py index 5dbc8e34..bd71ad70 100644 --- a/propan/brokers/kafka/kafka_broker.py +++ b/propan/brokers/kafka/kafka_broker.py @@ -57,11 +57,9 @@ def __init__( async def _connect( self, - bootstrap_servers: Union[str, List[str]] = "localhost", **kwargs: Any, ) -> AIOKafkaConsumer: kwargs["client_id"] = kwargs.get("client_id", "propan-" + __version__) - kwargs["bootstrap_servers"] = bootstrap_servers producer = AIOKafkaProducer(**kwargs) context.set_global("producer", producer) diff --git a/propan/brokers/kafka/kafka_broker.pyi b/propan/brokers/kafka/kafka_broker.pyi index c1d3a1cd..4068aa44 100644 --- a/propan/brokers/kafka/kafka_broker.pyi +++ b/propan/brokers/kafka/kafka_broker.pyi @@ -86,8 +86,8 @@ class KafkaBroker(BrokerUsecase): ) -> None: ... async def connect( self, - bootstrap_servers: Union[str, List[str]] = "localhost", *, + bootstrap_servers: Union[str, List[str]] = "localhost", # both loop: Optional[AbstractEventLoop] = None, client_id: str = "propan-" + __version__, diff --git a/propan/brokers/nats/nats_broker.py b/propan/brokers/nats/nats_broker.py index 523f2a85..e26f48fe 100644 --- a/propan/brokers/nats/nats_broker.py +++ b/propan/brokers/nats/nats_broker.py @@ -28,6 +28,7 @@ class NatsBroker(BrokerUsecase): def __init__( self, +<<<<<<< HEAD servers: Union[str, Sequence[str]] = ("nats://localhost:4222",), *args: Any, log_fmt: Optional[str] = None, @@ -40,6 +41,14 @@ def __init__( url_=servers, **kwargs, ) +======= + servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 + *, + log_fmt: Optional[str] = None, + **kwargs: AnyDict, + ) -> None: + super().__init__(servers, log_fmt=log_fmt, **kwargs) +>>>>>>> ffe6421e3859303b0ecb0c74654b5b53e06e576b self._connection = None @@ -49,7 +58,7 @@ def __init__( async def _connect( self, - *args: Any, + *, url: Optional[str] = None, error_cb: Optional[ErrorCallback] = None, reconnected_cb: Optional[Callback] = None, @@ -58,7 +67,6 @@ async def _connect( if url is not None: kwargs["servers"] = kwargs.pop("servers", []) + [url] return await nats.connect( - *args, error_cb=self.log_connection_broken(error_cb), reconnected_cb=self.log_reconnected(reconnected_cb), **kwargs, @@ -104,7 +112,11 @@ async def start(self) -> None: c = self._get_log_context(None, handler.subject, handler.queue) self._log(f"`{func.__name__}` waiting for messages", extra=c) - sub = await self._connection.subscribe(handler.subject, cb=func) + sub = await self._connection.subscribe( + subject=handler.subject, + queue=handler.queue, + cb=func, + ) handler.subscription = sub async def publish( diff --git a/propan/brokers/nats/nats_broker.pyi b/propan/brokers/nats/nats_broker.pyi index de382b83..318e0c08 100644 --- a/propan/brokers/nats/nats_broker.pyi +++ b/propan/brokers/nats/nats_broker.pyi @@ -38,6 +38,7 @@ class NatsBroker(BrokerUsecase): def __init__( self, servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 + *, error_cb: Optional[ErrorCallback] = None, disconnected_cb: Optional[Callback] = None, closed_cb: Optional[Callback] = None, @@ -68,7 +69,6 @@ class NatsBroker(BrokerUsecase): inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, pending_size: int = DEFAULT_PENDING_SIZE, flush_timeout: Optional[float] = None, - *, logger: Optional[logging.Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = None, @@ -76,6 +76,7 @@ class NatsBroker(BrokerUsecase): ) -> None: ... async def connect( self, + *, servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 error_cb: Optional[ErrorCallback] = None, disconnected_cb: Optional[Callback] = None, diff --git a/propan/brokers/rabbit/rabbit_broker.py b/propan/brokers/rabbit/rabbit_broker.py index 5e713c51..928f2044 100644 --- a/propan/brokers/rabbit/rabbit_broker.py +++ b/propan/brokers/rabbit/rabbit_broker.py @@ -1,11 +1,12 @@ import asyncio from functools import wraps -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union +from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union from uuid import uuid4 import aio_pika import aiormq from aio_pika.abc import DeliveryMode +from yarl import URL from propan.brokers._model import BrokerUsecase from propan.brokers._model.schemas import PropanMessage @@ -23,19 +24,29 @@ class RabbitBroker(BrokerUsecase): handlers: List[Handler] _connection: Optional[aio_pika.RobustConnection] _channel: Optional[aio_pika.RobustChannel] + _queues: List[ + aio_pika.RobustQueue + ] # save queues to shield aio-pika WeakRef from GC __max_queue_len: int __max_exchange_len: int def __init__( self, +<<<<<<< HEAD url: Optional[str] = None, *args: Tuple[Any, ...], protocol: str = "amqp", consumers: Optional[int] = None, +======= + url: Union[str, URL, None] = None, + *, +>>>>>>> ffe6421e3859303b0ecb0c74654b5b53e06e576b log_fmt: Optional[str] = None, + consumers: Optional[int] = None, **kwargs: AnyDict, ) -> None: +<<<<<<< HEAD super().__init__( url, *args, @@ -44,12 +55,16 @@ def __init__( url_=url, **kwargs, ) +======= + super().__init__(url, log_fmt=log_fmt, **kwargs) +>>>>>>> ffe6421e3859303b0ecb0c74654b5b53e06e576b self._max_consumers = consumers self._channel = None self.__max_queue_len = 4 self.__max_exchange_len = 4 + self._queues = [] async def close(self) -> None: if self._channel is not None: @@ -62,11 +77,10 @@ async def close(self) -> None: async def _connect( self, - *args: Any, **kwargs: Any, ) -> aio_pika.RobustConnection: connection = await aio_pika.connect_robust( - *args, **kwargs, loop=asyncio.get_event_loop() + **kwargs, loop=asyncio.get_event_loop() ) if self._channel is None: # pragma: no branch @@ -130,6 +144,7 @@ async def start(self) -> None: self._log(f"`{func.__name__}` waiting for messages", extra=c) await queue.consume(func) + self._queues.append(queue) async def publish( self, diff --git a/propan/brokers/rabbit/rabbit_broker.pyi b/propan/brokers/rabbit/rabbit_broker.pyi index 74ba0086..72ae019b 100644 --- a/propan/brokers/rabbit/rabbit_broker.pyi +++ b/propan/brokers/rabbit/rabbit_broker.pyi @@ -30,6 +30,7 @@ class RabbitBroker(BrokerUsecase): def __init__( self, url: Union[str, URL, None] = None, + *, host: str = "localhost", port: int = 5672, login: str = "guest", @@ -40,7 +41,6 @@ class RabbitBroker(BrokerUsecase): ssl_context: Optional[SSLContext] = None, timeout: aio_pika.abc.TimeoutType = None, client_properties: Optional[FieldTable] = None, - *, logger: Optional[logging.Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = None, @@ -78,6 +78,7 @@ class RabbitBroker(BrokerUsecase): """ async def connect( self, + *, url: Union[str, URL, None] = None, host: str = "localhost", port: int = 5672, diff --git a/propan/brokers/redis/redis_broker.py b/propan/brokers/redis/redis_broker.py index dd846c5c..2b2859a2 100644 --- a/propan/brokers/redis/redis_broker.py +++ b/propan/brokers/redis/redis_broker.py @@ -1,7 +1,7 @@ import asyncio import logging from functools import wraps -from typing import Any, Callable, Coroutine, Dict, List, NoReturn, Optional, TypeVar +from typing import Any, Callable, Dict, List, NoReturn, Optional, TypeVar from uuid import uuid4 from redis.asyncio.client import PubSub, Redis @@ -32,8 +32,8 @@ class RedisBroker(BrokerUsecase): def __init__( self, url: str = "redis://localhost:6379", - polling_interval: float = 1.0, *, + polling_interval: float = 1.0, log_fmt: Optional[str] = None, **kwargs: Any, ) -> None: @@ -51,16 +51,6 @@ async def _connect( pool = ConnectionPool(**url_options) return Redis(connection_pool=pool) - async def connect( - self, - url: Optional[str] = None, - *args: Any, - **kwargs: Any, - ) -> Coroutine[Any, Any, Any]: - if url is not None: - kwargs["url"] = url - return await super().connect(*args, **kwargs) - async def close(self) -> None: for h in self.handlers: if h.task is not None: # pragma: no branch diff --git a/propan/brokers/redis/redis_broker.pyi b/propan/brokers/redis/redis_broker.pyi index d34d4899..19c04854 100644 --- a/propan/brokers/redis/redis_broker.pyi +++ b/propan/brokers/redis/redis_broker.pyi @@ -21,8 +21,8 @@ class RedisBroker(BrokerUsecase): def __init__( self, url: str = "redis://localhost:6379", - polling_interval: float = 1.0, *, + polling_interval: float = 1.0, host: str = "localhost", port: Union[str, int] = 6379, username: Optional[str] = None, @@ -70,6 +70,7 @@ class RedisBroker(BrokerUsecase): """ async def connect( self, + *, url: str = "redis://localhost:6379", host: str = "localhost", port: Union[str, int] = 6379, diff --git a/propan/brokers/sqs/sqs_broker.py b/propan/brokers/sqs/sqs_broker.py index acc86b23..a934fd98 100644 --- a/propan/brokers/sqs/sqs_broker.py +++ b/propan/brokers/sqs/sqs_broker.py @@ -63,7 +63,7 @@ def __init__( self.response_queue = response_queue self.response_callbacks = {} - async def _connect(self, url: Optional[str] = None, **kwargs: Any) -> AioBaseClient: + async def _connect(self, *, url: str, **kwargs: Any) -> AioBaseClient: session = get_session() client: AioBaseClient = await session._create_client( service_name="sqs", endpoint_url=url, **kwargs diff --git a/propan/brokers/sqs/sqs_broker.pyi b/propan/brokers/sqs/sqs_broker.pyi index 7ac9536b..2a1cf078 100644 --- a/propan/brokers/sqs/sqs_broker.pyi +++ b/propan/brokers/sqs/sqs_broker.pyi @@ -55,8 +55,8 @@ class SQSBroker(BrokerUsecase): """""" async def connect( self, - url: str = "http://localhost:9324/", *, + url: str = "http://localhost:9324/", region_name: Optional[str] = None, api_version: Optional[str] = None, use_ssl: bool = True, diff --git a/propan/fastapi/kafka/router.pyi b/propan/fastapi/kafka/router.pyi index 626f1092..ca006ae1 100644 --- a/propan/fastapi/kafka/router.pyi +++ b/propan/fastapi/kafka/router.pyi @@ -26,7 +26,13 @@ from propan.types import AnyCallable Partition = TypeVar("Partition") +<<<<<<< HEAD +class KafkaRouter(PropanRouter): + broker: KafkaBroker + +======= class KafkaRouter(PropanRouter[KafkaBroker]): +>>>>>>> 1acd1a468477ed27a085d3d1c6f38d967e64f1f9 def __init__( self, bootstrap_servers: Union[str, List[str]] = "localhost", diff --git a/propan/fastapi/nats/router.pyi b/propan/fastapi/nats/router.pyi index b52a5dca..b09191e7 100644 --- a/propan/fastapi/nats/router.pyi +++ b/propan/fastapi/nats/router.pyi @@ -32,7 +32,13 @@ from propan.fastapi.core.router import PropanRouter from propan.log import access_logger from propan.types import AnyCallable +<<<<<<< HEAD +class NatsRouter(PropanRouter): + broker: NatsBroker + +======= class NatsRouter(PropanRouter[NatsBroker]): +>>>>>>> 1acd1a468477ed27a085d3d1c6f38d967e64f1f9 def __init__( self, servers: Union[str, List[str]] = ["nats://localhost:4222"], # noqa: B006 diff --git a/propan/fastapi/rabbit/router.pyi b/propan/fastapi/rabbit/router.pyi index df667cc6..98b42cb0 100644 --- a/propan/fastapi/rabbit/router.pyi +++ b/propan/fastapi/rabbit/router.pyi @@ -19,7 +19,13 @@ from propan.fastapi.core import PropanRouter from propan.log import access_logger from propan.types import AnyCallable +<<<<<<< HEAD +class RabbitRouter(PropanRouter): + broker: RabbitBroker + +======= class RabbitRouter(PropanRouter[RabbitBroker]): +>>>>>>> 1acd1a468477ed27a085d3d1c6f38d967e64f1f9 def __init__( self, host: str = "localhost", diff --git a/propan/fastapi/redis/router.pyi b/propan/fastapi/redis/router.pyi index 835b27a7..6042754c 100644 --- a/propan/fastapi/redis/router.pyi +++ b/propan/fastapi/redis/router.pyi @@ -16,7 +16,13 @@ from propan.fastapi.core.router import PropanRouter from propan.log import access_logger from propan.types import AnyCallable +<<<<<<< HEAD +class RedisRouter(PropanRouter): + broker: RedisBroker + +======= class RedisRouter(PropanRouter[RedisBroker]): +>>>>>>> 1acd1a468477ed27a085d3d1c6f38d967e64f1f9 def __init__( self, url: str = "redis://localhost:6379", diff --git a/propan/fastapi/sqs/router.pyi b/propan/fastapi/sqs/router.pyi index a9d3e17a..673a54fc 100644 --- a/propan/fastapi/sqs/router.pyi +++ b/propan/fastapi/sqs/router.pyi @@ -17,7 +17,13 @@ from propan.fastapi.core.router import PropanRouter from propan.log import access_logger from propan.types import AnyCallable +<<<<<<< HEAD +class SQSRouter(PropanRouter): + broker: SQSBroker + +======= class SQSRouter(PropanRouter[SQSBroker]): +>>>>>>> 1acd1a468477ed27a085d3d1c6f38d967e64f1f9 def __init__( self, url: str = "http://localhost:9324/", diff --git a/propan/utils/classes.py b/propan/utils/classes.py index 54776ab8..bfd95884 100644 --- a/propan/utils/classes.py +++ b/propan/utils/classes.py @@ -1,14 +1,14 @@ from typing import Any -class Singlethon: - _instanse = None +class Singleton: + _instance = None - def __new__(cls, *args: Any, **kwargs: Any) -> "Singlethon": - if cls._instanse is None: - cls._instanse = super().__new__(cls) - return cls._instanse + def __new__(cls, *args: Any, **kwargs: Any) -> "Singleton": + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance @classmethod def _drop(cls) -> None: - cls._instanse = None + cls._instance = None diff --git a/propan/utils/context/main.py b/propan/utils/context/main.py index 988a2fca..0b08a559 100644 --- a/propan/utils/context/main.py +++ b/propan/utils/context/main.py @@ -2,12 +2,12 @@ from contextvars import ContextVar, Token from typing import Any, Dict, Iterator, TypeVar -from propan.utils.classes import Singlethon +from propan.utils.classes import Singleton T = TypeVar("T") -class ContextRepo(Singlethon): +class ContextRepo(Singleton): _global_context: Dict[str, Any] _scope_context: Dict[str, ContextVar[Any]] diff --git a/propan/utils/functions.py b/propan/utils/functions.py index 20c7f591..9dc276cc 100644 --- a/propan/utils/functions.py +++ b/propan/utils/functions.py @@ -1,5 +1,6 @@ +import inspect from functools import wraps -from typing import Awaitable, Callable, TypeVar, cast +from typing import Awaitable, Callable, List, TypeVar, cast from fast_depends.injector import run_async as call_or_await from typing_extensions import ParamSpec @@ -9,7 +10,6 @@ "to_async", ) - T = TypeVar("T") P = ParamSpec("P") @@ -21,3 +21,16 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: return cast(T, r) return wrapper + + +def get_function_arguments(func: Callable[P, T]) -> List[str]: + signature = inspect.signature(func) + + arg_kinds = [ + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ] + + return [ + param.name for param in signature.parameters.values() if param.kind in arg_kinds + ] diff --git a/tests/brokers/base/connection.py b/tests/brokers/base/connection.py index 8b1847e5..fa91c7b7 100644 --- a/tests/brokers/base/connection.py +++ b/tests/brokers/base/connection.py @@ -37,3 +37,11 @@ async def test_connect_by_url_priority(self, settings): assert await broker.connect(*args, **kwargs) assert await self.ping(broker) await broker.close() + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + args, kwargs = self.get_broker_args(settings) + broker = self.broker(*args) + assert await broker.connect(**kwargs) + assert await self.ping(broker) + await broker.close() diff --git a/tests/brokers/kafka/test_connect.py b/tests/brokers/kafka/test_connect.py index fae177cc..480d9f8a 100644 --- a/tests/brokers/kafka/test_connect.py +++ b/tests/brokers/kafka/test_connect.py @@ -7,3 +7,9 @@ @pytest.mark.kafka class TestKafkaConnect(BrokerConnectionTestcase): broker = KafkaBroker + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(bootstrap_servers=settings.url) + await broker.close() diff --git a/tests/brokers/nats/test_connect.py b/tests/brokers/nats/test_connect.py index f82c3048..1cfa4ab0 100644 --- a/tests/brokers/nats/test_connect.py +++ b/tests/brokers/nats/test_connect.py @@ -7,3 +7,9 @@ @pytest.mark.nats class TestNatsConnect(BrokerConnectionTestcase): broker = NatsBroker + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(servers=settings.url) + await broker.close() diff --git a/tests/brokers/rabbit/test_connect.py b/tests/brokers/rabbit/test_connect.py index d35bd9a4..0613bd53 100644 --- a/tests/brokers/rabbit/test_connect.py +++ b/tests/brokers/rabbit/test_connect.py @@ -29,3 +29,20 @@ async def test_connection_by_params(self, settings): port=settings.port, ) await broker.close() + + @pytest.mark.asyncio + async def test_connect_merge_kwargs_with_priority(self, settings): + broker = self.broker(host="fake-host", port=5677) # kwargs will be ignored + assert await broker.connect( + host=settings.host, + login=settings.login, + password=settings.password, + port=settings.port, + ) + await broker.close() + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(url=settings.url) + await broker.close() diff --git a/tests/brokers/redis/test_connect.py b/tests/brokers/redis/test_connect.py index 06d3e6ae..bb1fbc09 100644 --- a/tests/brokers/redis/test_connect.py +++ b/tests/brokers/redis/test_connect.py @@ -20,3 +20,18 @@ async def test_init_connect_by_raw_data(self, settings): port=settings.port, ) as broker: assert await self.ping(broker) + + @pytest.mark.asyncio + async def test_connect_merge_kwargs_with_priority(self, settings): + broker = self.broker(host="fake-host", port=6377) # kwargs will be ignored + assert await broker.connect( + host=settings.host, + port=settings.port, + ) + await broker.close() + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(url=settings.url) + await broker.close() diff --git a/tests/brokers/sqs/test_connect.py b/tests/brokers/sqs/test_connect.py index 1b7f9c3c..f8cf02eb 100644 --- a/tests/brokers/sqs/test_connect.py +++ b/tests/brokers/sqs/test_connect.py @@ -15,3 +15,10 @@ def get_broker_args(self, settings): "region_name": settings.region_name, "config": AioConfig(signature_version=UNSIGNED), } + + @pytest.mark.asyncio + async def test_connect_merge_args_and_kwargs(self, settings): + args, kwargs = self.get_broker_args(settings) + broker = self.broker("fake-url") # will be ignored + assert await broker.connect(url=settings.url, **kwargs) + await broker.close() diff --git a/tests/tools/__init__.py b/tests/tools/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/tools/marks.py b/tests/tools/marks.py deleted file mode 100644 index 600626c6..00000000 --- a/tests/tools/marks.py +++ /dev/null @@ -1,13 +0,0 @@ -import sys - -import pytest - -needs_py310 = pytest.mark.skipif( - sys.version_info < (3, 10), reason="requires python3.10+" -) - -needs_py38 = pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8+") - -needs_ex_py37 = pytest.mark.skipif( - sys.version_info == (3, 8), reason="requires python3.7" -) diff --git a/tests/utils/test_classes.py b/tests/utils/test_classes.py index 2ccf319a..51eb158c 100644 --- a/tests/utils/test_classes.py +++ b/tests/utils/test_classes.py @@ -1,11 +1,11 @@ -from propan.utils.classes import Singlethon +from propan.utils.classes import Singleton -def test_singlethon(): - assert Singlethon() is Singlethon() +def test_singleton(): + assert Singleton() is Singleton() def test_drop(): - s1 = Singlethon() + s1 = Singleton() s1._drop() - assert Singlethon() is not s1 + assert Singleton() is not s1