From d7c3e380a589233db6fa0e29425b49791dd828b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 14:57:18 -0700 Subject: [PATCH 1/5] added BotSpeakingFrame --- CHANGELOG.md | 3 +++ src/pipecat/frames/frames.py | 11 +++++++++++ src/pipecat/transports/base_output.py | 2 ++ 3 files changed, 16 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60f30806f..546550365 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed + upstream while the bot is talking. + - Added `XTTSService`. This is a local Text-To-Speech service. See https://github.com/coqui-ai/TTS diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 0b013dbfb..a581055be 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -240,6 +240,17 @@ class StopInterruptionFrame(SystemFrame): pass +@dataclass +class BotSpeakingFrame(SystemFrame): + """Emitted by transport outputs while the bot is still speaking. This can be + used, for example, to detect when a user is idle. That is, while the bot is + speaking we don't want to trigger any user idle timeout since the user might + be listening. + + """ + pass + + @dataclass class MetricsFrame(SystemFrame): """Emitted by processor that can compute metrics like latencies. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index c0ea8cdd1..41f2df79e 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -14,6 +14,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.frames.frames import ( AudioRawFrame, + BotSpeakingFrame, CancelFrame, MetricsFrame, SpriteFrame, @@ -263,4 +264,5 @@ async def _maybe_send_audio(self, buffer: bytearray) -> bytearray: if len(buffer) >= self._audio_chunk_size: await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size])) buffer = buffer[self._audio_chunk_size:] + await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) return buffer From 933b63cf1397ab9b0322d395b513e5095b80d2c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 14:57:42 -0700 Subject: [PATCH 2/5] processors: added new IdleFrameProcessor --- CHANGELOG.md | 4 + .../processors/idle_frame_processor.py | 76 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/pipecat/processors/idle_frame_processor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 546550365..e0710a03c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `IdleFrameProcessor`. This processor can be used to wait for frames + within a given timeout. If no frame is received within the timeout a provided + callback is called. + - Added new frame `BotSpeakingFrame`. This frame will be continuously pushed upstream while the bot is talking. diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py new file mode 100644 index 000000000..40304a5c6 --- /dev/null +++ b/src/pipecat/processors/idle_frame_processor.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from typing import Awaitable, Callable, List + +from pipecat.frames.frames import Frame, SystemFrame +from pipecat.processors.async_frame_processor import AsyncFrameProcessor +from pipecat.processors.frame_processor import FrameDirection + + +class IdleFrameProcessor(AsyncFrameProcessor): + """This class waits to receive any frame or list of desired frames within a + given timeout. If the timeout is reached before receiving any of those + frames the provided callback will be called. + + The callback can then be used to push frames downstream by using + `queue_frame()` (or `push_frame()` for system frames). + + """ + + def __init__( + self, + *, + callback: Callable[["IdleFrameProcessor"], Awaitable[None]], + timeout: float, + types: List[type] = [], + **kwargs): + super().__init__(**kwargs) + + self._callback = callback + self._timeout = timeout + self._types = types + + self._create_idle_task() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, SystemFrame): + await self.push_frame(frame, direction) + else: + await self.queue_frame(frame, direction) + + # If we are not waiting for any specific frame set the event, otherwise + # check if we have received one of the desired frames. + if not self._types: + self._idle_event.set() + else: + for t in self._types: + if isinstance(frame, t): + self._idle_event.set() + + # If we are not waiting for any specific frame set the event, otherwise + async def cleanup(self): + self._idle_task.cancel() + await self._idle_task + + def _create_idle_task(self): + self._idle_event = asyncio.Event() + self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) + + async def _idle_task_handler(self): + while True: + try: + await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout) + except asyncio.TimeoutError: + await self._callback(self) + except asyncio.CancelledError: + break + finally: + self._idle_event.clear() From 535514f506874102a1e9ed17b55d845e7756209e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 14:58:01 -0700 Subject: [PATCH 3/5] processors: added new UserIdleProcessor --- CHANGELOG.md | 4 + src/pipecat/processors/user_idle_processor.py | 77 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 src/pipecat/processors/user_idle_processor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e0710a03c..6ca777718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `UserIdleProcessor`. This processor can be used to wait for any + interaction with the user. If the user doesn't say anything within a given + timeout a provided callback is called. + - Added `IdleFrameProcessor`. This processor can be used to wait for frames within a given timeout. If no frame is received within the timeout a provided callback is called. diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py new file mode 100644 index 000000000..20e8e7be6 --- /dev/null +++ b/src/pipecat/processors/user_idle_processor.py @@ -0,0 +1,77 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio + +from typing import Awaitable, Callable + +from pipecat.frames.frames import BotSpeakingFrame, Frame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame +from pipecat.processors.async_frame_processor import AsyncFrameProcessor +from pipecat.processors.frame_processor import FrameDirection + + +class UserIdleProcessor(AsyncFrameProcessor): + """This class is useful to check if the user is interacting with the bot + within a given timeout. If the timeout is reached before any interaction + occurred the provided callback will be called. + + The callback can then be used to push frames downstream by using + `queue_frame()` (or `push_frame()` for system frames). + + """ + + def __init__( + self, + *, + callback: Callable[["UserIdleProcessor"], Awaitable[None]], + timeout: float, + **kwargs): + super().__init__(**kwargs) + + self._callback = callback + self._timeout = timeout + + self._interrupted = False + + self._create_idle_task() + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, SystemFrame): + await self.push_frame(frame, direction) + else: + await self.queue_frame(frame, direction) + + # We shouldn't call the idle callback if the user or the bot are speaking. + if isinstance(frame, StartInterruptionFrame): + self._interrupted = True + self._idle_event.set() + elif isinstance(frame, StopInterruptionFrame): + self._interrupted = False + self._idle_event.set() + elif isinstance(frame, BotSpeakingFrame): + self._idle_event.set() + + async def cleanup(self): + self._idle_task.cancel() + await self._idle_task + + def _create_idle_task(self): + self._idle_event = asyncio.Event() + self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) + + async def _idle_task_handler(self): + while True: + try: + await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout) + except asyncio.TimeoutError: + if not self._interrupted: + await self._callback(self) + except asyncio.CancelledError: + break + finally: + self._idle_event.clear() From d1ca0c5614dfac132118c2396fb5649424537380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 14:58:18 -0700 Subject: [PATCH 4/5] examples: added new 17-detect-user-idle.py --- CHANGELOG.md | 5 + examples/foundational/17-detect-user-idle.py | 108 +++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 examples/foundational/17-detect-user-idle.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ca777718..6c69f22a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send data to the websocket after being closed. +### Other + +- Added new `17-detect-user-idle.py` example that shows how to use the new + `UserIdleProcessor`. + ## [0.0.35] - 2024-06-28 ### Changed diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py new file mode 100644 index 000000000..a1fb40dd9 --- /dev/null +++ b/examples/foundational/17-detect-user-idle.py @@ -0,0 +1,108 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.user_idle_processor import UserIdleProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + async def user_idle_callback(user_idle: UserIdleProcessor): + messages.append( + {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."}) + await user_idle.queue_frame(LLMMessagesFrame(messages)) + + user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) + + pipeline = Pipeline([ + transport.input(), # Transport user input + user_idle, # Idle user check-in + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out # Assistant spoken responses + ]) + + task = PipelineTask(pipeline, PipelineParams( + allow_interruptions=True, + enable_metrics=True, + report_only_initial_ttfb=True, + )) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) From be14ce465d839e3cb4343fdc94cc0f1de4a6db46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 1 Jul 2024 18:25:40 -0700 Subject: [PATCH 5/5] transports(daily): make sure we don't send data if client is closed --- src/pipecat/transports/services/daily.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index c85e989dd..3c7b2772c 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -199,6 +199,9 @@ def set_callbacks(self, callbacks: DailyCallbacks): self._callbacks = callbacks async def send_message(self, frame: DailyTransportMessageFrame): + if not self._client: + return + future = self._loop.create_future() self._client.send_app_message( frame.message,