Skip to content

Commit

Permalink
Finish 0.1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
chandr-andr committed Apr 18, 2023
2 parents 0aa9cdf + 2283921 commit b6a691b
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 15 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,5 @@ AioKafkaBroker parameters:
* `kafka_topic` - custom topic in kafka.
* `result_backend` - custom result backend.
* `task_id_generator` - custom task_id genertaor.
* `aiokafka_producer` - custom `aiokafka` producer.
* `aiokafka_consumer` - custom `aiokafka` consumer.
* `kafka_admin_client` - custom `kafka` admin client.
* `delete_topic_on_shutdown` - flag to delete topic on broker shutdown.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "taskiq-aio-kafka"
description = "Kafka broker for taskiq"
authors = ["Taskiq team <[email protected]>"]
maintainers = ["Taskiq team <[email protected]>"]
version = "0.1.1"
version = "0.1.2"
readme = "README.md"
license = "LICENSE"
classifiers = [
Expand Down
8 changes: 2 additions & 6 deletions taskiq_aio_kafka/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def __init__( # noqa: WPS211
kafka_topic: Optional[NewTopic] = None,
result_backend: Optional[AsyncResultBackend[_T]] = None,
task_id_generator: Optional[Callable[[], str]] = None,
aiokafka_producer: Optional[AIOKafkaProducer] = None,
aiokafka_consumer: Optional[AIOKafkaConsumer] = None,
kafka_admin_client: Optional[KafkaAdminClient] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
delete_topic_on_shutdown: bool = False,
Expand All @@ -58,8 +56,6 @@ def __init__( # noqa: WPS211
:param kafka_topic: kafka topic.
:param result_backend: custom result backend.
:param task_id_generator: custom task_id generator.
:param aiokafka_producer: configured AIOKafkaProducer.
:param aiokafka_consumer: configured AIOKafkaConsumer.
:param kafka_admin_client: configured KafkaAdminClient.
:param loop: specific even loop.
:param delete_topic_on_shutdown: delete or don't delete topic on shutdown.
Expand All @@ -69,10 +65,10 @@ def __init__( # noqa: WPS211
"""
super().__init__(result_backend, task_id_generator)

if (aiokafka_producer or aiokafka_consumer) and not bootstrap_servers:
if kafka_admin_client and not bootstrap_servers:
raise WrongAioKafkaBrokerParametersError(
(
"If you specify `aiokafka_producer` and/or `aiokafka_consumer`, "
"If you specify `kafka_admin_client`, "
"you must specify `bootstrap_servers`."
),
)
Expand Down
6 changes: 0 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ async def broker_without_arguments(
@pytest.fixture()
async def broker(
kafka_url: str,
test_kafka_producer: AIOKafkaProducer,
test_kafka_consumer: AIOKafkaConsumer,
base_topic: NewTopic,
) -> AsyncGenerator[AioKafkaBroker, None]:
"""Yield new broker instance.
Expand All @@ -125,16 +123,12 @@ async def broker(
and shutdown after test.
:param kafka_url: url to kafka.
:param test_kafka_producer: custom AIOKafkaProducer.
:param test_kafka_consumer: custom AIOKafkaConsumer.
:param base_topic: base topic.
:yields: broker.
"""
broker = AioKafkaBroker(
bootstrap_servers=kafka_url,
aiokafka_producer=test_kafka_producer,
aiokafka_consumer=test_kafka_consumer,
kafka_topic=base_topic,
delete_topic_on_shutdown=True,
)
Expand Down

0 comments on commit b6a691b

Please sign in to comment.