diff --git a/src/pipecat/pipeline/runner.py b/src/pipecat/pipeline/runner.py index bb10280b7..2d1b8100a 100644 --- a/src/pipecat/pipeline/runner.py +++ b/src/pipecat/pipeline/runner.py @@ -18,7 +18,6 @@ class PipelineRunner: def __init__(self, name: str | None = None, handle_sigint: bool = True): self.id: int = obj_id() self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}" - self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() self._tasks = {} self._running = True @@ -47,7 +46,8 @@ def is_active(self): return self._running def _setup_sigint(self): - self._loop.add_signal_handler( + loop = asyncio.get_running_loop() + loop.add_signal_handler( signal.SIGINT, lambda *args: asyncio.create_task(self._sigint_handler()) ) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index aaa1e0a29..5345eda6f 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -42,7 +42,7 @@ async def start(self): self._running = True if self._params.audio_in_enabled or self._params.vad_enabled: - loop = asyncio.get_running_loop() + loop = self.get_event_loop() self._audio_in_thread = loop.run_in_executor(None, self._audio_in_thread_handler) self._audio_out_thread = loop.run_in_executor(None, self._audio_out_thread_handler) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 1a3246fa6..f46d629f8 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -55,7 +55,7 @@ async def start(self): self._running = True - loop = asyncio.get_running_loop() + loop = self.get_event_loop() if self._params.camera_out_enabled: self._camera_out_thread = loop.run_in_executor(None, self._camera_out_thread_handler) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index b0a718325..820febc4e 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -474,9 +474,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) # - # App Message + # Frames # + def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): + future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) + future.result() + def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) @@ -686,9 +690,7 @@ def _on_transcription_message(self, participant_id, message): frame = InterimTranscriptionFrame(text, participant_id, timestamp) if self._input: - future = asyncio.run_coroutine_threadsafe( - self._input.push_frame(frame), self._input.get_event_loop()) - future.result() + self._input.push_transcription_frame(frame) # # Decorators (event handlers)