Skip to content

Commit

Permalink
fix: Lazily connect to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 8, 2023
1 parent 89db2f3 commit 288a97c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
31 changes: 28 additions & 3 deletions posthog/temporal/tests/batch_exports/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,24 @@ async def queue():
yield queue


class CaptureKafkaProducer(aiokafka.AIOKafkaProducer):
class CaptureKafkaProducer:
"""A test aiokafka.AIOKafkaProducer that captures calls to send_and_wait."""

def __init__(self, *args, **kwargs):
super().__init__(bootstrap_servers="localhost:9092")
self.entries = []
self._producer: None | aiokafka.AIOKafkaProducer = None

@property
def producer(self) -> aiokafka.AIOKafkaProducer:
if self._producer is None:
self._producer = aiokafka.AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_HOSTS + ["localhost:9092"],
security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT",
acks="all",
request_timeout_ms=1000000,
api_version="2.5.0",
)
return self._producer

async def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None):
"""Append an entry and delegate to aiokafka.AIOKafkaProducer."""
Expand All @@ -87,7 +99,20 @@ async def send(self, topic, value=None, key=None, partition=None, timestamp_ms=N
"headers": headers,
}
)
return await super().send(topic, value, key, partition, timestamp_ms, headers)
return await self.producer.send(topic, value, key, partition, timestamp_ms, headers)

async def start(self):
await self.producer.start()

async def stop(self):
await self.producer.stop()

async def flush(self):
await self.producer.flush()

@property
def _closed(self):
return self.producer._closed


@pytest_asyncio.fixture(scope="function")
Expand Down
1 change: 1 addition & 0 deletions posthog/temporal/workflows/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def __init__(
bootstrap_servers=settings.KAFKA_HOSTS,
security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT",
acks="all",
api_version="2.5.0",
)
)

Expand Down

0 comments on commit 288a97c

Please sign in to comment.