Skip to content

Commit

Permalink
transports(daily): add receiving transport messages
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed May 15, 2024
1 parent 28da747 commit 9e22a8b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Added `DailyTransport` event `on_participant_left`.

- Added support for receiving `DailyTransportMessage`.

### Fixed

- Images are now resized to the size of the output camera.

- Fixed an issue in `DailyTransport` that would not allow the processor to
- Fixed an issue in `DailyTransport` that would not allow the input processor to
shutdown if no participant ever joined the room.

- Fixed transport start and stop. In some situation processors would halt or not
shutdown properly.
- Fixed base transports start and stop. In some situation processors would halt
or not shutdown properly.

## [0.0.13] - 2024-05-14

Expand Down
33 changes: 19 additions & 14 deletions src/pipecat/transports/services/daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class DailyCallbacks(BaseModel):
on_participant_joined: Callable[[Mapping[str, Any]], None]
on_participant_left: Callable[[Mapping[str, Any], str], None]
on_first_participant_joined: Callable[[Mapping[str, Any]], None]
on_app_message: Callable[[Any, str], None]
on_error: Callable[[str], None]


Expand Down Expand Up @@ -400,6 +401,10 @@ def on_transcription_started(self, status):
def on_transcription_stopped(self, stopped_by, stopped_by_error):
logger.debug("Transcription stopped")

def on_app_message(self, message: Any, sender: str):
self._callbacks.on_app_message(message, sender)

#
# Daily (CallClient callbacks)
#

Expand Down Expand Up @@ -468,6 +473,15 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

await super().process_frame(frame, direction)

#
# App Message
#

def push_app_message(self, message: Any, sender: str):
frame = DailyTransportMessageFrame(message=message, participant_id=sender)
future = asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
future.result()

#
# Camera in
#
Expand Down Expand Up @@ -576,6 +590,7 @@ def __init__(self, room_url: str, token: str | None, bot_name: str, params: Dail
on_first_participant_joined=self._on_first_participant_joined,
on_participant_joined=self._on_participant_joined,
on_participant_left=self._on_participant_left,
on_app_message=self._on_app_message,
on_error=self._on_error,
)
self._params = params
Expand Down Expand Up @@ -694,6 +709,10 @@ def on_participant_left(self, participant, reason):
def on_first_participant_joined(self, participant):
pass

def on_app_message(self, message: Any, sender: str):
if self._input:
self._input.push_app_message(message, sender)

def event_handler(self, event_name: str):
def decorator(handler):
self._add_event_handler(event_name, handler)
Expand Down Expand Up @@ -738,17 +757,3 @@ def _patch_method(self, event_name, *args, **kwargs):

# def start_recording(self):
# self.client.start_recording()

# def on_error(self, error):
# self._logger.error(f"on_error: {error}")

# def on_participant_left(self, participant, reason):
# if len(self.client.participants()) < self._min_others_count + 1:
# self._stop_threads.set()

# def on_app_message(self, message: Any, sender: str):
# if self._loop:
# frame = ReceivedAppMessageFrame(message, sender)
# asyncio.run_coroutine_threadsafe(
# self.receive_queue.put(frame), self._loop
# )

0 comments on commit 9e22a8b

Please sign in to comment.