From 70d8e3704fff6e024821ce262e16d78417f6412c Mon Sep 17 00:00:00 2001 From: saudsami Date: Thu, 3 Oct 2024 09:11:41 +0500 Subject: [PATCH] realtimeapi code --- shared/open-ai-integration/complete-code.mdx | 267 ++++++++++--------- shared/open-ai-integration/quickstart.mdx | 31 +-- 2 files changed, 148 insertions(+), 150 deletions(-) diff --git a/shared/open-ai-integration/complete-code.mdx b/shared/open-ai-integration/complete-code.mdx index d04d52304..9bb9e0f0e 100644 --- a/shared/open-ai-integration/complete-code.mdx +++ b/shared/open-ai-integration/complete-code.mdx @@ -591,129 +591,6 @@ Your knowledge cutoff is 2023-10. You are a helpful, witty, and friendly AI. Act -
-`realtimeapi/connection.py` - -{`import asyncio -import base64 -import json -import logging -import os -import aiohttp - -from typing import Any, AsyncGenerator -from .struct import InputAudioBufferAppend, ClientToServerMessage, ServerToClientMessage, parse_server_message, to_json -from ..logger import setup_logger - -# Set up the logger with color and timestamp support -logger = setup_logger(name=__name__, log_level=logging.INFO) - - -DEFAULT_VIRTUAL_MODEL = "gpt-4o-realtime-preview" - -def smart_str(s: str, max_field_len: int = 128) -> str: - """parse string as json, truncate data field to 128 characters, reserialize""" - try: - data = json.loads(s) - if "delta" in data: - key = "delta" - elif "audio" in data: - key = "audio" - else: - return s - - if len(data[key]) > max_field_len: - data[key] = data[key][:max_field_len] + "..." - return json.dumps(data) - except json.JSONDecodeError: - return s - - -class RealtimeApiConnection: - def __init__( - self, - base_uri: str, - api_key: str | None = None, - path: str = "/v1/realtime", - verbose: bool = False, - model: str = DEFAULT_VIRTUAL_MODEL, - ): - - self.url = f"{base_uri}{path}" - if "model=" not in self.url: - self.url += f"?model={model}" - - self.api_key = api_key or os.environ.get("OPENAI_API_KEY") - self.websocket: aiohttp.ClientWebSocketResponse | None = None - self.verbose = verbose - self.session = aiohttp.ClientSession() - - async def __aenter__(self) -> "RealtimeApiConnection": - await self.connect() - return self - - async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool: - await self.close() - return False - - async def connect(self): - auth = aiohttp.BasicAuth("", self.api_key) if self.api_key else None - - headers = {"OpenAI-Beta": "realtime=v1"} - - self.websocket = await self.session.ws_connect( - url=self.url, - auth=auth, - headers=headers, - ) - - async def send_audio_data(self, audio_data: bytes): - """audio_data is assumed to be pcm16 24kHz mono little-endian""" - 
base64_audio_data = base64.b64encode(audio_data).decode("utf-8") - message = InputAudioBufferAppend(audio=base64_audio_data) - await self.send_request(message) - - async def send_request(self, message: ClientToServerMessage): - assert self.websocket is not None - message_str = to_json(message) - if self.verbose: - logger.info(f"-> {smart_str(message_str)}") - await self.websocket.send_str(message_str) - - - - async def listen(self) -> AsyncGenerator[ServerToClientMessage, None]: - assert self.websocket is not None - if self.verbose: - logger.info("Listening for realtimeapi messages") - try: - async for msg in self.websocket: - if msg.type == aiohttp.WSMsgType.TEXT: - if self.verbose: - logger.info(f"<- {smart_str(msg.data)}") - yield self.handle_server_message(msg.data) - elif msg.type == aiohttp.WSMsgType.ERROR: - logger.error("Error during receive: %s", self.websocket.exception()) - break - except asyncio.CancelledError: - logger.info("Receive messages task cancelled") - - def handle_server_message(self, message: str) -> ServerToClientMessage: - try: - return parse_server_message(message) - except Exception as e: - logger.error("Error handling message: " + str(e)) - raise e - - async def close(self): - # Close the websocket connection if it exists - if self.websocket: - await self.websocket.close() - self.websocket = None -`} - -
-
`tools.py` @@ -1019,6 +896,130 @@ def parse_args_realtimekit() -> RealtimeKitOptions:
+ +
+`realtimeapi/connection.py` + +{`import asyncio +import base64 +import json +import logging +import os +import aiohttp + +from typing import Any, AsyncGenerator +from .struct import InputAudioBufferAppend, ClientToServerMessage, ServerToClientMessage, parse_server_message, to_json +from ..logger import setup_logger + +# Set up the logger with color and timestamp support +logger = setup_logger(name=__name__, log_level=logging.INFO) + + +DEFAULT_VIRTUAL_MODEL = "gpt-4o-realtime-preview" + +def smart_str(s: str, max_field_len: int = 128) -> str: + """parse string as json, truncate data field to 128 characters, reserialize""" + try: + data = json.loads(s) + if "delta" in data: + key = "delta" + elif "audio" in data: + key = "audio" + else: + return s + + if len(data[key]) > max_field_len: + data[key] = data[key][:max_field_len] + "..." + return json.dumps(data) + except json.JSONDecodeError: + return s + + +class RealtimeApiConnection: + def __init__( + self, + base_uri: str, + api_key: str | None = None, + path: str = "/v1/realtime", + verbose: bool = False, + model: str = DEFAULT_VIRTUAL_MODEL, + ): + + self.url = f"{base_uri}{path}" + if "model=" not in self.url: + self.url += f"?model={model}" + + self.api_key = api_key or os.environ.get("OPENAI_API_KEY") + self.websocket: aiohttp.ClientWebSocketResponse | None = None + self.verbose = verbose + self.session = aiohttp.ClientSession() + + async def __aenter__(self) -> "RealtimeApiConnection": + await self.connect() + return self + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool: + await self.close() + return False + + async def connect(self): + auth = aiohttp.BasicAuth("", self.api_key) if self.api_key else None + + headers = {"OpenAI-Beta": "realtime=v1"} + + self.websocket = await self.session.ws_connect( + url=self.url, + auth=auth, + headers=headers, + ) + + async def send_audio_data(self, audio_data: bytes): + """audio_data is assumed to be pcm16 24kHz mono little-endian""" + 
base64_audio_data = base64.b64encode(audio_data).decode("utf-8") + message = InputAudioBufferAppend(audio=base64_audio_data) + await self.send_request(message) + + async def send_request(self, message: ClientToServerMessage): + assert self.websocket is not None + message_str = to_json(message) + if self.verbose: + logger.info(f"-> {smart_str(message_str)}") + await self.websocket.send_str(message_str) + + + + async def listen(self) -> AsyncGenerator[ServerToClientMessage, None]: + assert self.websocket is not None + if self.verbose: + logger.info("Listening for realtimeapi messages") + try: + async for msg in self.websocket: + if msg.type == aiohttp.WSMsgType.TEXT: + if self.verbose: + logger.info(f"<- {smart_str(msg.data)}") + yield self.handle_server_message(msg.data) + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.error("Error during receive: %s", self.websocket.exception()) + break + except asyncio.CancelledError: + logger.info("Receive messages task cancelled") + + def handle_server_message(self, message: str) -> ServerToClientMessage: + try: + return parse_server_message(message) + except Exception as e: + logger.error("Error handling message: " + str(e)) + raise e + + async def close(self): + # Close the websocket connection if it exists + if self.websocket: + await self.websocket.close() + self.websocket = None +`} + +
+
`realtimeapi/struct.py` @@ -1119,13 +1120,13 @@ class SessionUpdateParams: model: Optional[str] = None # Optional string to specify the model modalities: Optional[Set[str]] = None # Set of allowed modalities (e.g., "text", "audio") instructions: Optional[str] = None # Optional instructions string - voice: Optional[Voices] = None # Voice selection, can be `None` or from `Voices` Enum + voice: Optional[Voices] = None # Voice selection, can be \`None\` or from \`Voices\` Enum turn_detection: Optional[ServerVADUpdateParams] = None # Server VAD update params - input_audio_format: Optional[AudioFormats] = None # Input audio format from `AudioFormats` Enum - output_audio_format: Optional[AudioFormats] = None # Output audio format from `AudioFormats` Enum + input_audio_format: Optional[AudioFormats] = None # Input audio format from \`AudioFormats\` Enum + output_audio_format: Optional[AudioFormats] = None # Output audio format from \`AudioFormats\` Enum input_audio_transcription: Optional[InputAudioTranscription] = None # Optional transcription model tools: Optional[List[Dict[str, Union[str, any]]]] = None # List of tools (e.g., dictionaries) - tool_choice: Optional[ToolChoice] = None # ToolChoice, either string or `FunctionToolChoice` + tool_choice: Optional[ToolChoice] = None # ToolChoice, either string or \`FunctionToolChoice\` temperature: Optional[float] = None # Optional temperature for response generation max_response_output_tokens: Optional[Union[int, str]] = None # Max response tokens, "inf" for infinite @@ -1568,7 +1569,7 @@ class InputAudioBufferClear(ClientToServerMessage): @dataclass class ItemCreate(ClientToServerMessage): - item: Optional[ItemParam] = field(default=None) # Assuming `ItemParam` is already defined + item: Optional[ItemParam] = field(default=None) # Assuming \`ItemParam\` is already defined type: str = EventType.ITEM_CREATE previous_item_id: Optional[str] = None @@ -1605,7 +1606,7 @@ class ResponseCreateParams: @dataclass class 
ResponseCreate(ClientToServerMessage): type: str = EventType.RESPONSE_CREATE - response: Optional[ResponseCreateParams] = None # Assuming `ResponseCreateParams` is defined + response: Optional[ResponseCreateParams] = None # Assuming \`ResponseCreateParams\` is defined @dataclass @@ -1631,7 +1632,7 @@ class UpdateConversationConfig(ClientToServerMessage): @dataclass class SessionUpdate(ClientToServerMessage): - session: Optional[SessionUpdateParams] = field(default=None) # Assuming `SessionUpdateParams` is defined + session: Optional[SessionUpdateParams] = field(default=None) # Assuming \`SessionUpdateParams\` is defined type: str = EventType.SESSION_UPDATE @@ -1662,7 +1663,7 @@ def from_dict(data_class, data): def parse_client_message(unparsed_string: str) -> ClientToServerMessage: data = json.loads(unparsed_string) - # Dynamically select the correct message class based on the `type` field, using from_dict + # Dynamically select the correct message class based on the \`type\` field, using from_dict if data["type"] == EventType.INPUT_AUDIO_BUFFER_APPEND: return from_dict(InputAudioBufferAppend, data) elif data["type"] == EventType.INPUT_AUDIO_BUFFER_COMMIT: @@ -1688,12 +1689,12 @@ def parse_client_message(unparsed_string: str) -> ClientToServerMessage: # Assuming all necessary classes and enums (EventType, ServerToClientMessages, etc.) 
are imported -# Here’s how you can dynamically parse a server-to-client message based on the `type` field: +# Here’s how you can dynamically parse a server-to-client message based on the \`type\` field: def parse_server_message(unparsed_string: str) -> ServerToClientMessage: data = json.loads(unparsed_string) - # Dynamically select the correct message class based on the `type` field, using from_dict + # Dynamically select the correct message class based on the \`type\` field, using from_dict if data["type"] == EventType.ERROR: return from_dict(ErrorMessage, data) elif data["type"] == EventType.SESSION_CREATED: diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx index 6a4710ef5..db3456dbe 100644 --- a/shared/open-ai-integration/quickstart.mdx +++ b/shared/open-ai-integration/quickstart.mdx @@ -162,6 +162,7 @@ Overview of key files: - `tools.py`: Classes for registering and invoking tools. - `utils.py`: Provides utilities that facilitate passing audio data between Agora and OpenAI. - `parse_args.py`: Parses the command-line arguments used to customize the channel name and user ID when running script. +- `logger.py`: Helper functions for logging. - `realtimeapi/`: Contains the classes and methods that interact with OpenAI's Realtime API. The [complete code](#complete-integration-code) for files in the `realtime_agent` folder is provided at the bottom of this page. @@ -771,9 +772,9 @@ class PCMWriter: ## OpenAI Connection -The connection.py file manages the real-time communication between the Agent and OpenAI’s API. It handles the connection setup, sending and receiving messages, and managing audio data streaming. The RealtimeApiConnection class encapsulates all the connection logic, making it easier to integrate real-time AI responses. +The `connection.py` file manages the real-time communication between the agent and OpenAI’s API. 
It handles the connection setup, sending and receiving messages, and managing audio data streaming. The `RealtimeApiConnection` class encapsulates all the connection logic, making it easier to integrate real-time AI responses. -Open `realtimeapi/connection.py` and add the imports and the `smart_str` function, used to parse JSON data, truncate fields (like delta or audio) to a maximum length for logging purposes, and then reserialize the modified data. +Open `realtimeapi/connection.py` and add the imports and the `smart_str` function, used to parse JSON data, truncate fields (like delta or audio) to a maximum length for logging purposes, and then re-serialize the modified data. ```python import asyncio @@ -806,9 +807,9 @@ def smart_str(s: str, max_field_len: int = 128) -> str: return s ``` -### RealtimeApiConnection Class +### RealtimeApiConnection class -The `RealtimeApiConnection` class manages the real-time API connection. During initialization the OpenAI key, API URL (includes model), and authentication token are passed to the client and the WebSocket session is initialized. The connect method establishes a WebSocket connection to the specified URL using authentication headers. The close method ensures that the WebSocket connection is closed gracefully, preventing resource leaks. This connection lifecycle management is crucial for handling long-running WebSocket sessions in real-time applications. +The `RealtimeApiConnection` class manages the real-time API connection. During initialization the OpenAI key, API URL (includes model), and authentication token are passed to the client and the WebSocket session is initialized. The `connect` method establishes a WebSocket connection to the specified URL using authentication headers. The `close` method ensures that the WebSocket connection is closed gracefully, preventing resource leaks. This connection lifecycle management is crucial for handling long-running WebSocket sessions in real-time applications. 
```python class RealtimeApiConnection: @@ -849,9 +850,9 @@ class RealtimeApiConnection: self.websocket = None ``` -#### Context Manager for Connection Lifecycle +#### Context manager for connection lifecycle -These methods allow the RealtimeApiConnection class to be used as an asynchronous context manager, ensuring that the connection is opened when entering the context and properly closed when exiting. This pattern simplifies resource management, especially for long-lived connections in asynchronous workflows. +These methods allow the `RealtimeApiConnection` class to be used as an asynchronous context manager, ensuring that the connection is opened when entering the context and properly closed when exiting. This pattern simplifies resource management, especially for long-lived connections in asynchronous workflows. ```python async def __aenter__(self) -> "RealtimeApiConnection": @@ -863,9 +864,9 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool return False ``` -#### Sending Audio Data and Messages +#### Sending audio data and messages -The send_audio_data method sends audio data (encoded in base64) over the WebSocket. It packages the audio data into a ClientToServerMessage and calls send_request to transmit it. The send_request method logs the outgoing message (if verbose logging is enabled) and sends it through the WebSocket connection. +The `send_audio_data` method sends audio data (encoded in base64) over the WebSocket. It packages the audio data into a `ClientToServerMessage` and calls `send_request` to transmit it. The `send_request` method logs the outgoing message (if verbose logging is enabled) and sends it through the WebSocket connection. 
```python async def send_audio_data(self, audio_data: bytes): @@ -882,9 +883,9 @@ async def send_request(self, message: ClientToServerMessage): ``` -### Listening for Incoming Messages +### Listening for incoming messages -The listen method listens for incoming messages from the WebSocket. It uses an asynchronous generator to handle incoming messages in a non-blocking way. Depending on the message type (text or error), it processes the message and passes it to handle_server_message. If verbose logging is enabled, incoming messages are logged for easier debugging. +The `listen` method listens for incoming messages from the WebSocket. It uses an asynchronous generator to handle incoming messages in a non-blocking way. Depending on the message type (text or error), it processes the message and passes it to `handle_server_message`. If verbose logging is enabled, incoming messages are logged to facilitate debugging. ```python async def listen(self) -> AsyncGenerator[ServerToClientMessage, None]: @@ -904,9 +905,9 @@ async def listen(self) -> AsyncGenerator[ServerToClientMessage, None]: logger.info("Receive messages task cancelled") ``` -### Handling Server Messages +### Handling server messages -The handle_server_message method parses the server’s message and handles any exceptions that occur during parsing. This method ensures that malformed messages are logged as errors, helping to track down issues with the server response format. +The `handle_server_message` method parses the server’s message and handles any exceptions that occur during parsing. This method ensures that malformed messages are logged as errors, helping to track down issues with the server response format. ```python def handle_server_message(self, message: str) -> ServerToClientMessage: @@ -919,7 +920,7 @@ def handle_server_message(self, message: str) -> ServerToClientMessage: #### Structs.py -The `connection` class and `agent` utilize various classes and structures defined in `realtimeapi/structs.py`. 
While the specifics of this file are outside the scope of this guide, the complete code is provided below for reference. +The `connection` class and agent utilize various classes and structures defined in `realtimeapi/struct.py`. While the specifics of this file are beyond the scope of this guide, the [complete code](#complete-integration-code) is provided below for use and reference. ### Tool Management @@ -1405,10 +1406,6 @@ Your knowledge cutoff is 2023-10. You are a helpful, witty, and friendly AI. Act ``` -## Parse Args and Logger - -This project implements a few helper functions to support in parsing command line arguments and to handle logging. While the specifics of these files are outside the scope of this guide, the complete code is provided below for reference. - ## Test the Code