diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index d6eee25f40..e49ca6681e 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -153,13 +153,14 @@ async def send( kwargs["timestamp"] = timestamp_ms if not no_confirm: - result_future: asyncio.Future[Optional[Message]] = asyncio.Future() + loop = asyncio.get_running_loop() + result_future: asyncio.Future[Optional[Message]] = loop.create_future() def ack_callback(err: Any, msg: Optional[Message]) -> None: if err or (msg is not None and (err := msg.error())): - result_future.set_exception(KafkaException(err)) + loop.call_soon_threadsafe(result_future.set_exception, KafkaException(err)) else: - result_future.set_result(msg) + loop.call_soon_threadsafe(result_future.set_result, msg) kwargs["on_delivery"] = ack_callback