diff --git a/README.md b/README.md index cba5730..cb95483 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,5 @@ AioKafkaBroker parameters: * `kafka_topic` - custom topic in kafka. * `result_backend` - custom result backend. * `task_id_generator` - custom task_id genertaor. -* `aiokafka_producer` - custom `aiokafka` producer. -* `aiokafka_consumer` - custom `aiokafka` consumer. * `kafka_admin_client` - custom `kafka` admin client. * `delete_topic_on_shutdown` - flag to delete topic on broker shutdown. diff --git a/pyproject.toml b/pyproject.toml index 4f05026..aa9baee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "taskiq-aio-kafka" description = "Kafka broker for taskiq" authors = ["Taskiq team "] maintainers = ["Taskiq team "] -version = "0.1.1" +version = "0.1.2" readme = "README.md" license = "LICENSE" classifiers = [ diff --git a/taskiq_aio_kafka/broker.py b/taskiq_aio_kafka/broker.py index 59b216b..9f0b1ae 100644 --- a/taskiq_aio_kafka/broker.py +++ b/taskiq_aio_kafka/broker.py @@ -46,8 +46,6 @@ def __init__( # noqa: WPS211 kafka_topic: Optional[NewTopic] = None, result_backend: Optional[AsyncResultBackend[_T]] = None, task_id_generator: Optional[Callable[[], str]] = None, - aiokafka_producer: Optional[AIOKafkaProducer] = None, - aiokafka_consumer: Optional[AIOKafkaConsumer] = None, kafka_admin_client: Optional[KafkaAdminClient] = None, loop: Optional[asyncio.AbstractEventLoop] = None, delete_topic_on_shutdown: bool = False, @@ -58,8 +56,6 @@ def __init__( # noqa: WPS211 :param kafka_topic: kafka topic. :param result_backend: custom result backend. :param task_id_generator: custom task_id generator. - :param aiokafka_producer: configured AIOKafkaProducer. - :param aiokafka_consumer: configured AIOKafkaConsumer. :param kafka_admin_client: configured KafkaAdminClient. :param loop: specific even loop. :param delete_topic_on_shutdown: delete or don't delete topic on shutdown. @@ -69,10 +65,10 @@ def __init__( # noqa: WPS211 """ super().__init__(result_backend, task_id_generator) - if (aiokafka_producer or aiokafka_consumer) and not bootstrap_servers: + if kafka_admin_client and not bootstrap_servers: raise WrongAioKafkaBrokerParametersError( ( - "If you specify `aiokafka_producer` and/or `aiokafka_consumer`, " + "If you specify `kafka_admin_client`, " "you must specify `bootstrap_servers`." ), ) diff --git a/tests/conftest.py b/tests/conftest.py index 3dec1d7..3f924e8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,8 +114,6 @@ async def broker_without_arguments( @pytest.fixture() async def broker( kafka_url: str, - test_kafka_producer: AIOKafkaProducer, - test_kafka_consumer: AIOKafkaConsumer, base_topic: NewTopic, ) -> AsyncGenerator[AioKafkaBroker, None]: """Yield new broker instance. @@ -125,16 +123,12 @@ async def broker( and shutdown after test. :param kafka_url: url to kafka. - :param test_kafka_producer: custom AIOKafkaProducer. - :param test_kafka_consumer: custom AIOKafkaConsumer. :param base_topic: base topic. :yields: broker. """ broker = AioKafkaBroker( bootstrap_servers=kafka_url, - aiokafka_producer=test_kafka_producer, - aiokafka_consumer=test_kafka_consumer, kafka_topic=base_topic, delete_topic_on_shutdown=True, )