Skip to content

Commit

Permalink
fix: Non-threadsafe operations in AsyncConfluentProducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
DABND19 committed Jan 4, 2025
1 parent 5bced11 commit daa47ee
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,14 @@ async def send(
kwargs["timestamp"] = timestamp_ms

if not no_confirm:
result_future: asyncio.Future[Optional[Message]] = asyncio.Future()
loop = asyncio.get_running_loop()
result_future: asyncio.Future[Optional[Message]] = loop.create_future()

def ack_callback(err: Any, msg: Optional[Message]) -> None:
if err or (msg is not None and (err := msg.error())):
result_future.set_exception(KafkaException(err))
loop.call_soon_threadsafe(result_future.set_exception, KafkaException(err))
else:
result_future.set_result(msg)
loop.call_soon_threadsafe(result_future.set_result, msg)

kwargs["on_delivery"] = ack_callback

Expand Down

0 comments on commit daa47ee

Please sign in to comment.