diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py
index 9beb42ef..fd60f67d 100644
--- a/agentops/llm_tracker.py
+++ b/agentops/llm_tracker.py
@@ -4,7 +4,7 @@
 from importlib.metadata import version
 from packaging.version import Version, parse
 from .log_config import logger
-from .event import LLMEvent, ErrorEvent
+from .event import LLMEvent, ActionEvent, ToolEvent, ErrorEvent
 from .helpers import get_ISO_time, check_call_stack_for_agent_id
 import inspect
 from typing import Optional
@@ -228,7 +228,7 @@ async def async_generator():
         return response

     def _handle_response_cohere(self, response, kwargs, init_timestamp):
-        # TODO: """Handle responses for Cohere versions >v5.4.0"""
+        """Handle responses for Cohere versions >v5.4.0"""
         from cohere.types.non_streamed_chat_response import NonStreamedChatResponse
         from cohere.types.streamed_chat_response import (
             StreamedChatResponse,
@@ -248,6 +248,8 @@ def _handle_response_cohere(self, response, kwargs, init_timestamp):
             params=kwargs
         )

+        self.action_events = {}
+
         def handle_stream_chunk(chunk):
             # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion

@@ -261,21 +263,59 @@ def handle_stream_chunk(chunk):
             try:
                 if isinstance(chunk, StreamedChatResponse_StreamEnd):
-                    # Streaming is done. Record LLMEvent
-                    # self.llm_event.returns.finish_reason = chunk.is_finished
+                    # StreamedChatResponse_TextGeneration = LLMEvent
                     self.llm_event.completion = {
-                        "role": "assistant", "content": self.llm_event.completion}
+                        "role": "assistant", "content": chunk.response.text}
                     self.llm_event.end_timestamp = get_ISO_time()
-                    self.client.record(self.llm_event)
+
+                    # StreamedChatResponse_SearchResults = ActionEvent
+                    search_results = chunk.response.search_results
+                    for search_result in search_results:
+                        query = search_result.search_query
+                        if query.generation_id in self.action_events:
+                            action_event = self.action_events[query.generation_id]
+                            search_result_dict = search_result.dict()
+                            del search_result_dict["search_query"]
+                            action_event.returns = search_result_dict
+                            action_event.end_timestamp = get_ISO_time()
+
+                    # StreamedChatResponse_CitationGeneration = ActionEvent
+                    documents = {doc['id']: doc for doc in chunk.response.documents}
+                    citations = chunk.response.citations
+                    for citation in citations:
+                        citation_id = f"{citation.start}.{citation.end}"
+                        if citation_id in self.action_events:
+                            action_event = self.action_events[citation_id]
+                            citation_dict = citation.dict()
+                            # Replace document_ids with the actual documents
+                            citation_dict['documents'] = [documents[doc_id]
+                                                          for doc_id in citation_dict['document_ids'] if doc_id in documents]
+                            del citation_dict['document_ids']
+
+                            action_event.returns = citation_dict
+                            action_event.end_timestamp = get_ISO_time()
+
+                    for key, action_event in self.action_events.items():
+                        self.client.record(action_event)
+
                 elif isinstance(chunk, StreamedChatResponse_TextGeneration):
                     self.llm_event.completion += chunk.text
                 elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration):
                     pass
                 elif isinstance(chunk, StreamedChatResponse_CitationGeneration):
-                    pass
+                    for citation in chunk.citations:
+                        self.action_events[f"{citation.start}.{citation.end}"] = ActionEvent(
+                            action_type="citation",
+                            init_timestamp=get_ISO_time(),
+                            params=citation.text)
                 elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration):
+                    for query in chunk.search_queries:
+                        self.action_events[query.generation_id] = ActionEvent(
+                            action_type="search_query",
+                            init_timestamp=get_ISO_time(),
+                            params=query.text)
+                elif isinstance(chunk, StreamedChatResponse_SearchResults):
                     pass

             except Exception as e:
@@ -498,7 +538,7 @@ def override_api(self):
                     self._override_method(api, method_path, module)

             if api == 'cohere':
-                # Patch cohere vx.x.x+ methods
+                # Patch cohere v5.4.0+ methods
                 module_version = version(api)
                 if module_version is None:
                     logger.warning(f'🖇 AgentOps: Cannot determine Cohere version. Only Cohere>=5.4.0 supported.')
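The patched handler correlates streamed chunks to AgentOps events through dictionary keys: a search-queries chunk opens an ActionEvent under its generation_id, a citation chunk opens one under a "{start}.{end}" composite key, and the closing StreamedChatResponse_StreamEnd chunk fills in the results and records every open event. A minimal, runnable sketch of that open/close pattern, using simplified stand-ins for the agentops and cohere.types classes (nothing below is the real API):

    from dataclasses import dataclass
    from datetime import datetime, timezone
    from typing import Optional

    def get_iso_time() -> str:
        # stand-in for agentops.helpers.get_ISO_time
        return datetime.now(timezone.utc).isoformat()

    @dataclass
    class ActionEvent:
        # stand-in for agentops.event.ActionEvent
        action_type: str
        params: str
        init_timestamp: str
        returns: Optional[dict] = None
        end_timestamp: Optional[str] = None

    action_events: dict[str, ActionEvent] = {}

    # Mid-stream: a search-queries chunk opens an event keyed by generation_id.
    action_events["gen-123"] = ActionEvent(
        action_type="search_query",
        params="what is AgentOps",
        init_timestamp=get_iso_time(),
    )

    # Mid-stream: a citation chunk opens an event keyed by "start.end".
    action_events["0.17"] = ActionEvent(
        action_type="citation",
        params="AgentOps is an observability platform.",
        init_timestamp=get_iso_time(),
    )

    # Stream end: the final response repeats the generation_id, which lets the
    # handler attach results to the event it opened many chunks earlier.
    final_search_results = [{"generation_id": "gen-123", "document_ids": ["doc-0"]}]
    for result in final_search_results:
        event = action_events.get(result["generation_id"])
        if event is not None:
            event.returns = {"document_ids": result["document_ids"]}
            event.end_timestamp = get_iso_time()

    for event in action_events.values():
        print(event)  # the tracker calls self.client.record(event) here

Keying by identifiers that reappear in the final response is what allows results delivered at stream end to close events that were opened mid-stream.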
diff --git a/examples/cohere.py b/examples/cohere.py
deleted file mode 100644
index 8e1ef3aa..00000000
--- a/examples/cohere.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import cohere
-import agentops
-from dotenv import load_dotenv
-load_dotenv()
-
-agentops.init()
-co = cohere.Client()
-
-chat = co.chat(
-    chat_history=[
-        {"role": "USER", "message": "Who discovered gravity?"},
-        {
-            "role": "CHATBOT",
-            "message": "The man who is widely credited with discovering gravity is Sir Isaac Newton",
-        },
-    ],
-    message="What year was he born?",
-)
-
-print(chat)
-
-stream = co.chat_stream(
-    message="Tell me a short story"
-)
-
-for event in stream:
-    if event.event_type == "text-generation":
-        print(event.text, end='')
-
-agentops.end_session('Success')
diff --git a/examples/cohere_example.py b/examples/cohere_example.py
new file mode 100644
index 00000000..d7a7013c
--- /dev/null
+++ b/examples/cohere_example.py
@@ -0,0 +1,46 @@
+import cohere
+import agentops # just
+from dotenv import load_dotenv
+load_dotenv()
+
+agentops.init(tags=["cohere", "agentops-demo"]) # three
+co = cohere.Client()
+
+stream = co.chat_stream(
+    message="Tell me everything you can about AgentOps",
+    connectors=[{"id": "web-search"}]
+)
+
+response = ""
+for event in stream:
+    if event.event_type == "text-generation":
+        response += event.text
+        print(event.text, end='')
+    elif event.event_type == "stream-end":
+        print("\n")
+        print(event)
+        print("\n")
+
+stream = co.chat_stream(
+    chat_history=[
+        {"role": "SYSTEM", "message": "You are Adam Silverman: die-hard advocate of AgentOps, leader in AI Agent observability"},
+        {
+            "role": "CHATBOT",
+            "message": f"How's your day going? I'd like to tell you about AgentOps: {response}",
+        },
+    ],
+    message="Based on your newfound knowledge of AgentOps, is Cohere a suitable partner for them and how could they integrate?",
+    connectors=[{"id": "web-search"}]
+)
+
+response = ""
+for event in stream:
+    if event.event_type == "text-generation":
+        response += event.text
+        print(event.text, end='')
+    elif event.event_type == "stream-end":
+        print("\n")
+        print(event)
+        print("\n")
+
+agentops.end_session('Success') # lines
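Because the example passes connectors=[{"id": "web-search"}], Cohere emits search-query and citation chunks alongside the text stream, so it exercises the new ActionEvent paths as well as the LLMEvent path. The # just / # three / # lines comments mark the only AgentOps-specific code; reduced to that skeleton, the integration is:

    import agentops

    agentops.init(tags=["cohere", "agentops-demo"])  # start a tracked session
    # ... ordinary cohere.Client() calls; the tracker patches them automatically ...
    agentops.end_session('Success')                  # close the session with an end state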