Skip to content

Commit

Permalink
SQS Beta release
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 28, 2023
1 parent 4aa83c6 commit 17cb2b1
Show file tree
Hide file tree
Showing 73 changed files with 1,321 additions and 75 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ jobs:
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
ALLOW_PLAINTEXT_LISTENER: "true"

sqs:
image: softwaremill/elasticmq-native
ports:
- 9324:9324

nats:
image: nats
ports:
Expand Down
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ services:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- ALLOW_PLAINTEXT_LISTENER=yes

sqs:
image: softwaremill/elasticmq-native
ports:
- 9324:9324
```
```bash
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ It is a modern, high-level framework on top of popular specific Python brokers l
| **Redis** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
Expand Down Expand Up @@ -144,6 +144,8 @@ pip install "propan[async-nats]"
pip install "propan[async-redis]"
# or
pip install "propan[async-kafka]"
# or
pip install "propan[async-sqs]"
```

### Basic usage
Expand All @@ -155,10 +157,12 @@ from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker
# from propan import SQSBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")
# broker = SQSBroker("http://localhost:9324", ...)

app = PropanApp(broker)

Expand Down
18 changes: 18 additions & 0 deletions docs/docs/en/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# CHANGELOG

## 2023-05-28 **0.1.2.3** SQS Beta

**Propan** added support for *SQS* as a message broker. This functionality is full tested.

*SQSBroker* supports:

* message delivery
* test client, without the need to run *ElasticMQ* or connect to cloud *SQS*
* *FastAPI* Plugin

*SQSBroker* not supports **RPC** yet.

Also, current release include the following fixes:

* *Kafka* connection recovery
* *Nats* connection recovery
* *Redis* connection methods supports not-url parameters

## 2023-05-26 **0.1.2.2** NATS Stable

`NatsBroker` is full tested now.
Expand Down
5 changes: 5 additions & 0 deletions docs/docs/en/contributing/2_contributing-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ services:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- ALLOW_PLAINTEXT_LISTENER=yes

sqs:
image: softwaremill/elasticmq-native
ports:
- 9324:9324
```
```bash
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/en/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ This is the **Propan** declarative way to write the same code. That is so much e
| **Redis** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
| **Pulsar** | :mag: planning :mag: | :mag: planning :mag: |
| **Pulsar** | :mag: planning :mag: | :mag: planning :mag: |
18 changes: 18 additions & 0 deletions docs/docs/ru/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# CHANGELOG

## 2023-05-28 **0.1.2.3** SQS Beta

В **Propan** добавлена поддержка *SQS* в качестве брокера сообщений. Данный функционал полностью протестирован.

*SQSBroker* поддерживает:

* доставку сообщений
* тестовый клиент, без необходимости запуска *ElasticMQ* или подключения к облачному *SQS*
* *FastAPI* плагин

*KafkaBroker* на данный момент не поддерживает **RPC** запросы.

Также текущий релиз включает следующие исправления:

* автоматическое восстановления соединения с *Kafka*
* автоматическое восстановления соединения с *Nats*
* *Redis* поддерживает подключение по явным аргументам

## 2023-05-26 **0.1.2.2** NATS Stable

`NatsBroker` полностью протестирован.
Expand Down
5 changes: 5 additions & 0 deletions docs/docs/ru/contributing/2_contributing-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ services:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- ALLOW_PLAINTEXT_LISTENER=yes

sqs:
image: softwaremill/elasticmq-native
ports:
- 9324:9324
```
#### Hatch
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/ru/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def base_handler(body):
| **Redis** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Nats** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: |
| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: |
| **SQS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **SQS** | :warning: **beta** :warning: | :mag: planning :mag: |
| **NatsJS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
| **MQTT** | :mag: planning :mag: | :mag: planning :mag: |
| **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: |
Expand Down
14 changes: 14 additions & 0 deletions examples/nats/1_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from propan import NatsBroker, PropanApp

broker = NatsBroker("nats://localhost:4222")
app = PropanApp(broker)


@broker.handle("test")
async def hello(msg: str):
print(msg)


@app.after_startup
async def pub():
await broker.publish("hi", "test")
21 changes: 21 additions & 0 deletions examples/sqs/1_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from aiobotocore.config import AioConfig
from botocore import UNSIGNED

from propan import PropanApp, SQSBroker

broker = SQSBroker(
"http://localhost:9324/",
region_name="us-west-2",
config=AioConfig(signature_version=UNSIGNED),
)
app = PropanApp(broker)


@broker.handle("test")
async def hello(msg: str):
print(msg)


@app.after_startup
async def pub():
await broker.publish("hi", "test")
3 changes: 2 additions & 1 deletion propan/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices"""

__version__ = "0.1.2.2"
__version__ = "0.1.2.3"


INSTALL_MESSAGE = (
Expand All @@ -10,4 +10,5 @@
'pip install "propan[async-nats]"\n'
'pip install "propan[async-redis]"\n'
'pip install "propan[async-kafka]"\n'
'pip install "propan[async-sqs]"\n'
)
10 changes: 9 additions & 1 deletion propan/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
except Exception:
KafkaBroker = None # type: ignore

assert any((RabbitBroker, NatsBroker, RedisBroker)), INSTALL_MESSAGE
try:
from propan.brokers.sqs import SQSBroker
except Exception:
SQSBroker = None # type: ignore

assert any(
(RabbitBroker, NatsBroker, RedisBroker, SQSBroker, KafkaBroker)
), INSTALL_MESSAGE

__all__ = ( # noqa: F405
# app
Expand All @@ -45,4 +52,5 @@
"RabbitBroker",
"RedisBroker",
"KafkaBroker",
"SQSBroker",
)
31 changes: 26 additions & 5 deletions propan/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

RabbitBroker = Annotated[RB, ContextField("broker")]
RabbitMessage = Annotated[aio_pika.message.IncomingMessage, ContextField("message")]
Channel = Annotated[
aio_pika.robust_channel.RobustChannel, ContextField("broker._channel")
]
except Exception:
RabbitBroker = RabbitMessage = None # type: ignore
RabbitBroker = RabbitMessage = Channel = None # type: ignore


try:
Expand All @@ -35,29 +38,47 @@


try:
from redis.asyncio.client import Redis as R

from propan.brokers.redis import RedisBroker as RedB

RedisBroker = Annotated[RedB, ContextField("broker")]
Redis = Annotated[R, ContextField("broker._connection")]
except Exception:
RedisBroker = None # type: ignore
RedisBroker = Redis = None # type: ignore


try:
from aiokafka import AIOKafkaProducer
from aiokafka.structs import ConsumerRecord

from propan.brokers.kafka import KafkaBroker as KB

KafkaBroker = Annotated[KB, ContextField("broker")]
KafkaMessage = Annotated[ConsumerRecord, ContextField("message")]
Producer = Annotated[AIOKafkaProducer, ContextField("producer")]
except Exception:
KafkaBroker = KafkaMessage = None # type: ignore


try:
from aiobotocore.client import AioBaseClient

from propan.brokers.sqs import SQSBroker as SB

SQSBroker = Annotated[SB, ContextField("broker")]
client = Annotated[AioBaseClient, ContextField("client")]
queue_url = Annotated[str, ContextField("queue_url")]
except Exception:
SQSBroker = client = queue_url = None # type: ignore


assert any(
(
all((RabbitBroker, RabbitMessage)),
all((RabbitBroker, RabbitMessage, Channel)),
all((NatsBroker, NatsMessage)),
RedisBroker,
all((KafkaBroker, KafkaMessage)),
all((RedisBroker, Redis)),
all((KafkaBroker, KafkaMessage, Producer)),
all((SQSBroker, client, queue_url)),
)
), INSTALL_MESSAGE
4 changes: 4 additions & 0 deletions propan/brokers/_model/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from propan.brokers._model.broker_usecase import BrokerUsecase
from propan.brokers._model.schemas import ContentTypes, Queue

__all__ = ("Queue", "BrokerUsecase", "ContentTypes")
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, TypeVar, Union

from propan.brokers.model.schemas import (
from propan.brokers._model.schemas import (
ContentType,
ContentTypes,
PropanMessage,
SendableModel,
)
from propan.brokers.model.utils import (
from propan.brokers._model.utils import (
change_logger_handlers,
get_watcher,
set_message_context,
Expand Down Expand Up @@ -80,6 +80,7 @@ async def publish(
self,
message: SendableMessage,
*args: Any,
reply_to: str = "",
callback: bool = False,
callback_timeout: Optional[float] = None,
raise_timeout: bool = False,
Expand All @@ -102,7 +103,7 @@ def _process_message(
raise NotImplementedError()

def _get_log_context(
self, message: PropanMessage, **kwargs: Dict[str, str]
self, message: Optional[PropanMessage], **kwargs: Dict[str, str]
) -> Dict[str, Any]:
return {
"message_id": message.message_id[:10] if message else "",
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async def wrapper(message: Any, reraise_exc: bool = False) -> Optional[T]:
try:
return await func(message)
except Exception as e:
print(e)
if reraise_exc is True:
raise e
return None
Expand Down
21 changes: 14 additions & 7 deletions propan/brokers/kafka/kafka_broker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from functools import partial, wraps
from typing import Any, Callable, Dict, List, NoReturn, Optional, Sequence, Tuple, Union

Expand All @@ -7,9 +8,9 @@
from typing_extensions import TypeVar

from propan.__about__ import __version__
from propan.brokers._model.broker_usecase import BrokerUsecase
from propan.brokers._model.schemas import PropanMessage
from propan.brokers.kafka.schemas import Handler
from propan.brokers.model.broker_usecase import BrokerUsecase
from propan.brokers.model.schemas import PropanMessage
from propan.brokers.push_back_watcher import BaseWatcher
from propan.types import (
AnyCallable,
Expand Down Expand Up @@ -122,7 +123,7 @@ async def start(self) -> None:
consumer = self._connection(*handler.topics, **handler.consumer_kwargs)
await consumer.start()
handler.consumer = consumer
handler.task = asyncio.create_task(_consume(handler))
handler.task = asyncio.create_task(self._consume(handler))

@staticmethod
async def _parse_message(message: ConsumerRecord) -> PropanMessage:
Expand Down Expand Up @@ -187,7 +188,7 @@ def fmt(self) -> str:

def _get_log_context(
self,
message: PropanMessage,
message: Optional[PropanMessage],
topics: Sequence[str] = (),
) -> Dict[str, Any]:
if topics:
Expand All @@ -200,7 +201,13 @@ def _get_log_context(
**super()._get_log_context(message),
}

async def _consume(self, handler: Handler) -> NoReturn:
c = self._get_log_context(None, handler.topics)

async def _consume(handler: Handler) -> NoReturn:
async for msg in handler.consumer:
await handler.callback(msg)
while True:
try:
msg = await handler.consumer.getone()
except Exception as e:
self._log(e, logging.WATNING, c)
else:
await handler.callback(msg)
Loading

0 comments on commit 17cb2b1

Please sign in to comment.