Fixing LLMEvent returns and streaming logic (#181)
* Setting session to none if server does not return 200 for /sessions

* WIP

* WIP. Moving around how we handle chunks

* Ready for testing

* WIP. v1 is tested; need to test v0

* WIP. Updated. Need to test v0

* Fixed logging messages

* Tidied

* Done
HowieG authored May 3, 2024
1 parent 2ac634e commit 562a1ac
Showing 1 changed file with 84 additions and 58 deletions.
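
The heart of this change is how streamed completions are reassembled: instead of concatenating tokens into a separate self.completion string, the handler now stores the first chunk in LLMEvent.returns and folds each subsequent delta into it, recording the event once a finish_reason arrives. Below is a minimal standalone sketch of that accumulation pattern using fabricated v0-style chunk dicts; it mirrors the idea of the patch, not the tracker's actual code.

# Sketch only: rebuild one chat completion from streamed delta chunks.
# The chunks below are made-up examples in the dict shape of pre-v1 OpenAI streaming.
chunks = [
    {"model": "gpt-3.5-turbo", "choices": [{"delta": {"role": "assistant", "content": ""}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo", "choices": [{"delta": {"content": "Hello"}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo", "choices": [{"delta": {"content": "!"}, "finish_reason": None}]},
    {"model": "gpt-3.5-turbo", "choices": [{"delta": {}, "finish_reason": "stop"}]},
]

returns = None      # will hold the first chunk; later deltas are merged into it
completion = None   # final {"role", "content"} dict once the stream finishes

for chunk in chunks:
    if returns is None:
        returns = chunk
    accumulated = returns["choices"][0]["delta"]
    choice = chunk["choices"][0]
    if choice["delta"].get("content"):
        accumulated["content"] = accumulated.get("content", "") + choice["delta"]["content"]
    if choice["delta"].get("role"):
        accumulated["role"] = choice["delta"]["role"]
    if choice["finish_reason"]:
        returns["choices"][0]["finish_reason"] = choice["finish_reason"]
        completion = {"role": accumulated["role"], "content": accumulated["content"]}

print(completion)   # {'role': 'assistant', 'content': 'Hello!'}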
142 changes: 84 additions & 58 deletions agentops/llm_tracker.py
@@ -7,6 +7,7 @@
from .event import LLMEvent, ErrorEvent
from .helpers import get_ISO_time, check_call_stack_for_agent_id
import inspect
import pprint


class LlmTracker:
@@ -26,44 +27,51 @@ class LlmTracker:

def __init__(self, client):
self.client = client
self.completion = ""
self.llm_event: LLMEvent = None

def _handle_response_v0_openai(self, response, kwargs, init_timestamp):
"""Handle responses for OpenAI versions <v1.0.0"""

self.completion = ""
self.llm_event = None
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)

def handle_stream_chunk(chunk):
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)
# NOTE: prompt/completion usage not returned in response when streaming
# We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
if self.llm_event.returns is None:
self.llm_event.returns = chunk

try:
# NOTE: prompt/completion usage not returned in response when streaming
model = chunk['model']
choices = chunk['choices']
token = choices[0]['delta'].get('content', '')
finish_reason = choices[0]['finish_reason']
if token:
self.completion += token

if finish_reason:
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.completion = {"role": "assistant", "content": self.completion}
self.llm_event.returns = {"finish_reason": finish_reason, "content": self.completion}
self.llm_event.model = model
accumulated_delta = self.llm_event.returns['choices'][0]['delta']
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.model = chunk['model']
self.llm_event.prompt = kwargs["messages"]
choice = chunk['choices'][0] # NOTE: We assume for completion only choices[0] is relevant

if choice['delta'].get('content'):
accumulated_delta['content'] += choice['delta']['content']

if choice['delta'].get('role'):
accumulated_delta['role'] = choice['delta'].get('role')

if choice['finish_reason']:
# Streaming is done. Record LLMEvent
self.llm_event.returns['choices'][0]['finish_reason'] = choice['finish_reason']
self.llm_event.completion = {
"role": accumulated_delta['role'], "content": accumulated_delta['content']}
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
chunk = pprint.pformat(chunk)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n"
f"chunk:\n {chunk}\n"
f"kwargs:\n {kwargs_str}\n"
)

# if the response is a generator, decorate the generator
if inspect.isasyncgen(response):
@@ -84,7 +92,8 @@ def generator():

self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
params=kwargs,
returns=response
)
# v0.0.0 responses are dicts
try:
@@ -93,16 +102,19 @@
self.llm_event.prompt_tokens = response['usage']['prompt_tokens']
self.llm_event.completion = {"role": "assistant", "content": response['choices'][0]['message']['content']}
self.llm_event.completion_tokens = response['usage']['completion_tokens']
self.llm_event.returns = {"content": response['choices'][0]['message']['content']}
self.llm_event.model = response["model"]
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
response_str = pprint.pformat(response)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse response for LLM call. Skipping upload to AgentOps\n"
f"response:\n {response_str}\n"
f"kwargs:\n {kwargs_str}\n"
)

return response

@@ -112,43 +124,53 @@ def _handle_response_v1_openai(self, response, kwargs, init_timestamp):
from openai.types.chat import ChatCompletionChunk
from openai.resources import AsyncCompletions

self.completion = ""
self.llm_event = None
self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)

def handle_stream_chunk(chunk: ChatCompletionChunk):

self.llm_event = LLMEvent(
init_timestamp=init_timestamp,
params=kwargs
)
# NOTE: prompt/completion usage not returned in response when streaming
# We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
if self.llm_event.returns is None:
self.llm_event.returns = chunk

try:
# NOTE: prompt/completion usage not returned in response when streaming
model = chunk.model
choices = chunk.choices
token = choices[0].delta.content
finish_reason = choices[0].finish_reason
function_call = choices[0].delta.function_call
tool_calls = choices[0].delta.tool_calls
role = choices[0].delta.role
if token:
self.completion += token

if finish_reason:
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.completion = {"role": "assistant", "content": self.completion}
self.llm_event.returns = {"finish_reason": finish_reason, "content": self.completion,
"function_call": function_call, "tool_calls": tool_calls, "role": role}
self.llm_event.model = model
accumulated_delta = self.llm_event.returns.choices[0].delta
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.model = chunk.model
self.llm_event.prompt = kwargs["messages"]
choice = chunk.choices[0] # NOTE: We assume for completion only choices[0] is relevant

if choice.delta.content:
accumulated_delta.content += choice.delta.content

if choice.delta.role:
accumulated_delta.role = choice.delta.role

if choice.delta.tool_calls:
accumulated_delta.tool_calls = choice.delta.tool_calls

if choice.delta.function_call:
accumulated_delta.function_call = choice.delta.function_call

if choice.finish_reason:
# Streaming is done. Record LLMEvent
self.llm_event.returns.choices[0].finish_reason = choice.finish_reason
self.llm_event.completion = {"role": accumulated_delta.role, "content": accumulated_delta.content,
"function_call": accumulated_delta.function_call, "tool_calls": accumulated_delta.tool_calls}
self.llm_event.end_timestamp = get_ISO_time()

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
chunk = pprint.pformat(chunk)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n"
f"chunk:\n {chunk}\n"
f"kwargs:\n {kwargs_str}\n"
)

# if the response is a generator, decorate the generator
if isinstance(response, Stream):
@@ -180,20 +202,24 @@ async def async_generator():
)
# v1.0.0+ responses are objects
try:
self.llm_event.returns = response.model_dump()
self.llm_event.agent_id = check_call_stack_for_agent_id()
self.llm_event.prompt = kwargs["messages"]
self.llm_event.prompt_tokens = response.usage.prompt_tokens
self.llm_event.completion = response.choices[0].message.model_dump()
self.llm_event.completion_tokens = response.usage.completion_tokens
self.llm_event.returns = response.model_dump()
self.llm_event.model = response.model

self.client.record(self.llm_event)
except Exception as e:
self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
# TODO: This error is specific to only one path of failure. Should be more generic or have different logger for different paths
kwargs_str = pprint.pformat(kwargs)
response_str = pprint.pformat(response)
logger.warning(
f"🖇 AgentOps: Unable to parse a chunk for LLM call {kwargs} - skipping upload to AgentOps")
f"🖇 AgentOps: Unable to parse response for LLM call. Skipping upload to AgentOps\n"
f"response:\n {response_str}\n"
f"kwargs:\n {kwargs_str}\n"
)

return response
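
For reference, a small sketch of the pretty-printed warning pattern the error paths now use. It assumes a standard logging logger; warn_unparsable and its arguments are illustrative names, not part of the module. Binding the pformat output to new names keeps the original response object intact so it can still be returned to the caller.

import logging
import pprint

logger = logging.getLogger(__name__)

def warn_unparsable(response, kwargs):
    # Sketch only: pretty-print the payloads into the warning without
    # rebinding `response`, which the caller still expects back unchanged.
    response_str = pprint.pformat(response)
    kwargs_str = pprint.pformat(kwargs)
    logger.warning(
        "🖇 AgentOps: Unable to parse response for LLM call. Skipping upload to AgentOps\n"
        f"response:\n {response_str}\n"
        f"kwargs:\n {kwargs_str}\n"
    )
    return response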
