Skip to content

Commit

Permalink
some minor event loop updates
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed May 16, 2024
1 parent 129acf8 commit af10adb
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/pipecat/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class PipelineRunner:
def __init__(self, name: str | None = None, handle_sigint: bool = True):
self.id: int = obj_id()
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()

self._tasks = {}
self._running = True
Expand Down Expand Up @@ -47,7 +46,8 @@ def is_active(self):
return self._running

def _setup_sigint(self):
self._loop.add_signal_handler(
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT,
lambda *args: asyncio.create_task(self._sigint_handler())
)
Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/transports/base_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def start(self):
self._running = True

if self._params.audio_in_enabled or self._params.vad_enabled:
loop = asyncio.get_running_loop()
loop = self.get_event_loop()
self._audio_in_thread = loop.run_in_executor(None, self._audio_in_thread_handler)
self._audio_out_thread = loop.run_in_executor(None, self._audio_out_thread_handler)

Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/transports/base_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def start(self):

self._running = True

loop = asyncio.get_running_loop()
loop = self.get_event_loop()

if self._params.camera_out_enabled:
self._camera_out_thread = loop.run_in_executor(None, self._camera_out_thread_handler)
Expand Down
10 changes: 6 additions & 4 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

#
# App Message
# Frames
#

def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
future.result()

def push_app_message(self, message: Any, sender: str):
frame = DailyTransportMessageFrame(message=message, participant_id=sender)
future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
Expand Down Expand Up @@ -686,9 +690,7 @@ def _on_transcription_message(self, participant_id, message):
frame = InterimTranscriptionFrame(text, participant_id, timestamp)

if self._input:
future = asyncio.run_coroutine_threadsafe(
self._input.push_frame(frame), self._input.get_event_loop())
future.result()
self._input.push_transcription_frame(frame)

#
# Decorators (event handlers)
Expand Down

0 comments on commit af10adb

Please sign in to comment.