Skip to content

Commit

Permalink
fix: Lazily connect to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 8, 2023
1 parent 37bef65 commit 1838321
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions posthog/temporal/tests/batch_exports/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ async def queue():
yield queue


class CaptureKafkaProducer(aiokafka.AIOKafkaProducer):
class CaptureKafkaProducer:
"""A test aiokafka.AIOKafkaProducer that captures calls to send_and_wait."""

def __init__(self, *args, **kwargs):
super().__init__(bootstrap_servers="localhost:9092")
self.entries = []
self._producer: None | aiokafka.AIOKafkaProducer = None

@property
def producer(self) -> aiokafka.AIOKafkaProducer:
if self._producer is None:
self._producer = aiokafka.AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_HOSTS + ["localhost:9092"],
security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT",
acks="all",
)
return self._producer

async def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None):
"""Append an entry and delegate to aiokafka.AIOKafkaProducer."""
Expand All @@ -87,7 +97,20 @@ async def send(self, topic, value=None, key=None, partition=None, timestamp_ms=N
"headers": headers,
}
)
return await super().send(topic, value, key, partition, timestamp_ms, headers)
return await self.producer.send(topic, value, key, partition, timestamp_ms, headers)

async def start(self):
await self.producer.start()

async def stop(self):
await self.producer.stop()

async def flush(self):
await self.producer.flush()

@property
def _closed(self):
return self.producer._closed


@pytest_asyncio.fixture(scope="function")
Expand Down

0 comments on commit 1838321

Please sign in to comment.