diff --git a/apps/opik-documentation/documentation/docs/cookbook/bedrock.ipynb b/apps/opik-documentation/documentation/docs/cookbook/bedrock.ipynb new file mode 100644 index 000000000..4392736a8 --- /dev/null +++ b/apps/opik-documentation/documentation/docs/cookbook/bedrock.ipynb @@ -0,0 +1,309 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Opik with AWS Bedrock\n", + "\n", + "Opik integrates with AWS Bedrock to provide a simple way to log traces for all Bedrock LLM calls. This works for all supported models, including when you are using the streaming API.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating an account on Comet.com\n", + "\n", + "[Comet](https://www.comet.com/site?from=llm&utm_source=opik&utm_medium=colab&utm_content=bedrock&utm_campaign=opik) provides a hosted version of the Opik platform: [simply create an account](https://www.comet.com/signup?from=llm&utm_source=opik&utm_medium=colab&utm_content=bedrock&utm_campaign=opik) and grab your API Key.\n", + "\n", + "> You can also run the Opik platform locally; see the [installation guide](https://www.comet.com/docs/opik/self-host/overview/?from=llm&utm_source=opik&utm_medium=colab&utm_content=bedrock&utm_campaign=opik) for more information." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install --upgrade opik boto3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import opik\n", + "\n", + "opik.configure(use_local=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Preparing our environment\n", + "\n", + "First, we will set up our Bedrock client. Uncomment the following lines to pass AWS credentials manually, or [check out other ways of passing credentials to Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html). You will also need to request access to the model in the UI before you can generate text. Here we will use the Llama 3.2 model; you can request access to it on [this page](https://us-east-1.console.aws.amazon.com/bedrock/home?region=us-east-1#/providers?model=meta.llama3-2-3b-instruct-v1:0) for the us-east-1 region."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "\n", + "REGION = \"us-east-1\"\n", + "\n", + "MODEL_ID = \"us.meta.llama3-2-3b-instruct-v1:0\"\n", + "\n", + "bedrock = boto3.client(\n", + " service_name=\"bedrock-runtime\",\n", + " region_name=REGION,\n", + " # aws_access_key_id=ACCESS_KEY,\n", + " # aws_secret_access_key=SECRET_KEY,\n", + " # aws_session_token=SESSION_TOKEN,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Logging traces\n", + "\n", + "To log traces to Opik, we need to wrap our Bedrock calls with the `track_bedrock` function:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "from opik.integrations.bedrock import track_bedrock\n", + "\n", + "bedrock_client = track_bedrock(bedrock, project_name=\"bedrock-integration-demo\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "PROMPT = \"Why is it important to use an LLM monitoring tool like CometML Opik that allows you to log traces and spans when working with LLM models hosted on AWS Bedrock?\"\n", + "\n", + "response = bedrock_client.converse(\n", + " modelId=MODEL_ID,\n", + " messages=[{\"role\": \"user\", \"content\": [{\"text\": PROMPT}]}],\n", + " inferenceConfig={\"temperature\": 0.5, \"maxTokens\": 512, \"topP\": 0.9},\n", + ")\n", + "print(\"Response\", response[\"output\"][\"message\"][\"content\"][0][\"text\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The prompt and response messages are automatically logged to Opik and can be viewed in the UI.\n", + "\n", + "![Bedrock Integration](https://raw.githubusercontent.com/comet-ml/opik/main/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_cookbook.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Logging traces with streaming" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def stream_conversation(\n", + " bedrock_client,\n", + " model_id,\n", + " messages,\n", + " system_prompts,\n", + " inference_config,\n", + "):\n", + " \"\"\"\n", + " Sends messages to a model and streams the response.\n", + " Args:\n", + " bedrock_client: The Boto3 Bedrock runtime client.\n", + " model_id (str): The model ID to use.\n", + " messages (JSON): The messages to send.\n", + " system_prompts (JSON): The system prompts to send.\n", + " inference_config (JSON): The inference configuration to use.\n", + "\n", + " Returns:\n", + " Nothing.\n", + "\n", + " \"\"\"\n", + "\n", + " response = bedrock_client.converse_stream(\n", + " modelId=model_id,\n", + " messages=messages,\n", + " system=system_prompts,\n", + " inferenceConfig=inference_config,\n", + " )\n", + "\n", + " stream = response.get(\"stream\")\n", + " if stream:\n", + " for event in stream:\n", + " if \"messageStart\" in event:\n", + " print(f\"\\nRole: {event['messageStart']['role']}\")\n", + "\n", + " if \"contentBlockDelta\" in event:\n", + " print(event[\"contentBlockDelta\"][\"delta\"][\"text\"], end=\"\")\n", + "\n", + " if \"messageStop\" in event:\n", + " print(f\"\\nStop reason: {event['messageStop']['stopReason']}\")\n", + "\n", + " if \"metadata\" in event:\n", + " metadata = event[\"metadata\"]\n", + " if 
\"usage\" in metadata:\n", + " print(\"\\nToken usage\")\n", + " print(f\"Input tokens: {metadata['usage']['inputTokens']}\")\n", + " print(f\":Output tokens: {metadata['usage']['outputTokens']}\")\n", + " print(f\":Total tokens: {metadata['usage']['totalTokens']}\")\n", + " if \"metrics\" in event[\"metadata\"]:\n", + " print(f\"Latency: {metadata['metrics']['latencyMs']} milliseconds\")\n", + "\n", + "\n", + "system_prompt = \"\"\"You are an app that creates playlists for a radio station\n", + " that plays rock and pop music. Only return song names and the artist.\"\"\"\n", + "\n", + "# Message to send to the model.\n", + "input_text = \"Create a list of 3 pop songs.\"\n", + "\n", + "\n", + "message = {\"role\": \"user\", \"content\": [{\"text\": input_text}]}\n", + "messages = [message]\n", + "\n", + "# System prompts.\n", + "system_prompts = [{\"text\": system_prompt}]\n", + "\n", + "# inference parameters to use.\n", + "temperature = 0.5\n", + "top_p = 0.9\n", + "# Base inference parameters.\n", + "inference_config = {\"temperature\": temperature, \"topP\": 0.9}\n", + "\n", + "\n", + "stream_conversation(\n", + " bedrock_client,\n", + " MODEL_ID,\n", + " messages,\n", + " system_prompts,\n", + " inference_config,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Bedrock Integration](https://raw.githubusercontent.com/comet-ml/opik/main/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_streaming_cookbook.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using it with the `track` decorator\n", + "\n", + "If you have multiple steps in your LLM pipeline, you can use the `track` decorator to log the traces for each step. If Bedrock is called within one of these steps, the LLM call with be associated with that corresponding step:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from opik import track\n", + "from opik.integrations.bedrock import track_bedrock\n", + "\n", + "bedrock = boto3.client(\n", + " service_name=\"bedrock-runtime\",\n", + " region_name=REGION,\n", + " # aws_access_key_id=ACCESS_KEY,\n", + " # aws_secret_access_key=SECRET_KEY,\n", + " # aws_session_token=SESSION_TOKEN,\n", + ")\n", + "\n", + "os.environ[\"OPIK_PROJECT_NAME\"] = \"bedrock-integration-demo\"\n", + "bedrock_client = track_bedrock(bedrock)\n", + "\n", + "\n", + "@track\n", + "def generate_story(prompt):\n", + " res = bedrock_client.converse(\n", + " modelId=MODEL_ID, messages=[{\"role\": \"user\", \"content\": [{\"text\": prompt}]}]\n", + " )\n", + " return res[\"output\"][\"message\"][\"content\"][0][\"text\"]\n", + "\n", + "\n", + "@track\n", + "def generate_topic():\n", + " prompt = \"Generate a topic for a story about Opik.\"\n", + " res = bedrock_client.converse(\n", + " modelId=MODEL_ID, messages=[{\"role\": \"user\", \"content\": [{\"text\": prompt}]}]\n", + " )\n", + " return res[\"output\"][\"message\"][\"content\"][0][\"text\"]\n", + "\n", + "\n", + "@track\n", + "def generate_opik_story():\n", + " topic = generate_topic()\n", + " story = generate_story(topic)\n", + " return story\n", + "\n", + "\n", + "generate_opik_story()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The trace can now be viewed in the UI:\n", + "\n", + "![Bedrock Integration](https://raw.githubusercontent.com/comet-ml/opik/main/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_decorator_cookbook.png)" + ] + } + ], + 
"metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb b/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb index 8cc4052ff..653b07b76 100644 --- a/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb +++ b/apps/opik-documentation/documentation/docs/cookbook/openai.ipynb @@ -26,7 +26,7 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install --upgrade --quiet opik openai" + "%pip install --upgrade opik openai" ] }, { @@ -81,10 +81,17 @@ "from openai import OpenAI\n", "\n", "os.environ[\"OPIK_PROJECT_NAME\"] = \"openai-integration-demo\"\n", - "client = OpenAI()\n", - "\n", - "openai_client = track_openai(client)\n", "\n", + "client = OpenAI()\n", + "openai_client = track_openai(client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "prompt = \"\"\"\n", "Write a short two sentence story about Opik.\n", "\"\"\"\n", diff --git a/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_cookbook.png b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_cookbook.png new file mode 100644 index 000000000..13e80d762 Binary files /dev/null and b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_cookbook.png differ diff --git a/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_decorator_cookbook.png b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_decorator_cookbook.png new file mode 100644 index 000000000..e42c292c4 Binary files /dev/null and b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_decorator_cookbook.png differ diff --git a/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_streaming_cookbook.png b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_streaming_cookbook.png new file mode 100644 index 000000000..77c487f0d Binary files /dev/null and b/apps/opik-documentation/documentation/static/img/cookbook/bedrock_trace_streaming_cookbook.png differ diff --git a/sdks/python/src/opik/api_objects/span.py b/sdks/python/src/opik/api_objects/span.py index 100669a7e..ec75b1501 100644 --- a/sdks/python/src/opik/api_objects/span.py +++ b/sdks/python/src/opik/api_objects/span.py @@ -8,6 +8,7 @@ from ..message_processing import streamer, messages from .. import datetime_helpers from . import helpers, validation_helpers, constants +from opik import dict_utils LOGGER = logging.getLogger(__name__) @@ -225,6 +226,13 @@ def get_distributed_trace_headers(self) -> DistributedTraceHeadersDict: return {"opik_parent_span_id": self.id, "opik_trace_id": self.trace_id} +# Engineer note: +# +# After moving to minimal python version 3.10, a lot of common content +# from SpanData and TraceData can be moved to ObservationData parent dataclass. +# Before that it's impossible because of the dataclasses limitation to have optional arguments +# strictly after positional ones (including the attributes from the parent class). +# In python 3.10 @dataclass(kw_only=True) should help. 
@dataclasses.dataclass class SpanData: """ @@ -250,17 +258,30 @@ class SpanData: def update(self, **new_data: Any) -> "SpanData": for key, value in new_data.items(): - if value is not None: - if key in self.__dict__: - self.__dict__[key] = value - else: - LOGGER.debug( - "An attempt to update span with parameter name it doesn't have: %s", - key, - ) + if value is None: + continue + + if key not in self.__dict__: + LOGGER.debug( + "An attempt to update span with parameter name it doesn't have: %s", + key, + ) + continue + + if key == "metadata": + self._update_metadata(value) + continue + + self.__dict__[key] = value return self + def _update_metadata(self, new_metadata: Dict[str, Any]) -> None: + if self.metadata is None: + self.metadata = new_metadata + else: + self.metadata = dict_utils.deepmerge(self.metadata, new_metadata) + def init_end_time(self) -> "SpanData": self.end_time = datetime_helpers.local_timestamp() diff --git a/sdks/python/src/opik/api_objects/trace.py b/sdks/python/src/opik/api_objects/trace.py index f03285fe3..3a80c210c 100644 --- a/sdks/python/src/opik/api_objects/trace.py +++ b/sdks/python/src/opik/api_objects/trace.py @@ -7,6 +7,7 @@ from .. import datetime_helpers from ..message_processing import messages, streamer from ..types import CreatedByType, FeedbackScoreDict, SpanType, UsageDict +from opik import dict_utils LOGGER = logging.getLogger(__name__) @@ -199,6 +200,13 @@ def log_feedback_score( self._streamer.put(add_trace_feedback_batch_message) +# Engineer note: +# +# After moving to a minimum Python version of 3.10, a lot of common content +# from SpanData and TraceData can be moved to an ObservationData parent dataclass. +# Before that it's impossible: dataclasses require fields without defaults to come +# strictly before fields with defaults (including the attributes inherited from the parent class). +# In Python 3.10, @dataclass(kw_only=True) should help. @dataclasses.dataclass class TraceData: """ @@ -221,17 +229,30 @@ class TraceData: def update(self, **new_data: Any) -> "TraceData": for key, value in new_data.items(): - if value is not None: - if key in self.__dict__: - self.__dict__[key] = value - else: - LOGGER.debug( - "An attempt to update trace with parameter name it doesn't have: %s", - key, - ) + if value is None: + continue + + if key not in self.__dict__: + LOGGER.debug( + "An attempt to update trace with parameter name it doesn't have: %s", + key, + ) + continue + + if key == "metadata": + self._update_metadata(value) + continue + + self.__dict__[key] = value return self + def _update_metadata(self, new_metadata: Dict[str, Any]) -> None: + if self.metadata is None: + self.metadata = new_metadata + else: + self.metadata = dict_utils.deepmerge(self.metadata, new_metadata) + def init_end_time(self) -> "TraceData": self.end_time = datetime_helpers.local_timestamp() return self diff --git a/sdks/python/src/opik/decorator/base_track_decorator.py b/sdks/python/src/opik/decorator/base_track_decorator.py index 4da7c8010..db25b6932 100644 --- a/sdks/python/src/opik/decorator/base_track_decorator.py +++ b/sdks/python/src/opik/decorator/base_track_decorator.py @@ -31,6 +31,15 @@ class BaseTrackDecorator(abc.ABC): All TrackDecorator instances share the same context and can be used together simultaneously. 
+ + The following methods MUST be implemented in the subclass: + * _start_span_inputs_preprocessor + * _end_span_inputs_preprocessor + + The following methods CAN be overridden in the subclass: + * _generators_handler + + Overriding other methods of this class is not recommended. """ def track( @@ -186,13 +195,13 @@ def wrapper(*args, **kwargs) -> Any: # type: ignore ) raise exception finally: - generator = self._generators_handler( + generator_or_generator_container = self._generators_handler( result, capture_output, generations_aggregator, ) - if generator is not None: - return generator + if generator_or_generator_container is not None: + return generator_or_generator_container self._after_call( output=result, @@ -202,6 +211,8 @@ def wrapper(*args, **kwargs) -> Any: # type: ignore if result is not None: return result + wrapper.opik_tracked = True # type: ignore + return wrapper def _tracked_async( @@ -259,6 +270,7 @@ async def wrapper(*args, **kwargs) -> Any: # type: ignore if result is not None: return result + wrapper.opik_tracked = True # type: ignore return wrapper def _before_call( @@ -448,11 +460,13 @@ def _after_call( generators_span_to_end, generators_trace_to_end, ) + + client = opik_client.get_client_cached() + span_data_to_end.init_end_time().update( **end_arguments.to_kwargs(), ) - client = opik_client.get_client_cached() client.span(**span_data_to_end.__dict__) if trace_data_to_end is not None: @@ -479,6 +493,9 @@ def _generators_handler( capture_output: bool, generations_aggregator: Optional[Callable[[List[Any]], str]], ) -> Optional[Union[Generator, AsyncGenerator]]: + """ + Subclasses can override this method to customize the handling of generator objects + """ if inspect.isgenerator(output): span_to_end, trace_to_end = pop_end_candidates() # For some reason mypy thinks wrap_sync_generator returns Any diff --git a/sdks/python/src/opik/dict_utils.py b/sdks/python/src/opik/dict_utils.py index 77c1e67c4..0346d1399 100644 --- a/sdks/python/src/opik/dict_utils.py +++ b/sdks/python/src/opik/dict_utils.py @@ -1,6 +1,6 @@ import copy import logging -from typing import Any, Dict, Mapping, Optional +from typing import Any, Dict, Mapping, Optional, List, Tuple from . 
import logging_messages @@ -43,5 +43,13 @@ def remove_none_from_dict(original: Mapping[str, Optional[Any]]) -> Mapping[str, return new +def split_dict_by_keys(input_dict: Dict, keys: List) -> Tuple[Dict, Dict]: + subset_dict = {key: input_dict[key] for key in keys if key in input_dict} + remaining_dict = { + key: value for key, value in input_dict.items() if key not in subset_dict + } + return subset_dict, remaining_dict + + def _is_dict(item: Any) -> bool: return isinstance(item, dict) diff --git a/sdks/python/src/opik/integrations/bedrock/__init__.py b/sdks/python/src/opik/integrations/bedrock/__init__.py new file mode 100644 index 000000000..996bad7ec --- /dev/null +++ b/sdks/python/src/opik/integrations/bedrock/__init__.py @@ -0,0 +1,3 @@ +from .opik_tracker import track_bedrock + +__all__ = ["track_bedrock"] diff --git a/sdks/python/src/opik/integrations/bedrock/chunks_aggregator.py b/sdks/python/src/opik/integrations/bedrock/chunks_aggregator.py new file mode 100644 index 000000000..d17cd57d1 --- /dev/null +++ b/sdks/python/src/opik/integrations/bedrock/chunks_aggregator.py @@ -0,0 +1,37 @@ +import logging +from typing import Any, List, Dict + +LOGGER = logging.getLogger(__name__) + + +def aggregate(items: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + The implementation is based on the following AWS example (see the section `Conversation with streaming example`): + https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference-examples.html + """ + + result: Dict[str, Any] = { + "output": {"message": {"role": "assistant", "content": [{"text": ""}]}} + } + + for event in items: + if "messageStart" in event: + result["output"]["message"]["role"] = event["messageStart"]["role"] + + if "contentBlockDelta" in event: + result["output"]["message"]["content"][0]["text"] += event[ + "contentBlockDelta" + ]["delta"]["text"] + + if "messageStop" in event: + result["stopReason"] = event["messageStop"]["stopReason"] + + if "metadata" in event: + metadata = event["metadata"] + if "usage" in metadata: + result["usage"] = metadata["usage"] + if "metrics" in metadata: + result["metrics"] = {"latencyMs": metadata["metrics"]["latencyMs"]} + + return result diff --git a/sdks/python/src/opik/integrations/bedrock/converse_decorator.py b/sdks/python/src/opik/integrations/bedrock/converse_decorator.py new file mode 100644 index 000000000..8001c2578 --- /dev/null +++ b/sdks/python/src/opik/integrations/bedrock/converse_decorator.py @@ -0,0 +1,124 @@ +import logging +from typing import List, Any, Dict, Optional, Callable, Tuple, Union, TypedDict, cast +from opik import dict_utils +from opik.types import SpanType +from opik.decorator import base_track_decorator, arguments_helpers + +from . import stream_wrappers + +from botocore import eventstream + +LOGGER = logging.getLogger(__name__) + +KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages", "system", "toolConfig", "guardrailConfig"] +RESPONSE_KEYS_TO_LOG_AS_OUTPUTS = ["output"] + +BedrockResponseWithStream = Dict[str, Any] + + +class ConverseStreamOutput(TypedDict): + stream: eventstream.EventStream + ResponseMetadata: Dict[str, Any] + + +class BedrockConverseDecorator(base_track_decorator.BaseTrackDecorator): + """ + An implementation of BaseTrackDecorator designed specifically for tracking + calls to the AWS Bedrock client's `converse` and `converse_stream` functions. 
+ + Besides special processing for input arguments and response content, it + overrides the _generators_handler() method to work correctly with Bedrock's streams. + """ + + def _start_span_inputs_preprocessor( + self, + func: Callable, + name: Optional[str], + type: SpanType, + tags: Optional[List[str]], + metadata: Optional[Dict[str, Any]], + capture_input: bool, + args: Optional[Tuple], + kwargs: Optional[Dict[str, Any]], + project_name: Optional[str], + ) -> arguments_helpers.StartSpanParameters: + assert ( + kwargs is not None + ), "Expected kwargs to be not None in BedrockRuntime.Client.converse(**kwargs)" + + name = name if name is not None else func.__name__ + input, metadata = dict_utils.split_dict_by_keys( + kwargs, KWARGS_KEYS_TO_LOG_AS_INPUTS + ) + metadata["created_from"] = "bedrock" + tags = ["bedrock"] + + result = arguments_helpers.StartSpanParameters( + name=name, + input=input, + type=type, + tags=tags, + metadata=metadata, + project_name=project_name, + ) + + return result + + def _end_span_inputs_preprocessor( + self, output: Any, capture_output: bool + ) -> arguments_helpers.EndSpanParameters: + usage = output["usage"] + usage_in_openai_format = { + "prompt_tokens": usage["inputTokens"], + "completion_tokens": usage["outputTokens"], + "total_tokens": usage["inputTokens"] + usage["outputTokens"], + } + + output, metadata = dict_utils.split_dict_by_keys( + output, RESPONSE_KEYS_TO_LOG_AS_OUTPUTS + ) + result = arguments_helpers.EndSpanParameters( + output=output, + usage=usage_in_openai_format, + metadata=metadata, + ) + + return result + + def _generators_handler( # type: ignore + self, + output: Any, + capture_output: bool, + generations_aggregator: Optional[Callable[[List[Any]], Any]], + ) -> Union[ + ConverseStreamOutput, + None, + ]: + DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR = ( + generations_aggregator is None + ) + + if DECORATED_FUNCTION_IS_NOT_EXPECTED_TO_RETURN_GENERATOR: + return None + + assert generations_aggregator is not None + + if isinstance(output, dict) and "stream" in output: + span_to_end, trace_to_end = base_track_decorator.pop_end_candidates() + + wrapped_stream = stream_wrappers.wrap_stream( + stream=output["stream"], + capture_output=capture_output, + span_to_end=span_to_end, + trace_to_end=trace_to_end, + generations_aggregator=generations_aggregator, + response_metadata=output["ResponseMetadata"], + finally_callback=self._after_call, + ) + + output["stream"] = wrapped_stream + return cast(ConverseStreamOutput, output) + + STREAM_NOT_FOUND = None + + return STREAM_NOT_FOUND diff --git a/sdks/python/src/opik/integrations/bedrock/opik_tracker.py b/sdks/python/src/opik/integrations/bedrock/opik_tracker.py new file mode 100644 index 000000000..42413bda5 --- /dev/null +++ b/sdks/python/src/opik/integrations/bedrock/opik_tracker.py @@ -0,0 +1,39 @@ +from typing import Any, Optional +from . import converse_decorator +from . import chunks_aggregator + + +def track_bedrock(client: Any, project_name: Optional[str] = None) -> Any: + """Adds Opik tracking to an AWS Bedrock client. + + Tracks calls to the `converse()` and `converse_stream()` methods. + Can be used within other Opik-tracked functions. + + Args: + client: An instance of an AWS Bedrock client. + project_name: The name of the project to log data to. + + Returns: + The modified Bedrock client with Opik tracking enabled. 
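+
+    Example (a minimal usage sketch, mirroring the cookbook above):
+        >>> import boto3
+        >>> from opik.integrations.bedrock import track_bedrock
+        >>> client = boto3.client("bedrock-runtime", region_name="us-east-1")
+        >>> client = track_bedrock(client, project_name="bedrock-integration-demo")
+        >>> # client.converse(...) and client.converse_stream(...) are now logged to Opik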
+ """ + decorator = converse_decorator.BedrockConverseDecorator() + + if not hasattr(client.converse, "opik_tracked"): + wrapper = decorator.track( + type="llm", + name="bedrock_converse", + project_name=project_name, + ) + tracked_converse = wrapper(client.converse) + client.converse = tracked_converse + + if not hasattr(client.converse_stream, "opik_tracked"): + stream_wrapper = decorator.track( + type="llm", + name="bedrock_converse_stream", + project_name=project_name, + generations_aggregator=chunks_aggregator.aggregate, + ) + tracked_converse_stream = stream_wrapper(client.converse_stream) + client.converse_stream = tracked_converse_stream + return client diff --git a/sdks/python/src/opik/integrations/bedrock/stream_wrappers.py b/sdks/python/src/opik/integrations/bedrock/stream_wrappers.py new file mode 100644 index 000000000..365ea70d8 --- /dev/null +++ b/sdks/python/src/opik/integrations/bedrock/stream_wrappers.py @@ -0,0 +1,37 @@ +import logging +from typing import Generator, Any, List, Optional, Callable, Dict +from opik.api_objects import trace, span +from opik.decorator import generator_wrappers + +from botocore import eventstream + +LOGGER = logging.getLogger(__name__) + + +def wrap_stream( + stream: eventstream.EventStream, + capture_output: bool, + span_to_end: span.SpanData, + trace_to_end: Optional[trace.TraceData], + generations_aggregator: Callable[[List[Any]], Any], + response_metadata: Dict[str, Any], + finally_callback: generator_wrappers.FinishGeneratorCallback, +) -> Generator[Any, None, None]: + items: List[Dict[str, Any]] = [] + + try: + for item in stream: + items.append(item) + + yield item + + finally: + aggregated_output = generations_aggregator(items) + aggregated_output["ResponseMetadata"] = response_metadata + + finally_callback( + output=aggregated_output, + generators_span_to_end=span_to_end, + generators_trace_to_end=trace_to_end, + capture_output=capture_output, + ) diff --git a/sdks/python/src/opik/integrations/openai/opik_tracker.py b/sdks/python/src/opik/integrations/openai/opik_tracker.py index fb876ecda..8d6bdf71b 100644 --- a/sdks/python/src/opik/integrations/openai/opik_tracker.py +++ b/sdks/python/src/opik/integrations/openai/opik_tracker.py @@ -22,14 +22,15 @@ def track_openai( The modified OpenAI client with Opik tracking enabled. 
""" decorator = openai_decorator.OpenaiTrackDecorator() - wrapper = decorator.track( - type="llm", - name="chat_completion_create", - generations_aggregator=chunks_aggregator.aggregate, - project_name=project_name, - ) - openai_client.chat.completions.create = wrapper( - openai_client.chat.completions.create - ) + if not hasattr(openai_client.chat.completions.create, "opik_tracked"): + wrapper = decorator.track( + type="llm", + name="chat_completion_create", + generations_aggregator=chunks_aggregator.aggregate, + project_name=project_name, + ) + openai_client.chat.completions.create = wrapper( + openai_client.chat.completions.create + ) return openai_client diff --git a/sdks/python/tests/unit/test_dict_utils.py b/sdks/python/tests/unit/test_dict_utils.py index ad7743c80..df54e47b3 100644 --- a/sdks/python/tests/unit/test_dict_utils.py +++ b/sdks/python/tests/unit/test_dict_utils.py @@ -1,5 +1,5 @@ import pytest - +from typing import Dict, List from opik import dict_utils @@ -62,3 +62,28 @@ def test_deepmerge__recursion_limit_not_exceeded__merging_performed_as_usual(): } } } + + +@pytest.mark.parametrize( + "input_dict, keys, expected_subset, expected_remaining", + [ + # all specified keys are in the dictionary + ({"a": 1, "b": 2, "c": 3}, ["a", "c"], {"a": 1, "c": 3}, {"b": 2}), + # some keys in the list are not in the dictionary + ({"a": 1, "b": 2, "c": 3}, ["a", "d"], {"a": 1}, {"b": 2, "c": 3}), + # empty list of keys + ({"a": 1, "b": 2, "c": 3}, [], {}, {"a": 1, "b": 2, "c": 3}), + # no matching keys in dictionary + ({"a": 1, "b": 2, "c": 3}, ["d", "e"], {}, {"a": 1, "b": 2, "c": 3}), + # empty input dictionary + ({}, ["a", "b"], {}, {}), + # all keys in input dictionary + ({"a": 1, "b": 2, "c": 3}, ["a", "b", "c"], {"a": 1, "b": 2, "c": 3}, {}), + ], +) +def test_split_dict_by_keys( + input_dict: Dict, keys: List, expected_subset: Dict, expected_remaining: Dict +): + subset, remaining = dict_utils.split_dict_by_keys(input_dict, keys) + assert subset == expected_subset + assert remaining == expected_remaining