Fixing LLMEvent returns and streaming logic #181

Merged · 17 commits · May 3, 2024
142 changes: 84 additions & 58 deletions agentops/llm_tracker.py
@@ -7,6 +7,7 @@
from .event import LLMEvent, ErrorEvent
from .helpers import get_ISO_time, check_call_stack_for_agent_id
import inspect
import pprint


class LlmTracker:
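The only change in this first hunk is the new import pprint line; it backs the reworked warning messages further down, which pretty-print the offending chunk/response and the call kwargs rather than interpolating the raw kwargs dict into a single line. A minimal sketch of that logging pattern (logger name and sample values are illustrative, not taken from the library):

import logging
import pprint

logger = logging.getLogger("agentops")

kwargs = {"model": "gpt-3.5-turbo",
          "messages": [{"role": "user", "content": "hi"}],
          "stream": True}
chunk = {"choices": [{"delta": {"content": "Hel"}, "finish_reason": None}]}

# pprint.pformat renders nested structures one element per line, which keeps
# the multi-line warning readable compared to embedding kwargs in the f-string.
logger.warning(
    "Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n"
    f"chunk:\n {pprint.pformat(chunk)}\n"
    f"kwargs:\n {pprint.pformat(kwargs)}\n"
)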
@@ -26,44 +27,51 @@ class LlmTracker:

def __init__(self, client):
self.client = client
self.completion = ""
self.llm_event: LLMEvent = None

def _handle_response_v0_openai(self, response, kwargs, init_timestamp):
"""Handle responses for OpenAI versions <v1.0.0"""

self.completion = ""
self.llm_event = None
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)

def handle_stream_chunk(chunk):
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)
# NOTE: prompt/completion usage not returned in response when streaming
# We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
if self.llm_event.returns == None:
self.llm_event.returns = chunk

try:
# NOTE: prompt/completion usage not returned in response when streaming
model = chunk['model']
choices = chunk['choices']
token = choices[0]['delta'].get('content', '')
finish_reason = choices[0]['finish_reason']
if token:
self.completion += token

if finish_reason:
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.completion = {"role": "assistant", "content": self.completion}
self.llm_event.returns = {"finish_reason": finish_reason, "content": self.completion}
self.llm_event.model = model
accumulated_delta = self.llm_event.returns['choices'][0]['delta']
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.model = chunk['model']
self.llm_event.prompt = kwargs["messages"]
choice = chunk['choices'][0] # NOTE: We assume for completion only choices[0] is relevant

if choice['delta'].get('content'):
accumulated_delta['content'] += choice['delta'].content

if choice['delta'].get('role'):
accumulated_delta['role'] = choice['delta'].get('role')

if choice['finish_reason']:
# Streaming is done. Record LLMEvent
self.llm_event.returns.choices[0]['finish_reason'] = choice['finish_reason']
self.llm_event.completion = {
"role": accumulated_delta['role'], "content": accumulated_delta['content']}
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
chunk = pprint.pformat(chunk)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n"
f"chunk:\n {chunk}\n"
f"kwargs:\n {kwargs_str}\n"
)

# if the response is a generator, decorate the generator
if inspect.isasyncgen(response):
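The hunk above drops the old per-chunk self.completion string and instead seeds LLMEvent.returns with the first chunk, then folds every later delta into that stored chunk, recording the event once finish_reason arrives. A standalone sketch of that accumulation pattern, using plain dicts in place of OpenAI's pre-v1.0.0 chunk objects and of LLMEvent (names and values are illustrative, not the library's API):

from datetime import datetime, timezone

# Fake pre-v1.0.0 style streaming chunks. Real streams send the role in the
# first delta, content in the following ones, and finish_reason at the end.
chunks = [
    {"model": "gpt-3.5-turbo",
     "choices": [{"delta": {"role": "assistant", "content": ""}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo",
     "choices": [{"delta": {"content": "Hel"}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo",
     "choices": [{"delta": {"content": "lo!"}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo",
     "choices": [{"delta": {}, "finish_reason": "stop"}]},
]

event = {"returns": None, "completion": None, "model": None, "end_timestamp": None}

for chunk in chunks:
    # Seed `returns` with the first chunk, then accumulate into its delta.
    if event["returns"] is None:
        event["returns"] = chunk
    accumulated_delta = event["returns"]["choices"][0]["delta"]
    event["model"] = chunk["model"]

    choice = chunk["choices"][0]  # only choices[0] is considered for the completion
    if choice["delta"].get("content"):
        accumulated_delta["content"] = accumulated_delta.get("content", "") + choice["delta"]["content"]
    if choice["delta"].get("role"):
        accumulated_delta["role"] = choice["delta"]["role"]

    if choice["finish_reason"]:
        # Streaming is done: `returns` now holds one fully accumulated completion.
        event["returns"]["choices"][0]["finish_reason"] = choice["finish_reason"]
        event["completion"] = {"role": accumulated_delta["role"],
                               "content": accumulated_delta["content"]}
        event["end_timestamp"] = datetime.now(timezone.utc).isoformat()

print(event["completion"])  # -> {'role': 'assistant', 'content': 'Hello!'}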
@@ -84,7 +92,8 @@ def generator():

self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
params=kwargs,
returns=response
)
# v0.0.0 responses are dicts
try:
@@ -93,16 +102,19 @@ def generator():
self.llm_event.prompt_tokens = response['usage']['prompt_tokens']
self.llm_event.completion = {"role": "assistant", "content": response['choices'][0]['message']['content']}
self.llm_event.completion_tokens = response['usage']['completion_tokens']
self.llm_event.returns = {"content": response['choices'][0]['message']['content']}
self.llm_event.model = response["model"]
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
response = pprint.pformat(response)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse response for LLM call. Skipping upload to AgentOps\n"
f"response:\n {response}\n"
f"kwargs:\n {kwargs_str}\n"
)

return response
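For the non-streaming path above, the whole response dict is now attached as returns when the LLMEvent is constructed, instead of being overwritten at the end with a content-only dict. The fields the handler reads are visible from an illustrative pre-v1.0.0 response shape (values made up):

# Illustrative shape of a pre-v1.0.0 (dict-style) chat completion response.
response = {
    "model": "gpt-3.5-turbo",
    "choices": [
        {"message": {"role": "assistant", "content": "Hello!"}, "finish_reason": "stop"}
    ],
    "usage": {"prompt_tokens": 12, "completion_tokens": 3, "total_tokens": 15},
}

# What the handler pulls out of it (sketch):
prompt_tokens = response["usage"]["prompt_tokens"]          # 12
completion = {"role": "assistant",
              "content": response["choices"][0]["message"]["content"]}
completion_tokens = response["usage"]["completion_tokens"]  # 3
model = response["model"]                                    # "gpt-3.5-turbo"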

@@ -112,43 +124,53 @@ def _handle_response_v1_openai(self, response, kwargs, init_timestamp):
from openai.types.chat import ChatCompletionChunk
from openai.resources import AsyncCompletions

self.completion = ""
self.llm_event = None
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)

def handle_stream_chunk(chunk: ChatCompletionChunk):

self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)
# NOTE: prompt/completion usage not returned in response when streaming
# We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
if self.llm_event.returns == None:
self.llm_event.returns = chunk

try:
# NOTE: prompt/completion usage not returned in response when streaming
model = chunk.model
choices = chunk.choices
token = choices[0].delta.content
finish_reason = choices[0].finish_reason
function_call = choices[0].delta.function_call
tool_calls = choices[0].delta.tool_calls
role = choices[0].delta.role
if token:
self.completion += token

if finish_reason:
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.completion = {"role": "assistant", "content": self.completion}
self.llm_event.returns = {"finish_reason": finish_reason, "content": self.completion,
"function_call": function_call, "tool_calls": tool_calls, "role": role}
self.llm_event.model = model
accumulated_delta = self.llm_event.returns.choices[0].delta
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.model = chunk.model
self.llm_event.prompt = kwargs["messages"]
choice = chunk.choices[0] # NOTE: We assume for completion only choices[0] is relevant

if choice.delta.content:
accumulated_delta.content += choice.delta.content

if choice.delta.role:
accumulated_delta.role = choice.delta.role

if choice.delta.tool_calls:
accumulated_delta.tool_calls = choice.delta.tool_calls

if choice.delta.function_call:
accumulated_delta.function_call = choice.delta.function_call

if choice.finish_reason:
# Streaming is done. Record LLMEvent
self.llm_event.returns.choices[0].finish_reason = choice.finish_reason
self.llm_event.completion = {"role": accumulated_delta.role, "content": accumulated_delta.content,
"function_call": accumulated_delta.function_call, "tool_calls": accumulated_delta.tool_calls}
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
chunk = pprint.pformat(chunk)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n"
f"chunk:\n {chunk}\n"
f"kwargs:\n {kwargs_str}\n"
)

# if the response is a generator, decorate the generator
if isinstance(response, Stream):
@@ -180,20 +202,24 @@ async def async_generator():
)
# v1.0.0+ responses are objects
try:
self.llm_event.returns = response.model_dump()
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.prompt_tokens = response.usage.prompt_tokens
self.llm_event.completion = response.choices[0].message.model_dump()
self.llm_event.completion_tokens = response.usage.completion_tokens
self.llm_event.returns = response.model_dump()
self.llm_event.model = response.model

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
response = pprint.pformat(response)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse response for LLM call. Skipping upload to AgentOps\n"
f"response:\n {response}\n"
f"kwargs:\n {kwargs_str}\n"
)

return response
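The v1 streaming handler follows the same accumulate-into-the-first-chunk pattern, but the chunks are ChatCompletionChunk objects with attribute access, and the delta may also carry tool_calls or function_call. A standalone sketch with SimpleNamespace objects standing in for the real openai types (illustrative only, not the library's API):

from types import SimpleNamespace as NS

# Stand-ins for openai>=1.0.0 ChatCompletionChunk objects (attribute access).
chunks = [
    NS(model="gpt-4", choices=[NS(delta=NS(role="assistant", content="",
        tool_calls=None, function_call=None), finish_reason=None)]),
    NS(model="gpt-4", choices=[NS(delta=NS(role=None, content="Hi ",
        tool_calls=None, function_call=None), finish_reason=None)]),
    NS(model="gpt-4", choices=[NS(delta=NS(role=None, content="there",
        tool_calls=None, function_call=None), finish_reason=None)]),
    NS(model="gpt-4", choices=[NS(delta=NS(role=None, content=None,
        tool_calls=None, function_call=None), finish_reason="stop")]),
]

returns = None      # first chunk, reused as the accumulator
completion = None

for chunk in chunks:
    if returns is None:
        returns = chunk
    accumulated_delta = returns.choices[0].delta

    choice = chunk.choices[0]  # only choices[0] is considered for the completion
    if choice.delta.content:
        accumulated_delta.content += choice.delta.content
    if choice.delta.role:
        accumulated_delta.role = choice.delta.role
    if choice.delta.tool_calls:
        accumulated_delta.tool_calls = choice.delta.tool_calls
    if choice.delta.function_call:
        accumulated_delta.function_call = choice.delta.function_call

    if choice.finish_reason:
        # Streaming is done: record one completion built from the accumulated delta.
        returns.choices[0].finish_reason = choice.finish_reason
        completion = {"role": accumulated_delta.role,
                      "content": accumulated_delta.content,
                      "function_call": accumulated_delta.function_call,
                      "tool_calls": accumulated_delta.tool_calls}

print(completion)  # -> {'role': 'assistant', 'content': 'Hi there', ...}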
