diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/workflows/logger.py index 93d1900e5b128..03cf980238078 100644 --- a/posthog/temporal/workflows/logger.py +++ b/posthog/temporal/workflows/logger.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import ssl import aiokafka import structlog @@ -10,7 +11,6 @@ from structlog.processors import EventRenamer from structlog.typing import FilteringBoundLogger -from posthog.kafka_client.client import _sasl_params from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES BACKGROUND_LOGGER_TASKS = set() @@ -255,7 +255,7 @@ def __init__( security_protocol=settings.KAFKA_SECURITY_PROTOCOL or "PLAINTEXT", acks="all", api_version="2.5.0", - **_sasl_params(), + ssl_context=configure_default_ssl_context() if settings.KAFKA_SECURITY_PROTOCOL == "SSL" else None, ) ) self.logger = structlog.get_logger() @@ -299,3 +299,13 @@ async def flush(self): def mark_queue_done(self, _=None): self.queue.task_done() + + +def configure_default_ssl_context(): + """Setup a default SSL context for Kafka.""" + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.options |= ssl.OP_NO_SSLv2 + context.options |= ssl.OP_NO_SSLv3 + context.verify_mode = ssl.CERT_OPTIONAL + context.load_default_certs() + return context