diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b2d777d9..24ce7459d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,15 +11,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `DailyTransport` event `on_participant_left`. +- Added support for receiving `DailyTransportMessage`. + ### Fixed - Images are now resized to the size of the output camera. -- Fixed an issue in `DailyTransport` that would not allow the processor to +- Fixed an issue in `DailyTransport` that would not allow the input processor to shutdown if no participant ever joined the room. -- Fixed transport start and stop. In some situation processors would halt or not - shutdown properly. +- Fixed base transports start and stop. In some situation processors would halt + or not shutdown properly. ## [0.0.13] - 2024-05-14 diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 16a1f29ac..8ed75abec 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -104,6 +104,7 @@ class DailyCallbacks(BaseModel): on_participant_joined: Callable[[Mapping[str, Any]], None] on_participant_left: Callable[[Mapping[str, Any], str], None] on_first_participant_joined: Callable[[Mapping[str, Any]], None] + on_app_message: Callable[[Any, str], None] on_error: Callable[[str], None] @@ -400,6 +401,10 @@ def on_transcription_started(self, status): def on_transcription_stopped(self, stopped_by, stopped_by_error): logger.debug("Transcription stopped") + def on_app_message(self, message: Any, sender: str): + self._callbacks.on_app_message(message, sender) + + # # Daily (CallClient callbacks) # @@ -468,6 +473,15 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) + # + # App Message + # + + def push_app_message(self, message: Any, sender: str): + frame = DailyTransportMessageFrame(message=message, participant_id=sender) + future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) + future.result() + # # Camera in # @@ -576,6 +590,7 @@ def __init__(self, room_url: str, token: str | None, bot_name: str, params: Dail on_first_participant_joined=self._on_first_participant_joined, on_participant_joined=self._on_participant_joined, on_participant_left=self._on_participant_left, + on_app_message=self._on_app_message, on_error=self._on_error, ) self._params = params @@ -694,6 +709,10 @@ def on_participant_left(self, participant, reason): def on_first_participant_joined(self, participant): pass + def on_app_message(self, message: Any, sender: str): + if self._input: + self._input.push_app_message(message, sender) + def event_handler(self, event_name: str): def decorator(handler): self._add_event_handler(event_name, handler) @@ -738,17 +757,3 @@ def _patch_method(self, event_name, *args, **kwargs): # def start_recording(self): # self.client.start_recording() - - # def on_error(self, error): - # self._logger.error(f"on_error: {error}") - - # def on_participant_left(self, participant, reason): - # if len(self.client.participants()) < self._min_others_count + 1: - # self._stop_threads.set() - - # def on_app_message(self, message: Any, sender: str): - # if self._loop: - # frame = ReceivedAppMessageFrame(message, sender) - # asyncio.run_coroutine_threadsafe( - # self.receive_queue.put(frame), self._loop - # )