diff --git a/README.md b/README.md index 0587174..d7cd094 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Motivation: Redis provided on the cloud is usually only available within a VPC a ## Supported queue service - [x] AWS SQS +- [x] Google PubSub Welcome to contribute more queue service, see [adapter/impl](./redis_canal/adapter/impl/) for more details. diff --git a/pyproject.toml b/pyproject.toml index 3123c78..4b35b12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,8 +25,9 @@ classifiers = [ ] [project.optional-dependencies] -all = ["redis_canal[sqs]"] +all = ["redis_canal[sqs, pubsub]"] sqs = ["boto3"] +pubsub = ["google-cloud-pubsub"] test = [ "pytest", "pytest-asyncio", diff --git a/redis_canal/adapter/impl/pubsub.py b/redis_canal/adapter/impl/pubsub.py new file mode 100644 index 0000000..d2d5bae --- /dev/null +++ b/redis_canal/adapter/impl/pubsub.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import asyncio +from functools import cached_property +from typing import TYPE_CHECKING, Awaitable + +from redis_canal.adapter.plugin import Adapter, hookimpl +from redis_canal.log import logger +from redis_canal.models import Message +from redis_canal.tools import run_in_threadpool + +if TYPE_CHECKING: + from google.cloud import pubsub_v1 + + +class PubSubAdapter(Adapter): + """ + PubSub Adapter for Google Cloud Pub/Sub. + + Args: + queue_url (str): The topic path, e.g. projects/{project}/topics/{topic} + poll_time (float): The time to poll the queue + poll_size (int): The number of messages to poll at a time + """ + + register_name = "pubsub" + + def __init__(self, queue_url: str, poll_time: float, poll_size: int, *args, **kwargs): + super().__init__(queue_url, poll_time, poll_size, *args, **kwargs) + + if self.poll_time < 1: + self.poll_time = 1 + if self.poll_size > 10: + self.poll_size = 10 + + self.poll_time = self.poll_time + self.ensure_queue_exists() + + @property + def topic_path(self) -> str: + return self.queue_url + + @property + def subscription_path(self) -> str: + return f"projects/{self.project_id}/subscriptions/{self.subscribe_id}" + + @property + def project_id(self) -> str: + return self.topic_path.split("/")[1] + + @property + def subscribe_id(self) -> str: + return "redis_cannal_pubsub" + + @cached_property + def publisher(self) -> "pubsub_v1.PublisherClient": + try: + from google.cloud import pubsub_v1 + except ImportError: + raise RuntimeError( + "Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub" + ) + + return pubsub_v1.PublisherClient() + + @cached_property + def subscriber(self) -> "pubsub_v1.SubscriberClient": + try: + from google.cloud import pubsub_v1 + except ImportError: + raise RuntimeError( + "Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub" + ) + + return pubsub_v1.SubscriberClient() + + def ensure_queue_exists(self): + from google.api_core import exceptions + + try: + self.publisher.create_topic( + name=self.topic_path, + ) + except exceptions.AlreadyExists: + pass + + try: + self.subscriber.create_subscription( + name=self.subscription_path, + topic=self.topic_path, + ) + except exceptions.AlreadyExists: + pass + + async def emit(self, message: Message) -> None: + def _(): + response = self.publisher.publish( + topic=self.topic_path, + data=message.model_dump_json().encode("utf-8"), + ) + + logger.debug(f"Published message {response.result()} to {self.topic_path}") + + await run_in_threadpool(_) + + async def poll(self, process_func: Awaitable[Message], *args, **kwargs) -> None: + pubsub_messages = await self._poll_message() + await asyncio.gather( + *[ + self._process_messages(process_func, message, ack_id) + for message, ack_id in pubsub_messages + ] + ) + + async def _poll_message(self) -> list[tuple[Message, str]]: + def _(): + response = self.subscriber.pull( + request={ + "subscription": self.subscription_path, + "max_messages": self.poll_size, + } + ) + return [ + ( + Message.model_validate_json(message.message.data.decode("utf-8")), + message.ack_id, + ) + for message in response.received_messages + ] + + return await run_in_threadpool(_) + + async def _process_messages( + self, process_func: Awaitable[Message], message: Message, ack_id: str + ): + try: + await process_func(message) + except Exception as e: + logger.error(f"Error processing message {message}: {e}") + else: + await run_in_threadpool( + self.subscriber.acknowledge, + request={ + "subscription": self.subscription_path, + "ack_ids": [ack_id], + }, + ) + + +@hookimpl +def register(manager): + manager.register(PubSubAdapter) diff --git a/redis_canal/adapter/impl/sqs.py b/redis_canal/adapter/impl/sqs.py index c60b954..959acc3 100644 --- a/redis_canal/adapter/impl/sqs.py +++ b/redis_canal/adapter/impl/sqs.py @@ -35,7 +35,7 @@ def client(self): import boto3 except ImportError: raise RuntimeError( - "boto3 is not installed, try install moriarty with `pip install moriarty[matrix]` for all components or `pip install moriarty[sqs]` for sqs only" + "boto3 is not installed, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[sqs]` for sqs" ) return boto3.client("sqs") diff --git a/tests/adapter/test_pubsub_adapter.py b/tests/adapter/test_pubsub_adapter.py new file mode 100644 index 0000000..9938917 --- /dev/null +++ b/tests/adapter/test_pubsub_adapter.py @@ -0,0 +1,54 @@ +import os + +import pytest + +from redis_canal.adapter.impl.pubsub import PubSubAdapter +from redis_canal.models import Message + + +@pytest.fixture +def pubsub_adapter(case_id): + try: + import google.api_core.exceptions as exceptions + import google.cloud.pubsub_v1 + + except ImportError: + pytest.skip("Google Cloud Pub/Sub SDK is not installed") + + project_id = os.getenv("TEST_PUBSUB_PROJECT_ID") + if not project_id: + pytest.skip("TEST_PUBSUB_PROJECT_ID is not configured") + + queue_url = f"projects/{project_id}/topics/test-topic" + try: + adapter = PubSubAdapter( + queue_url=queue_url, + poll_time=1, + poll_size=10, + ) + except Exception as e: + pytest.skip(f"Google Cloud Pub/Sub SDK is not configured correctly: {e}") + + yield adapter + try: + adapter.publisher.delete_topic(request={"topic": queue_url}) + except exceptions.NotFound: + pass + + +async def test_pubsub_adapter(pubsub_adapter): + + message_input = Message( + redis_key="test", + message_id="123-345", + message_content={"f1": "v1"}, + ) + + async def validate(message): + assert message == message_input + print("validated!") + + await pubsub_adapter.emit(message_input) + await pubsub_adapter.poll( + process_func=validate, + ) diff --git a/tests/test_adapter_manager.py b/tests/test_adapter_manager.py index 16ae5ef..0fd21f4 100644 --- a/tests/test_adapter_manager.py +++ b/tests/test_adapter_manager.py @@ -2,7 +2,7 @@ def test_bridge_manager(): - registed = ["sqs"] + registed = ["sqs", "pubsub"] manager = AdapterManager() assert sorted(registed) == sorted(manager.registed_cls.keys())