diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py index 082c2de5c..24aaa68dc 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_span_processing_util.py @@ -29,6 +29,12 @@ # TODO: Use Semantic Conventions once upgrade to 0.47b0 GEN_AI_REQUEST_MODEL: str = "gen_ai.request.model" GEN_AI_SYSTEM: str = "gen_ai.system" +GEN_AI_REQUEST_MAX_TOKENS: str = "gen_ai.request.max_tokens" +GEN_AI_REQUEST_TEMPERATURE: str = "gen_ai.request.temperature" +GEN_AI_REQUEST_TOP_P: str = "gen_ai.request.top_p" +GEN_AI_RESPONSE_FINISH_REASONS: str = "gen_ai.response.finish_reasons" +GEN_AI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens" +GEN_AI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens" # Get dialect keywords retrieved from dialect_keywords.json file. diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py index 581ca36f4..4a6eb10f5 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py @@ -2,7 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 import abc import inspect -from typing import Dict, Optional +import io +import json +import logging +import math +from typing import Any, Dict, Optional + +from botocore.response import StreamingBody from amazon.opentelemetry.distro._aws_attribute_keys import ( AWS_BEDROCK_AGENT_ID, @@ -11,7 +17,16 @@ AWS_BEDROCK_GUARDRAIL_ID, AWS_BEDROCK_KNOWLEDGE_BASE_ID, ) -from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM +from amazon.opentelemetry.distro._aws_span_processing_util import ( + GEN_AI_REQUEST_MAX_TOKENS, + GEN_AI_REQUEST_MODEL, + GEN_AI_REQUEST_TEMPERATURE, + GEN_AI_REQUEST_TOP_P, + GEN_AI_RESPONSE_FINISH_REASONS, + GEN_AI_SYSTEM, + GEN_AI_USAGE_INPUT_TOKENS, + GEN_AI_USAGE_OUTPUT_TOKENS, +) from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, _AwsSdkCallContext, @@ -28,6 +43,10 @@ _MODEL_ID: str = "modelId" _AWS_BEDROCK_SYSTEM: str = "aws_bedrock" +_logger = logging.getLogger(__name__) +# Set logger level to DEBUG +_logger.setLevel(logging.DEBUG) + class _BedrockAgentOperation(abc.ABC): """ @@ -240,3 +259,168 @@ def extract_attributes(self, attributes: _AttributeMapT): model_id = self._call_context.params.get(_MODEL_ID) if model_id: attributes[GEN_AI_REQUEST_MODEL] = model_id + + # Get the request body if it exists + body = self._call_context.params.get("body") + if body: + try: + request_body = json.loads(body) + + if "amazon.titan" in model_id: + self._extract_titan_attributes(attributes, request_body) + elif "anthropic.claude" in model_id: + self._extract_claude_attributes(attributes, request_body) + elif "meta.llama" in model_id: + self._extract_llama_attributes(attributes, request_body) + elif "cohere.command" in model_id: + self._extract_cohere_attributes(attributes, request_body) + elif "ai21.jamba" in model_id: + self._extract_ai21_attributes(attributes, request_body) + elif "mistral" in model_id: + self._extract_mistral_attributes(attributes, request_body) + + except json.JSONDecodeError: + _logger.debug("Error: Unable to parse the body as JSON") + + def _extract_titan_attributes(self, attributes, request_body): + config = request_body.get("textGenerationConfig", {}) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get("topP")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount")) + + def _extract_claude_attributes(self, attributes, request_body): + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")) + + def _extract_cohere_attributes(self, attributes, request_body): + prompt = request_body.get("message") + if prompt: + attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6) + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p")) + + def _extract_ai21_attributes(self, attributes, request_body): + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")) + + def _extract_llama_attributes(self, attributes, request_body): + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")) + + def _extract_mistral_attributes(self, attributes, request_body): + prompt = request_body.get("prompt") + if prompt: + attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6) + self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")) + self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")) + + @staticmethod + def _set_if_not_none(attributes, key, value): + if value is not None: + attributes[key] = value + + def on_success(self, span: Span, result: Dict[str, Any]): + model_id = self._call_context.params.get(_MODEL_ID) + + if not model_id: + return + + if "body" in result and isinstance(result["body"], StreamingBody): + original_body = None + try: + original_body = result["body"] + body_content = original_body.read() + + # Use one stream for telemetry + stream = io.BytesIO(body_content) + telemetry_content = stream.read() + response_body = json.loads(telemetry_content.decode("utf-8")) + if "amazon.titan" in model_id: + self._handle_amazon_titan_response(span, response_body) + elif "anthropic.claude" in model_id: + self._handle_anthropic_claude_response(span, response_body) + elif "meta.llama" in model_id: + self._handle_meta_llama_response(span, response_body) + elif "cohere.command" in model_id: + self._handle_cohere_command_response(span, response_body) + elif "ai21.jamba" in model_id: + self._handle_ai21_jamba_response(span, response_body) + elif "mistral" in model_id: + self._handle_mistral_mistral_response(span, response_body) + # Replenish stream for downstream application use + new_stream = io.BytesIO(body_content) + result["body"] = StreamingBody(new_stream, len(body_content)) + + except json.JSONDecodeError: + _logger.debug("Error: Unable to parse the response body as JSON") + except Exception as e: # pylint: disable=broad-exception-caught, invalid-name + _logger.debug("Error processing response: %s", e) + finally: + if original_body is not None: + original_body.close() + + # pylint: disable=no-self-use + def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]): + if "inputTextTokenCount" in response_body: + span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"]) + if "results" in response_body and response_body["results"]: + result = response_body["results"][0] + if "tokenCount" in result: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]) + if "completionReason" in result: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result["completionReason"]]) + + # pylint: disable=no-self-use + def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]): + if "usage" in response_body: + usage = response_body["usage"] + if "input_tokens" in usage: + span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"]) + if "output_tokens" in usage: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"]) + if "stop_reason" in response_body: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]) + + # pylint: disable=no-self-use + def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]): + # Output tokens: Approximate from the response text + if "text" in response_body: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body["text"]) / 6)) + if "finish_reason" in response_body: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]]) + + # pylint: disable=no-self-use + def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]): + if "usage" in response_body: + usage = response_body["usage"] + if "prompt_tokens" in usage: + span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"]) + if "completion_tokens" in usage: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"]) + if "choices" in response_body: + choices = response_body["choices"][0] + if "finish_reason" in choices: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choices["finish_reason"]]) + + # pylint: disable=no-self-use + def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]): + if "prompt_token_count" in response_body: + span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"]) + if "generation_token_count" in response_body: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"]) + if "stop_reason" in response_body: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]) + + # pylint: disable=no-self-use + def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]): + if "outputs" in response_body: + outputs = response_body["outputs"][0] + if "text" in outputs: + span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6)) + if "stop_reason" in outputs: + span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]]) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index b27d5e799..86c6bc39f 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -1,12 +1,16 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import json +import math import os +from io import BytesIO from typing import Any, Dict from unittest import TestCase from unittest.mock import MagicMock, patch import gevent.monkey import pkg_resources +from botocore.response import StreamingBody from amazon.opentelemetry.distro.patches._instrumentation_patch import ( AWS_GEVENT_PATCH_MODULES, @@ -173,7 +177,7 @@ def _test_unpatched_gevent_instrumentation(self): self.assertFalse(gevent.monkey.is_module_patched("queue"), "gevent queue module has been patched") self.assertFalse(gevent.monkey.is_module_patched("contextvars"), "gevent contextvars module has been patched") - # pylint: disable=too-many-statements + # pylint: disable=too-many-statements, too-many-locals def _test_patched_botocore_instrumentation(self): # Kinesis self.assertTrue("kinesis" in _KNOWN_EXTENSIONS) @@ -211,12 +215,209 @@ def _test_patched_botocore_instrumentation(self): bedrock_agent_runtime_sucess_attributes: Dict[str, str] = _do_on_success_bedrock("bedrock-agent-runtime") self.assertEqual(len(bedrock_agent_runtime_sucess_attributes), 0) - # BedrockRuntime + # BedrockRuntime - Amazon Titan Models self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) - bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock("bedrock-runtime") - self.assertEqual(len(bedrock_runtime_attributes), 2) + request_body = { + "textGenerationConfig": { + "maxTokenCount": 512, + "temperature": 0.9, + "topP": 0.75, + } + } + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="amazon.titan", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 5) self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) - self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], _GEN_AI_REQUEST_MODEL) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "amazon.titan") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.9) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.75) + response_body = { + "inputTextTokenCount": 123, + "results": [ + { + "tokenCount": 456, + "outputText": "testing", + "completionReason": "FINISH", + } + ], + } + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="amazon.titan", streaming_body=streaming_body + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.input_tokens"], 123) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], 456) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["FINISH"]) + + # BedrockRuntime - Anthropic Claude Models + + self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) + request_body = { + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 512, + "temperature": 0.5, + "top_p": 0.999, + } + + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="anthropic.claude", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "anthropic.claude") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.999) + response_body = { + "stop_reason": "end_turn", + "stop_sequence": None, + "usage": {"input_tokens": 23, "output_tokens": 36}, + } + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="anthropic.claude", streaming_body=streaming_body + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.input_tokens"], 23) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], 36) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["end_turn"]) + + # BedrockRuntime - Cohere Command Models + self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) + request_body = { + "message": "Hello, world", + "max_tokens": 512, + "temperature": 0.5, + "p": 0.75, + } + + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="cohere.command", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 6) + self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "cohere.command") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.75) + self.assertEqual( + bedrock_runtime_attributes["gen_ai.usage.input_tokens"], math.ceil(len(request_body["message"]) / 6) + ) + response_body = { + "text": "Goodbye, world", + "finish_reason": "COMPLETE", + } + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="cohere.command", streaming_body=streaming_body + ) + self.assertEqual( + bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], math.ceil(len(response_body["text"]) / 6) + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["COMPLETE"]) + + # BedrockRuntime - AI21 Jamba Models + self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) + request_body = { + "max_tokens": 512, + "temperature": 0.5, + "top_p": 0.9, + } + + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="ai21.jamba", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "ai21.jamba") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.9) + response_body = { + "choices": [{"finish_reason": "stop"}], + "usage": {"prompt_tokens": 24, "completion_tokens": 31, "total_tokens": 55}, + } + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="ai21.jamba", streaming_body=streaming_body + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.input_tokens"], 24) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], 31) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["stop"]) + + # BedrockRuntime - Meta LLama Models + self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) + request_body = { + "max_gen_len": 512, + "temperature": 0.5, + "top_p": 0.9, + } + + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="meta.llama", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "meta.llama") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.9) + response_body = {"prompt_token_count": 31, "generation_token_count": 36, "stop_reason": "stop"} + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="meta.llama", streaming_body=streaming_body + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.input_tokens"], 31) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], 36) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["stop"]) + + # BedrockRuntime - Mistral Models + self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS) + msg = "Hello, World" + formatted_prompt = f"[INST] {msg} [/INST]" + request_body = { + "prompt": formatted_prompt, + "max_tokens": 512, + "temperature": 0.5, + "top_p": 0.9, + } + + bedrock_runtime_attributes: Dict[str, str] = _do_extract_attributes_bedrock( + "bedrock-runtime", model_id="mistral", request_body=json.dumps(request_body) + ) + self.assertEqual(len(bedrock_runtime_attributes), 6) + self.assertEqual(bedrock_runtime_attributes["gen_ai.system"], _GEN_AI_SYSTEM) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.model"], "mistral") + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.max_tokens"], 512) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.temperature"], 0.5) + self.assertEqual(bedrock_runtime_attributes["gen_ai.request.top_p"], 0.9) + self.assertEqual( + bedrock_runtime_attributes["gen_ai.usage.input_tokens"], math.ceil(len(request_body["prompt"]) / 6) + ) + response_body = {"outputs": [{"text": "Goodbye, World", "stop_reason": "stop"}]} + json_bytes = json.dumps(response_body).encode("utf-8") + body_bytes = BytesIO(json_bytes) + streaming_body = StreamingBody(body_bytes, len(json_bytes)) + bedrock_runtime_success_attributes: Dict[str, str] = _do_on_success_bedrock( + "bedrock-runtime", model_id="mistral", streaming_body=streaming_body + ) + + self.assertEqual( + bedrock_runtime_success_attributes["gen_ai.usage.output_tokens"], + math.ceil(len(response_body["outputs"][0]["text"]) / 6), + ) + self.assertEqual(bedrock_runtime_success_attributes["gen_ai.response.finish_reasons"], ["stop"]) # SecretsManager self.assertTrue("secretsmanager" in _KNOWN_EXTENSIONS) @@ -385,26 +586,27 @@ def _do_extract_sqs_attributes() -> Dict[str, str]: return _do_extract_attributes(service_name, params) -def _do_extract_attributes_bedrock(service, operation=None) -> Dict[str, str]: +def _do_extract_attributes_bedrock(service, operation=None, model_id=None, request_body=None) -> Dict[str, str]: params: Dict[str, Any] = { "agentId": _BEDROCK_AGENT_ID, "dataSourceId": _BEDROCK_DATASOURCE_ID, "knowledgeBaseId": _BEDROCK_KNOWLEDGEBASE_ID, "guardrailId": _BEDROCK_GUARDRAIL_ID, - "modelId": _GEN_AI_REQUEST_MODEL, + "modelId": model_id, + "body": request_body, } return _do_extract_attributes(service, params, operation) -def _do_on_success_bedrock(service, operation=None) -> Dict[str, str]: +def _do_on_success_bedrock(service, operation=None, model_id=None, streaming_body=None) -> Dict[str, str]: result: Dict[str, Any] = { "agentId": _BEDROCK_AGENT_ID, "dataSourceId": _BEDROCK_DATASOURCE_ID, "knowledgeBaseId": _BEDROCK_KNOWLEDGEBASE_ID, "guardrailId": _BEDROCK_GUARDRAIL_ID, - "modelId": _GEN_AI_REQUEST_MODEL, + "body": streaming_body, } - return _do_on_success(service, result, operation) + return _do_on_success(service, result, operation, params={"modelId": model_id}) def _do_extract_secretsmanager_attributes() -> Dict[str, str]: