diff --git a/.gitignore b/.gitignore
index eeb33f955..2f39587df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -157,7 +157,7 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
 
 .vscode/
 .benchmarks/
diff --git a/agentops/client.py b/agentops/client.py
index cf2067858..ee173e192 100644
--- a/agentops/client.py
+++ b/agentops/client.py
@@ -35,6 +35,7 @@ class Client:
         endpoint (str, optional): The endpoint for the AgentOps service. Defaults to 'https://agentops-server-v2.fly.dev'.
         max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. Defaults to 1000.
         max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100.
+        override (bool, optional): Whether to override LLM calls so they are emitted as events. Defaults to True.
     Attributes:
         session (Session, optional): A Session is a grouping of events (e.g. a run of your agent).
     """
@@ -44,7 +45,8 @@ def __init__(self, api_key: Optional[str] = None,
                  tags: Optional[List[str]] = None,
                  endpoint: Optional[str] = 'https://agentops-server-v2.fly.dev',
                  max_wait_time: Optional[int] = 1000,
-                 max_queue_size: Optional[int] = 100):
+                 max_queue_size: Optional[int] = 100,
+                 override: bool = True):
 
         # Get API key from env
         if api_key is None:
@@ -71,11 +73,12 @@ def __init__(self, api_key: Optional[str] = None,
         # Override sys.excepthook
         sys.excepthook = self.handle_exception
 
-        self.start_session(tags)
+        self._start_session(tags)
 
-        if 'openai' in sys.modules:
-            self.llm_tracker = LlmTracker(self)
-            self.llm_tracker.override_api('openai')
+        if override:
+            if 'openai' in sys.modules:
+                self.llm_tracker = LlmTracker(self)
+                self.llm_tracker.override_api('openai')
 
     def handle_exception(self, exc_type, exc_value, exc_traceback):
         """
@@ -232,7 +235,7 @@ async def _record_event_async(self, func, event_name, tags, *args, **kwargs):
 
         return returns
 
-    def start_session(self, tags: Optional[List[str]] = None):
+    def _start_session(self, tags: Optional[List[str]] = None):
        """
        Start a new session for recording events.

@@ -245,10 +248,10 @@ def start_session(self, tags: Optional[List[str]] = None):
         self.worker.start_session(self.session)
 
     def end_session(self, end_state: str = Field("Indeterminate",
-                                                 description="End state of the session",
-                                                 pattern="^(Success|Fail|Indeterminate)$"),
-                    rating: Optional[str] = None,
-                    video: Optional[str] = None):
+                    description="End state of the session",
+                    pattern="^(Success|Fail|Indeterminate)$"),
+                    rating: Optional[str] = None,
+                    video: Optional[str] = None):
         """
         End the current session with the AgentOps service.
 
diff --git a/agentops/event.py b/agentops/event.py
index e33b76d6a..d3d2f0e29 100644
--- a/agentops/event.py
+++ b/agentops/event.py
@@ -8,6 +8,7 @@
 from typing import Optional, List
 from pydantic import Field
 
+
 class Event:
     """
     Represents a discrete event to be recorded.
@@ -32,13 +33,15 @@ class Event:
         event_type (str): Type of the event.
         params (str, optional): The parameters passed to the operation.
         returns (str, optional): The output of the operation.
-        result (str): Result of the operation.
+        result (Result): Result of the operation as a Result enum.
         action_type (str): Type of action of the event.
         model (Models, optional): The model used during the event.
         prompt (str, optional): The input prompt for an LLM call.
         tags (List[str], optional): Tags associated with the event.
         end_timestamp (float): The timestamp for when the event ended, represented as seconds since the epoch.
         init_timestamp (float): The timestamp for when the event was initiated, represented as seconds since the epoch.
+        prompt_tokens (int, optional): The number of tokens in the prompt if the event is an LLM call.
+        completion_tokens (int, optional): The number of tokens in the completion if the event is an LLM call.
     """
 
     def __init__(self, event_type: str,
@@ -54,7 +57,9 @@ def __init__(self, event_type: str,
                  prompt: Optional[str] = None,
                  tags: Optional[List[str]] = None,
                  init_timestamp: Optional[float] = None,
-                 screenshot: Optional[str] = None
+                 screenshot: Optional[str] = None,
+                 prompt_tokens: Optional[int] = None,
+                 completion_tokens: Optional[int] = None
                  ):
         self.event_type = event_type
         self.params = params
@@ -66,4 +71,21 @@ def __init__(self, event_type: str,
         self.prompt = prompt
         self.end_timestamp = get_ISO_time()
         self.init_timestamp = init_timestamp if init_timestamp else self.end_timestamp
-        self.screenshot = screenshot
\ No newline at end of file
+        self.screenshot = screenshot
+        self.prompt_tokens = prompt_tokens
+        self.completion_tokens = completion_tokens
+
+    def __str__(self):
+        return str({
+            "event_type": self.event_type,
+            "params": self.params,
+            "returns": self.returns,
+            "action_type": self.action_type,
+            "result": self.result,
+            "model": self.model,
+            "prompt": self.prompt,
+            "tags": self.tags,
+            "init_timestamp": self.init_timestamp,
+            "prompt_tokens": self.prompt_tokens,
+            "completion_tokens": self.completion_tokens,
+        })
diff --git a/agentops/http.py b/agentops/http.py
index 685355b66..5460f0745 100644
--- a/agentops/http.py
+++ b/agentops/http.py
@@ -1,4 +1,3 @@
-import json
 from enum import Enum
 from typing import Optional
 import requests
diff --git a/agentops/langchain_callback_handler.py b/agentops/langchain_callback_handler.py
new file mode 100644
index 000000000..08981edb6
--- /dev/null
+++ b/agentops/langchain_callback_handler.py
@@ -0,0 +1,283 @@
+from typing import Any, Dict, List, Optional, Sequence
+from uuid import UUID
+
+from langchain_core.agents import AgentFinish, AgentAction
+from langchain_core.outputs import LLMResult
+from langchain_core.documents import Document
+
+from agentops import Client as AOClient
+from agentops import Event
+from tenacity import RetryCallState
+
+from langchain.callbacks.base import BaseCallbackHandler
+
+from agentops.helpers import get_ISO_time
+
+
+class LangchainCallbackHandler(BaseCallbackHandler):
+    """Callback handler for Langchain agents."""
+
+    def __init__(self, api_key: str, tags: Optional[List[str]] = None):
+        self.ao_client = AOClient(api_key=api_key, tags=tags, override=False)
+
+        # In-flight events, keyed by the run_id of the callback that opened them
+        self.events: Dict[Any, Event] = {}
+
+    # LLM Callbacks
+    def on_llm_start(
+        self,
+        serialized: Dict[str, Any],
+        prompts: List[str],
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id] = Event(
+            event_type="llm",
+            action_type='llm',
+            tags=tags,
+            model=kwargs['invocation_params']['model'],
+            params={**kwargs, **(metadata or {})},  # metadata may be None
+            prompt=prompts[0],
+            init_timestamp=get_ISO_time()
+        )
+
+    def on_llm_error(
+        self,
+        error: BaseException,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Fail"
+
+        self.ao_client.record(self.events[run_id])
+
+    def on_llm_end(
+        self,
+        response: LLMResult,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id].end_timestamp = get_ISO_time()
+
+        # Check for generations before indexing into them
+        if len(response.generations) > 0:
+            self.events[run_id].result = "Success"
+            self.events[run_id].returns = response.generations[0][0].message.content
+        else:
+            self.events[run_id].result = "Fail"
+
+        if response.llm_output is not None:  # llm_output is optional on LLMResult
+            self.events[run_id].prompt_tokens = response.llm_output['token_usage']['prompt_tokens']
+            self.events[run_id].completion_tokens = response.llm_output['token_usage']['completion_tokens']
+
+        self.ao_client.record(self.events[run_id])
+
+    # Chain callbacks
+    def on_chain_start(
+        self,
+        serialized: Dict[str, Any],
+        inputs: Dict[str, Any],
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id] = Event(
+            event_type="chain",
+            init_timestamp=get_ISO_time(),
+            tags=tags,
+            params={**inputs, **kwargs, **(metadata or {})},
+        )
+
+    def on_chain_end(
+        self,
+        outputs: Dict[str, Any],
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Success"
+        self.events[run_id].returns = outputs
+
+        self.ao_client.record(self.events[run_id])
+
+    def on_chain_error(
+        self,
+        error: BaseException,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Fail"
+
+        self.ao_client.record(self.events[run_id])
+
+    # Tool callbacks
+    def on_tool_start(
+        self,
+        serialized: Dict[str, Any],
+        input_str: str,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> Any:
+        """Run when tool starts running."""
+        self.events[run_id] = Event(
+            event_type="tool",
+            init_timestamp=get_ISO_time(),
+            tags=tags,
+            params={**serialized, **(metadata or {})},
+        )
+
+    def on_tool_end(
+        self,
+        output: str,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        # Tools can fail quietly and still reach `on_tool_end`.
+        # This workaround makes sure such failures are logged as errors.
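+        # (With handle_parsing_errors=True, LangChain surfaces output-parsing
+        # failures through a built-in tool named '_Exception'; the name check
+        # below relies on that convention.)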
+        if kwargs.get('name') == '_Exception':
+            self.events[run_id].result = "Fail"
+        else:
+            self.events[run_id].result = "Success"
+
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].returns = output
+
+        self.ao_client.record(self.events[run_id])
+
+    def on_tool_error(
+        self,
+        error: BaseException,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Fail"
+        self.events[run_id].returns = str(error)
+
+        self.ao_client.record(self.events[run_id])
+
+    # Retriever callbacks
+    def on_retriever_start(
+        self,
+        serialized: Dict[str, Any],
+        query: str,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> None:
+        self.events[run_id] = Event(
+            event_type="retriever",
+            init_timestamp=get_ISO_time()
+        )
+
+    def on_retriever_end(
+        self,
+        documents: Sequence[Document],
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        **kwargs: Any,
+    ) -> None:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Success"
+
+        self.ao_client.record(self.events[run_id])
+
+    def on_retriever_error(
+        self,
+        error: BaseException,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        tags: Optional[List[str]] = None,
+        **kwargs: Any,
+    ) -> None:
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Fail"
+
+        self.ao_client.record(self.events[run_id])
+
+    # Agent callbacks
+    def on_agent_action(
+        self,
+        action: AgentAction,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        """Run on agent action."""
+        self.events[run_id] = Event(
+            event_type="agent",
+            init_timestamp=get_ISO_time(),
+            params={**kwargs},
+        )
+
+    def on_agent_finish(
+        self,
+        finish: AgentFinish,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        """Run on agent finish."""
+        self.events[run_id].end_timestamp = get_ISO_time()
+        self.events[run_id].result = "Success"
+        self.events[run_id].returns = finish
+
+        self.ao_client.record(self.events[run_id])
+
+        # TODO: Create a way for the end user to set this based on their conditions
+        self.ao_client.end_session("Success")
+
+    # Misc.
+    def on_retry(
+        self,
+        retry_state: RetryCallState,
+        *,
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **kwargs: Any,
+    ) -> Any:
+        """Run on a retry event."""
+        # Event sets end_timestamp itself, so it is not passed to the constructor
+        event = Event(
+            event_type="retry",
+            init_timestamp=get_ISO_time(),
+            params={**kwargs},
+            result="Indeterminate",
+            returns=retry_state
+        )
+        self.ao_client.record(event)
+
+    @property
+    def session_id(self):
+        return self.ao_client.session.session_id
diff --git a/examples/langchain_examples.ipynb b/examples/langchain_examples.ipynb
new file mode 100644
index 000000000..c573c5800
--- /dev/null
+++ b/examples/langchain_examples.ipynb
@@ -0,0 +1,275 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "e0deea1ab1db2a19",
+   "metadata": {},
+   "source": [
+    "# AgentOps Langchain Agent Implementation\n",
+    "\n",
+    "Using AgentOps monitoring with Langchain is simple. We've created a LangchainCallbackHandler that will do all of the heavy lifting!\n",
+    "\n",
+    "First we'll import the typical Langchain packages:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "initial_id",
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2023-12-15T20:21:11.477270Z",
+     "start_time": "2023-12-15T20:21:10.289895Z"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "from langchain.chat_models import ChatOpenAI\n",
+    "from langchain.agents import initialize_agent, AgentType\n",
+    "from dotenv import load_dotenv\n",
+    "from langchain.agents import tool"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "57ddb8eca4e8a3cb",
+   "metadata": {},
+   "source": [
+    "The only difference when using AgentOps is that we'll also import this special Callback Handler:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "585f00bb186711a7",
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2023-12-15T20:21:11.478111Z",
+     "start_time": "2023-12-15T20:21:11.471462Z"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from agentops.langchain_callback_handler import LangchainCallbackHandler"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "523be945b85dc5d5",
+   "metadata": {},
+   "source": [
+    "Next, we'll grab our two API keys. You can use dotenv like below, or load environment variables however else you like."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "1490411415d7317c",
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2023-12-15T20:21:11.494019Z",
+     "start_time": "2023-12-15T20:21:11.479154Z"
+    }
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "False"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "load_dotenv()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8371ec020e634dd0",
+   "metadata": {},
+   "source": [
+    "This is where AgentOps comes into play. Before creating our LLM instance via Langchain, we'll first create an instance of the AO LangchainCallbackHandler. After the handler is initialized, a session will be recorded automatically.\n",
+    "\n",
+    "Pass in your API key, and optionally any tags to describe this session for easier lookup in the AO dashboard. You can also retrieve the `session_id` of the newly created session."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "432921383f39c9d5",
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2023-12-15T20:21:12.346995Z",
+     "start_time": "2023-12-15T20:21:11.483591Z"
+    }
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Agent Ops session ID: 80c6593c-ae90-4b48-98e1-d17fcd801c70\n"
+     ]
+    }
+   ],
+   "source": [
+    "AGENTOPS_API_KEY = os.environ.get('AGENTOPS_API_KEY')\n",
+    "OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')\n",
+    "\n",
+    "handler = LangchainCallbackHandler(api_key=AGENTOPS_API_KEY, tags=['Langchain Example'])\n",
+    "\n",
+    "print(\"Agent Ops session ID: \" + handler.session_id)\n",
+    "llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY,\n",
+    "                 model='gpt-3.5-turbo')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "93aa09ec",
+   "metadata": {},
+   "source": [
+    "Agents generally use tools. Let's define a simple tool here. Tool usage is also recorded."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "6abf26f9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@tool\n",
+    "def find_movie(term: str) -> str:\n",
+    "    \"\"\"Find available movies\"\"\"\n",
+    "    return 'Citizen Kane'\n",
+    "\n",
+    "\n",
+    "tools = [find_movie]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "58bbca0b49302b2b",
+   "metadata": {},
+   "source": [
+    "Finally, let's use our agent! Pass the callback handler to the agent, and all the actions will be recorded in the AO Dashboard."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "id": "d538b20aa954ee80",
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2023-12-15T20:21:12.352862Z",
+     "start_time": "2023-12-15T20:21:12.351126Z"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "agent = initialize_agent(tools,\n",
+    "                         llm,\n",
+    "                         agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,\n",
+    "                         verbose=True,\n",
+    "                         callbacks=[handler], # You must pass in a callback handler to record your agent\n",
+    "                         handle_parsing_errors=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "id": "6dfb127553751384",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\n",
+      "\n",
+      "\u001b[1m> Entering new AgentExecutor chain...\u001b[0m\n",
+      "\u001b[32;1m\u001b[1;3mCould not parse LLM output: I can use the `find_movie` tool to find the available movies.\u001b[0m\n",
+      "Observation: Invalid or incomplete response\n",
+      "Thought:\u001b[32;1m\u001b[1;3mCould not parse LLM output: I will use the `find_movie` tool to find the available movies.\u001b[0m\n",
+      "Observation: Invalid or incomplete response\n",
+      "Thought:\u001b[32;1m\u001b[1;3mI will use the `find_movie` tool to find the available movies.\n",
+      "Action:\n",
+      "```\n",
+      "{\n",
+      "  \"action\": \"find_movie\",\n",
+      "  \"action_input\": \"\"\n",
+      "}\n",
+      "```\u001b[0m\n",
+      "Observation: \u001b[36;1m\u001b[1;3mCitizen Kane\u001b[0m\n",
+      "Thought:\u001b[32;1m\u001b[1;3mCould not parse LLM output: The movie \"Citizen Kane\" is currently playing.\u001b[0m\n",
+      "Observation: Invalid or incomplete response\n",
+      "Thought:\u001b[32;1m\u001b[1;3mI will use the `find_movie` tool to find the available movies.\n",
+      "Action:\n",
+      "```\n",
+      "{\n",
+      "  \"action\": \"find_movie\",\n",
+      "  \"action_input\": \"\"\n",
+      "}\n",
+      "```\u001b[0m\n",
+      "Observation: \u001b[36;1m\u001b[1;3mCitizen Kane\u001b[0m\n",
+      "Thought:\u001b[32;1m\u001b[1;3mI will use the `find_movie` tool to find the available movies.\n",
+      "Action:\n",
+      "```\n",
+      "{\n",
+      "  \"action\": \"find_movie\",\n",
+      "  \"action_input\": \"\"\n",
+      "}\n",
+      "```\n",
+      "\u001b[0m\n",
+      "Observation: \u001b[36;1m\u001b[1;3mCitizen Kane\u001b[0m\n",
+      "Thought:\u001b[32;1m\u001b[1;3mCould not parse LLM output: I now know the final answer.\u001b[0m\n",
+      "Observation: Invalid or incomplete response\n",
+      "Thought:\u001b[32;1m\u001b[1;3mI now know the final answer.\n",
+      "Final Answer: The movie \"Citizen Kane\" is currently playing.\u001b[0m\n",
+      "\n",
+      "\u001b[1m> Finished chain.\u001b[0m\n"
+     ]
+    },
+    {
+     "data": {
+      "text/plain": [
+       "'The movie \"Citizen Kane\" is currently playing.'"
+      ]
+     },
+     "execution_count": 7,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "agent.run(\"What movies are playing?\", callbacks=[handler])"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
"ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt index 45f6e9f64..0d62c32f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ pytest==7.4.0 requests==2.31.0 requests-mock==1.11.0 -pydantic==2.4.2 \ No newline at end of file +pydantic==2.4.2 +langchain==0.0.350 \ No newline at end of file