Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen-AI python implementation #290

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
# TODO: Use Semantic Conventions once upgrade to 0.47b0
# GenAI semantic-convention attribute keys, hard-coded until the upgrade above
# lets us import them from opentelemetry.semconv.
GEN_AI_REQUEST_MODEL: str = "gen_ai.request.model"
GEN_AI_SYSTEM: str = "gen_ai.system"
GEN_AI_REQUEST_MAX_TOKENS: str = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE: str = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P: str = "gen_ai.request.top_p"
GEN_AI_RESPONSE_FINISH_REASONS: str = "gen_ai.response.finish_reasons"
GEN_AI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens"


# Get dialect keywords retrieved from dialect_keywords.json file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import abc
import inspect
from typing import Dict, Optional
import json
import math
from typing import Any, Dict, Optional

from botocore.response import StreamingBody

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_BEDROCK_AGENT_ID,
Expand All @@ -11,7 +15,16 @@
AWS_BEDROCK_GUARDRAIL_ID,
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
)
from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
from amazon.opentelemetry.distro._aws_span_processing_util import (
GEN_AI_REQUEST_MAX_TOKENS,
GEN_AI_REQUEST_MODEL,
GEN_AI_REQUEST_TEMPERATURE,
GEN_AI_REQUEST_TOP_P,
GEN_AI_RESPONSE_FINISH_REASONS,
GEN_AI_SYSTEM,
GEN_AI_USAGE_INPUT_TOKENS,
GEN_AI_USAGE_OUTPUT_TOKENS,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
def extract_attributes(self, attributes: _AttributeMapT):
    """Extract GenAI request attributes from the Bedrock InvokeModel call.

    Records the model id, then parses the JSON request body (when present)
    and delegates to a model-family-specific extractor for request
    parameters such as max tokens, temperature and top_p.
    """
    model_id = self._call_context.params.get(_MODEL_ID)
    if not model_id:
        # BUGFIX: previously only the attribute assignment was guarded, so a
        # request with a body but no model id hit `"..." in model_id` below
        # and raised TypeError. Without a model id there is nothing to record
        # and no way to pick an extractor, so bail out early.
        return
    attributes[GEN_AI_REQUEST_MODEL] = model_id

    # Get the request body if it exists
    body = self._call_context.params.get("body")
    if not body:
        return
    try:
        request_body = json.loads(body)
    except json.JSONDecodeError:
        print("Error: Unable to parse the body as JSON")
        return

    # Dispatch on the model family encoded in the Bedrock model id.
    if "amazon.titan" in model_id:
        self._extract_titan_attributes(attributes, request_body)
    elif "anthropic.claude" in model_id:
        self._extract_claude_attributes(attributes, request_body)
    elif "meta.llama" in model_id:
        self._extract_llama_attributes(attributes, request_body)
    elif "cohere.command" in model_id:
        self._extract_cohere_attributes(attributes, request_body)
    elif "ai21.jamba" in model_id:
        self._extract_ai21_attributes(attributes, request_body)
    elif "mistral" in model_id:
        self._extract_mistral_attributes(attributes, request_body)

def _extract_titan_attributes(self, attributes, request_body):
    """Map Amazon Titan ``textGenerationConfig`` fields onto GenAI request attributes."""
    generation_config = request_body.get("textGenerationConfig", {})
    for attribute_key, config_field in (
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "topP"),
        (GEN_AI_REQUEST_MAX_TOKENS, "maxTokenCount"),
    ):
        self._set_if_not_none(attributes, attribute_key, generation_config.get(config_field))

def _extract_claude_attributes(self, attributes, request_body):
    """Copy Anthropic Claude request parameters into GenAI request attributes."""
    for attribute_key, body_field in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_tokens"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "top_p"),
    ):
        self._set_if_not_none(attributes, attribute_key, request_body.get(body_field))

def _extract_cohere_attributes(self, attributes, request_body):
    """Copy Cohere Command request parameters into GenAI request attributes.

    Input tokens are not provided by the API, so they are estimated from the
    prompt length at roughly six characters per token.
    """
    prompt = request_body.get("message")
    if prompt:
        attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
    for attribute_key, body_field in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_tokens"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "p"),
    ):
        self._set_if_not_none(attributes, attribute_key, request_body.get(body_field))

def _extract_ai21_attributes(self, attributes, request_body):
    """Copy AI21 Jamba request parameters into GenAI request attributes."""
    for attribute_key, body_field in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_tokens"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "top_p"),
    ):
        self._set_if_not_none(attributes, attribute_key, request_body.get(body_field))

def _extract_llama_attributes(self, attributes, request_body):
    """Copy Meta Llama request parameters into GenAI request attributes."""
    for attribute_key, body_field in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_gen_len"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "top_p"),
    ):
        self._set_if_not_none(attributes, attribute_key, request_body.get(body_field))

def _extract_mistral_attributes(self, attributes, request_body):
    """Copy Mistral request parameters into GenAI request attributes.

    Input tokens are not provided by the API, so they are estimated from the
    prompt length at roughly six characters per token.
    """
    prompt = request_body.get("prompt")
    if prompt:
        attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
    for attribute_key, body_field in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_tokens"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "top_p"),
    ):
        self._set_if_not_none(attributes, attribute_key, request_body.get(body_field))

@staticmethod
def _set_if_not_none(attributes, key, value):
    """Store *value* under *key* unless it is None (falsy values such as 0 are kept)."""
    if value is None:
        return
    attributes[key] = value

def on_success(self, span: Span, result: Dict[str, Any]):
    """Parse the InvokeModel response body and record GenAI usage attributes on *span*.

    Reading the ``StreamingBody`` consumes it entirely; the stream is always
    closed in ``finally``, even when parsing fails. All errors are swallowed
    (best-effort telemetry must never break the instrumented call).
    """
    model_id = self._call_context.params.get(_MODEL_ID)
    if not model_id:
        return

    if "body" in result and isinstance(result["body"], StreamingBody):
        try:
            # Read the entire content of the StreamingBody, then decode
            # the bytes to a string and parse it as JSON.
            body_content = result["body"].read()
            response_body = json.loads(body_content.decode("utf-8"))

            # Dispatch on the model family encoded in the Bedrock model id.
            if "amazon.titan" in model_id:
                self._handle_amazon_titan_response(span, response_body)
            elif "anthropic.claude" in model_id:
                self._handle_anthropic_claude_response(span, response_body)
            elif "meta.llama" in model_id:
                self._handle_meta_llama_response(span, response_body)
            elif "cohere.command" in model_id:
                self._handle_cohere_command_response(span, response_body)
            elif "ai21.jamba" in model_id:
                self._handle_ai21_jamba_response(span, response_body)
            elif "mistral" in model_id:
                self._handle_mistral_mistral_response(span, response_body)

        except json.JSONDecodeError:
            print("Error: Unable to parse the response body as JSON")
        except Exception as error:  # pylint: disable=broad-except
            # Deliberate broad catch: response parsing is best-effort.
            print(f"Error processing response: {str(error)}")
        finally:
            # Make sure to close the stream
            result["body"].close()

def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
    """Record Amazon Titan token counts and completion reason on *span*."""
    if "inputTextTokenCount" in response_body:
        span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"])

    # BUGFIX: "results" may be absent or empty; unguarded ["results"][0]
    # raised KeyError/IndexError on such responses.
    results = response_body.get("results")
    if not results:
        return
    result = results[0]
    if "tokenCount" in result:
        span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"])
    if "completionReason" in result:
        span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result["completionReason"]])

def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
    """Record Anthropic Claude token usage and stop reason on *span*."""
    usage = response_body.get("usage", {})
    if "input_tokens" in usage:
        span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"])
    if "output_tokens" in usage:
        span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"])
    if "stop_reason" in response_body:
        span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])

def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
    """Record Cohere Command output-token estimate and finish reason on *span*."""
    if "text" in response_body:
        # The API returns no token counts; approximate ~6 characters per token.
        character_count = len(response_body["text"])
        span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(character_count / 6))
    if "finish_reason" in response_body:
        span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]])

def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
    """Record AI21 Jamba token usage and finish reason on *span*."""
    if "usage" in response_body:
        usage = response_body["usage"]
        if "prompt_tokens" in usage:
            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"])
        if "completion_tokens" in usage:
            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"])
    # BUGFIX: guard against an empty "choices" list; ["choices"][0] raised
    # IndexError when the list was present but empty.
    choices = response_body.get("choices")
    if choices:
        first_choice = choices[0]
        if "finish_reason" in first_choice:
            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [first_choice["finish_reason"]])

def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
    """Record Meta Llama token counts and stop reason on *span*."""
    for attribute_key, body_field, wrap_in_list in (
        (GEN_AI_USAGE_INPUT_TOKENS, "prompt_token_count", False),
        (GEN_AI_USAGE_OUTPUT_TOKENS, "generation_token_count", False),
        (GEN_AI_RESPONSE_FINISH_REASONS, "stop_reason", True),
    ):
        if body_field in response_body:
            value = response_body[body_field]
            span.set_attribute(attribute_key, [value] if wrap_in_list else value)

def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]):
    """Record Mistral output-token estimate and stop reason on *span*."""
    # BUGFIX: guard against an empty "outputs" list; ["outputs"][0] raised
    # IndexError when the list was present but empty.
    outputs = response_body.get("outputs")
    if not outputs:
        return
    first_output = outputs[0]
    if "text" in first_output:
        # The API returns no token counts; approximate ~6 characters per token.
        span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(first_output["text"]) / 6))
    if "stop_reason" in first_output:
        span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [first_output["stop_reason"]])
Loading
Loading