diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c2cf3aee..108cd7784 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added vision support to Anthropic service. + - Added `WakeCheckFilter` which allows you to pass information downstream only if you say a certain phrase/word. @@ -19,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed Anthropic service to use new frame types. + - Fixed an issue in `LLMUserResponseAggregator` and `UserResponseAggregator` that would cause frames after a brief pause to not be pushed to the LLM. diff --git a/examples/foundational/07a-interruptible-anthropic.py b/examples/foundational/07a-interruptible-anthropic.py new file mode 100644 index 000000000..260cd74e8 --- /dev/null +++ b/examples/foundational/07a-interruptible-anthropic.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.anthropic import AnthropicLLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), + model="claude-3-opus-20240229") + + # todo: think more about how to handle system prompts in a more general way. OpenAI, + # Google, and Anthropic all have slightly different approaches to providing a system + # prompt. + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative, helpful, and brief way. Say hello.", + }, + ] + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Transport user input + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out # Assistant spoken responses + ]) + + task = PipelineTask(pipeline, allow_interruptions=True) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/examples/foundational/12c-describe-video-anthropic.py b/examples/foundational/12c-describe-video-anthropic.py new file mode 100644 index 000000000..17ca08407 --- /dev/null +++ b/examples/foundational/12c-describe-video-anthropic.py @@ -0,0 +1,112 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import aiohttp +import os +import sys + +from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.user_response import UserResponseAggregator +from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.anthropic import AnthropicLLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +from runner import configure + +from loguru import logger + +from dotenv import load_dotenv +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class UserImageRequester(FrameProcessor): + + def __init__(self, participant_id: str | None = None): + super().__init__() + self._participant_id = participant_id + + def set_participant_id(self, participant_id: str): + self._participant_id = participant_id + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if self._participant_id and isinstance(frame, TextFrame): + await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM) + await self.push_frame(frame, direction) + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Describe participant video", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer() + ) + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + user_response = UserResponseAggregator() + + image_requester = UserImageRequester() + + vision_aggregator = VisionImageFrameAggregator() + + anthropic = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), + model="claude-3-sonnet-20240229" + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + await tts.say("Hi there! Feel free to ask me what I see.") + transport.capture_participant_video(participant["id"], framerate=0) + transport.capture_participant_transcription(participant["id"]) + image_requester.set_participant_id(participant["id"]) + + pipeline = Pipeline([ + transport.input(), + user_response, + image_requester, + vision_aggregator, + anthropic, + tts, + transport.output() + ]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner() + + await runner.run(task) + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/src/pipecat/services/anthropic.py b/src/pipecat/services/anthropic.py index 25620f783..2610e6dca 100644 --- a/src/pipecat/services/anthropic.py +++ b/src/pipecat/services/anthropic.py @@ -4,9 +4,24 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.frames.frames import Frame, LLMMessagesFrame, TextFrame +import os +import asyncio +import time +import base64 + +from pipecat.frames.frames import ( + Frame, + TextFrame, + VisionImageRawFrame, + LLMMessagesFrame, + LLMFullResponseStartFrame, + LLMResponseStartFrame, + LLMResponseEndFrame, + LLMFullResponseEndFrame +) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame from loguru import logger @@ -20,6 +35,12 @@ class AnthropicLLMService(LLMService): + """This class implements inference with Anthropic's AI models + + This service translates internally from OpenAILLMContext to the messages format + expected by the Anthropic Python SDK. We are using the OpenAILLMContext as a lingua + franca for all LLM services, so that it is easy to switch between different LLMs. + """ def __init__( self, @@ -27,11 +48,85 @@ def __init__( model="claude-3-opus-20240229", max_tokens=1024): super().__init__() - self.client = AsyncAnthropic(api_key=api_key) - self.model = model - self.max_tokens = max_tokens + self._client = AsyncAnthropic(api_key=api_key) + self._model = model + self._max_tokens = max_tokens + + def _get_messages_from_openai_context( + self, context: OpenAILLMContext): + openai_messages = context.get_messages() + anthropic_messages = [] + + for message in openai_messages: + role = message["role"] + text = message["content"] + if role == "system": + role = "user" + if message.get("mime_type") == "image/jpeg": + # vision frame + encoded_image = base64.b64encode(message["data"].getvalue()).decode("utf-8") + anthropic_messages.append({ + "role": role, + "content": [{ + "type": "image", + "source": { + "type": "base64", + "media_type": message.get("mime_type"), + "data": encoded_image, + } + }, { + "type": "text", + "text": text + }] + }) + else: + # text frame + anthropic_messages.append({"role": role, "content": content}) + + return anthropic_messages + + async def _process_context(self, context: OpenAILLMContext): + await self.push_frame(LLMFullResponseStartFrame()) + try: + logger.debug(f"Generating chat: {context.get_messages_json()}") + + messages = self._get_messages_from_openai_context(context) + + start_time = time.time() + response = await self._client.messages.create( + messages=messages, + model=self._model, + max_tokens=self._max_tokens, + stream=True) + logger.debug(f"Anthropic LLM TTFB: {time.time() - start_time}") + async for event in response: + # logger.debug(f"Anthropic LLM event: {event}") + if (event.type == "content_block_delta"): + await self.push_frame(LLMResponseStartFrame()) + await self.push_frame(TextFrame(event.delta.text)) + await self.push_frame(LLMResponseEndFrame()) + + except Exception as e: + logger.error(f"Exception: {e}") + finally: + await self.push_frame(LLMFullResponseEndFrame()) async def process_frame(self, frame: Frame, direction: FrameDirection): + context = None + + if isinstance(frame, OpenAILLMContextFrame): + context: OpenAILLMContext = frame.context + elif isinstance(frame, LLMMessagesFrame): + context = OpenAILLMContext.from_messages(frame.messages) + elif isinstance(frame, VisionImageRawFrame): + context = OpenAILLMContext.from_image_frame(frame) + else: + await self.push_frame(frame, direction) + + if context: + await self._process_context(context) + + async def x_process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, LLMMessagesFrame): stream = await self.client.messages.create( max_tokens=self.max_tokens,