Merge pull request #278 from pipecat-ai/aleix/detect-user-idle
add support for detecting user idle
aconchillo authored Jul 2, 2024
2 parents c1957ab + be14ce4 commit 974d9c3
Showing 7 changed files with 293 additions and 0 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `UserIdleProcessor`. This processor can be used to wait for any
interaction with the user. If the user doesn't say anything within a given
timeout, a provided callback is called.

- Added `IdleFrameProcessor`. This processor can be used to wait for frames
within a given timeout. If no frame is received within the timeout, a provided
callback is called.

- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
upstream while the bot is talking.

- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS

@@ -45,6 +56,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send
data to the websocket after being closed.

### Other

- Added new `17-detect-user-idle.py` example that shows how to use the new
`UserIdleProcessor`.

## [0.0.35] - 2024-06-28

### Changed
108 changes: 108 additions & 0 deletions examples/foundational/17-detect-user-idle.py
@@ -0,0 +1,108 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import aiohttp
import os
import sys

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer()
            )
        )

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4o")

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        async def user_idle_callback(user_idle: UserIdleProcessor):
            messages.append(
                {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
            await user_idle.queue_frame(LLMMessagesFrame(messages))

        user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            user_idle,           # Idle user check-in
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out              # Assistant spoken responses
        ])

        task = PipelineTask(pipeline, PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            report_only_initial_ttfb=True,
        ))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            messages.append(
                {"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    (url, token) = configure()
    asyncio.run(main(url, token))
11 changes: 11 additions & 0 deletions src/pipecat/frames/frames.py
@@ -240,6 +240,17 @@ class StopInterruptionFrame(SystemFrame):
    pass


@dataclass
class BotSpeakingFrame(SystemFrame):
    """Emitted by transport outputs while the bot is still speaking. This can be
    used, for example, to detect when a user is idle. That is, while the bot is
    speaking we don't want to trigger any user idle timeout since the user might
    be listening.
    """
    pass


@dataclass
class MetricsFrame(SystemFrame):
    """Emitted by processor that can compute metrics like latencies.
76 changes: 76 additions & 0 deletions src/pipecat/processors/idle_frame_processor.py
@@ -0,0 +1,76 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio

from typing import Awaitable, Callable, List

from pipecat.frames.frames import Frame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection


class IdleFrameProcessor(AsyncFrameProcessor):
    """This class waits to receive any frame or list of desired frames within a
    given timeout. If the timeout is reached before receiving any of those
    frames the provided callback will be called.
    The callback can then be used to push frames downstream by using
    `queue_frame()` (or `push_frame()` for system frames).
    """

    def __init__(
            self,
            *,
            callback: Callable[["IdleFrameProcessor"], Awaitable[None]],
            timeout: float,
            types: List[type] = [],
            **kwargs):
        super().__init__(**kwargs)

        self._callback = callback
        self._timeout = timeout
        self._types = types

        self._create_idle_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, SystemFrame):
            await self.push_frame(frame, direction)
        else:
            await self.queue_frame(frame, direction)

        # If we are not waiting for any specific frame set the event, otherwise
        # check if we have received one of the desired frames.
        if not self._types:
            self._idle_event.set()
        else:
            for t in self._types:
                if isinstance(frame, t):
                    self._idle_event.set()

    async def cleanup(self):
        self._idle_task.cancel()
        await self._idle_task

    def _create_idle_task(self):
        self._idle_event = asyncio.Event()
        self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())

    async def _idle_task_handler(self):
        while True:
            try:
                await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                await self._callback(self)
            except asyncio.CancelledError:
                break
            finally:
                self._idle_event.clear()
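
For reference, here is a minimal usage sketch of the new `IdleFrameProcessor` (not part of this commit; the `TranscriptionFrame` type, the callback name, and the 10-second timeout are illustrative assumptions):

from pipecat.frames.frames import TranscriptionFrame
from pipecat.processors.idle_frame_processor import IdleFrameProcessor


# Hypothetical callback: called if no transcription frame arrives within the timeout.
async def idle_callback(processor: IdleFrameProcessor):
    print("No user transcription received within the timeout")


# Construct this during async pipeline setup, since the constructor schedules a
# task on the running event loop.
idle = IdleFrameProcessor(
    callback=idle_callback,
    timeout=10.0,
    types=[TranscriptionFrame])

The processor is then placed in a pipeline after the frames it should watch, for example right after `transport.input()`, much like `UserIdleProcessor` in the `17-detect-user-idle.py` example above.
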
77 changes: 77 additions & 0 deletions src/pipecat/processors/user_idle_processor.py
@@ -0,0 +1,77 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio

from typing import Awaitable, Callable

from pipecat.frames.frames import BotSpeakingFrame, Frame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection


class UserIdleProcessor(AsyncFrameProcessor):
    """This class is useful to check if the user is interacting with the bot
    within a given timeout. If the timeout is reached before any interaction
    occurred the provided callback will be called.
    The callback can then be used to push frames downstream by using
    `queue_frame()` (or `push_frame()` for system frames).
    """

    def __init__(
            self,
            *,
            callback: Callable[["UserIdleProcessor"], Awaitable[None]],
            timeout: float,
            **kwargs):
        super().__init__(**kwargs)

        self._callback = callback
        self._timeout = timeout

        self._interrupted = False

        self._create_idle_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, SystemFrame):
            await self.push_frame(frame, direction)
        else:
            await self.queue_frame(frame, direction)

        # We shouldn't call the idle callback if the user or the bot are speaking.
        if isinstance(frame, StartInterruptionFrame):
            self._interrupted = True
            self._idle_event.set()
        elif isinstance(frame, StopInterruptionFrame):
            self._interrupted = False
            self._idle_event.set()
        elif isinstance(frame, BotSpeakingFrame):
            self._idle_event.set()

    async def cleanup(self):
        self._idle_task.cancel()
        await self._idle_task

    def _create_idle_task(self):
        self._idle_event = asyncio.Event()
        self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())

    async def _idle_task_handler(self):
        while True:
            try:
                await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                if not self._interrupted:
                    await self._callback(self)
            except asyncio.CancelledError:
                break
            finally:
                self._idle_event.clear()
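
The `17-detect-user-idle.py` example above shows the typical wiring: the idle callback appends a system message and queues an `LLMMessagesFrame` so the bot checks in on the user. As a small variation (a sketch only, not part of this commit; the retry counter and the use of `EndFrame` to close the session are assumptions), the callback could give up after a couple of reminders:

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.processors.user_idle_processor import UserIdleProcessor

messages = []  # the LLM context list, as in the example above
idle_count = 0


async def user_idle_callback(user_idle: UserIdleProcessor):
    global idle_count
    idle_count += 1
    if idle_count <= 2:
        # Nudge the user a couple of times first.
        messages.append({"role": "system",
                         "content": "The user has been quiet. Briefly ask if they are still there."})
        await user_idle.queue_frame(LLMMessagesFrame(messages))
    else:
        # Assumption: an EndFrame queued downstream ends the pipeline task.
        await user_idle.queue_frame(EndFrame())


user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
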
2 changes: 2 additions & 0 deletions src/pipecat/transports/base_output.py
@@ -14,6 +14,7 @@
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
    AudioRawFrame,
    BotSpeakingFrame,
    CancelFrame,
    MetricsFrame,
    SpriteFrame,
@@ -263,4 +264,5 @@ async def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
        if len(buffer) >= self._audio_chunk_size:
            await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
            buffer = buffer[self._audio_chunk_size:]
            await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
        return buffer
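
With this change the output transport pushes a `BotSpeakingFrame` upstream every time it writes an audio chunk, so any processor placed before `transport.output()` in the pipeline sees a steady stream of them while the bot talks; `UserIdleProcessor` uses exactly this signal to avoid firing its callback while the bot is speaking. A rough sketch of a custom processor built on the same signal (illustrative only; the class name and the one-second quiet window are assumptions):

import time

from pipecat.frames.frames import BotSpeakingFrame, Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class BotSpeakingMonitor(FrameProcessor):
    """Tracks, approximately, whether the bot is currently speaking."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._last_speaking_time = 0.0

    def is_bot_speaking(self) -> bool:
        # Treat the bot as speaking if a BotSpeakingFrame arrived recently.
        return (time.time() - self._last_speaking_time) < 1.0

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotSpeakingFrame):
            self._last_speaking_time = time.time()

        # Always pass the frame along so the rest of the pipeline keeps working.
        await self.push_frame(frame, direction)
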
3 changes: 3 additions & 0 deletions src/pipecat/transports/services/daily.py
@@ -199,6 +199,9 @@ def set_callbacks(self, callbacks: DailyCallbacks):
        self._callbacks = callbacks

    async def send_message(self, frame: DailyTransportMessageFrame):
        if not self._client:
            return

        future = self._loop.create_future()
        self._client.send_app_message(
            frame.message,