Skip to content

Commit

Permalink
Rework initialization, improve logging, and other refactoring (#315)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyangqiu authored Jul 31, 2024
1 parent 4dc0382 commit e3ae159
Show file tree
Hide file tree
Showing 22 changed files with 912 additions and 1,583 deletions.
270 changes: 182 additions & 88 deletions agentops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
# agentops/__init__.py
import functools
import os
import logging
import sys
from typing import Optional, List, Union

from .client import Client
from .config import ClientConfiguration
from .event import Event, ActionEvent, LLMEvent, ToolEvent, ErrorEvent
from .decorators import record_function
from .agent import track_agent
from .decorators import record_function, track_agent
from .log_config import logger
from .session import Session
from .state import get_state, set_state


try:
from .partners.langchain_callback_handler import (
Expand All @@ -22,20 +16,13 @@
except ModuleNotFoundError:
pass

if "autogen" in sys.modules:
Client().configure(instrument_llm_calls=False)
Client().add_default_tags(["autogen"])

def noop(*args, **kwargs):
return


def check_init(child_function):
@functools.wraps(child_function)
def wrapper(*args, **kwargs):
if get_state("is_initialized"): # is initialized in state.py is not working
return child_function(*args, **kwargs)
else:
return noop(*args, **kwargs)

return wrapper
if "crewai" in sys.modules:
Client().configure(instrument_llm_calls=False)
Client().add_default_tags(["crewai"])


def init(
Expand All @@ -44,102 +31,111 @@ def init(
endpoint: Optional[str] = None,
max_wait_time: Optional[int] = None,
max_queue_size: Optional[int] = None,
tags: Optional[List[str]] = None,
instrument_llm_calls=True,
auto_start_session=True,
tags: Optional[List[str]] = None, # Deprecated
default_tags: Optional[List[str]] = None,
instrument_llm_calls: Optional[bool] = None,
auto_start_session: Optional[bool] = None,
inherited_session_id: Optional[str] = None,
skip_auto_end_session: Optional[bool] = False,
skip_auto_end_session: Optional[bool] = None,
) -> Union[Session, None]:
"""
Initializes the AgentOps singleton pattern.
Args:
api_key (str, optional): API Key for AgentOps services. If none is provided, key will
be read from the AGENTOPS_API_KEY environment variable.
parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. If none is provided, key will
be read from the AGENTOPS_PARENT_KEY environment variable.
endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will
be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'.
max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue.
Defaults to 30,000 (30 seconds)
Defaults to 5,000 (5 seconds)
max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100.
tags (List[str], optional): Tags for the sessions that can be used for grouping or
sorting later (e.g. ["GPT-4"]).
tags (List[str], optional): [Deprecated] Use `default_tags` instead.
default_tags (List[str], optional): Default tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]).
instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents.
auto_start_session (bool): Whether to start a session automatically when the client is created.
inherited_session_id (optional, str): Init Agentops with an existing Session
skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making (i.e. Crew determining when tasks are complete and ending the session)
skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making
(i.e. Crew determining when tasks are complete and ending the session)
Attributes:
"""
logging_level = os.getenv("AGENTOPS_LOGGING_LEVEL")
log_levels = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"DEBUG": logging.DEBUG,
}
logger.setLevel(log_levels.get(logging_level or "INFO", "INFO"))

c = Client(
Client().unsuppress_logs()
if Client().is_initialized:
return logger.warning("AgentOps has already been initialized")

if tags is not None:
logger.warning("The 'tags' parameter is deprecated. Use 'default_tags' instead")
if default_tags is None:
default_tags = tags

Client().configure(
api_key=api_key,
parent_key=parent_key,
endpoint=endpoint,
max_wait_time=max_wait_time,
max_queue_size=max_queue_size,
tags=tags,
default_tags=default_tags,
instrument_llm_calls=instrument_llm_calls,
auto_start_session=False, # handled below
inherited_session_id=inherited_session_id,
auto_start_session=auto_start_session,
skip_auto_end_session=skip_auto_end_session,
)

# handle auto_start_session here so we can get the session object to return rather than client above
# if the client automatically starts a session from a partner framework don't start a second
session = None
if auto_start_session and len(c.current_session_ids) == 0:
session = c.start_session(
tags=tags, config=c.config, inherited_session_id=inherited_session_id
)
if inherited_session_id is not None:
if auto_start_session == False:
Client().add_pre_init_warning(
"auto_start_session is set to False - inherited_session_id will not be used to automatically start a session"
)
return Client().initialize()

set_state("is_initialized", True)
Client().configure(auto_start_session=False)
Client().initialize()
return Client().start_session(inherited_session_id=inherited_session_id)

return session
return Client().initialize()


def end_session(
end_state: str,
end_state_reason: Optional[str] = None,
video: Optional[str] = None,
is_auto_end: Optional[bool] = False,
def configure(
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
endpoint: Optional[str] = None,
max_wait_time: Optional[int] = None,
max_queue_size: Optional[int] = None,
default_tags: Optional[List[str]] = None,
instrument_llm_calls: Optional[bool] = None,
auto_start_session: Optional[bool] = None,
skip_auto_end_session: Optional[bool] = None,
):
"""
End the current session with the AgentOps service.
Configure the AgentOps Client
Args:
end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate.
end_state_reason (str, optional): The reason for ending the session.
video (str, optional): URL to a video recording of the session
is_auto_end (bool, optional): is this an automatic use of end_session and should be skipped with bypass_auto_end_session
api_key (str, optional): API Key for AgentOps services.
parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization.
endpoint (str, optional): The endpoint for the AgentOps service.
max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue.
max_queue_size (int, optional): The maximum size of the event queue
default_tags (List[str], optional): Default tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]).
instrument_llm_calls (bool, optional): Whether to instrument LLM calls and emit LLMEvents.
auto_start_session (bool, optional): Whether to start a session automatically when the client is created.
skip_auto_end_session (bool, optional): Don't automatically end session based on your framework's decision-making
(i.e. Crew determining when tasks are complete and ending the session)
"""
Client().end_session(
end_state=end_state,
end_state_reason=end_state_reason,
video=video,
is_auto_end=is_auto_end,
Client().configure(
api_key=api_key,
parent_key=parent_key,
endpoint=endpoint,
max_wait_time=max_wait_time,
max_queue_size=max_queue_size,
default_tags=default_tags,
instrument_llm_calls=instrument_llm_calls,
auto_start_session=auto_start_session,
skip_auto_end_session=skip_auto_end_session,
)


# Mostly used for unit testing -
# prevents unexpected sessions on new tests
def end_all_sessions() -> None:
return Client().end_all_sessions()


def start_session(
tags: Optional[List[str]] = None,
config: Optional[ClientConfiguration] = None,
inherited_session_id: Optional[str] = None,
) -> Union[Session, None]:
"""
Expand All @@ -148,28 +144,71 @@ def start_session(
Args:
tags (List[str], optional): Tags that can be used for grouping or sorting later.
e.g. ["test_run"].
config: (Configuration, optional): Client configuration object,
inherited_session_id: (str, optional): Set the session ID to inherit from another client
"""
Client().unsuppress_logs()

if not Client().is_initialized:
return logger.warning(
"AgentOps has not been initialized yet. Please call agentops.init() before starting a session"
)

return Client().start_session(tags, inherited_session_id)


def end_session(
end_state: str,
end_state_reason: Optional[str] = None,
video: Optional[str] = None,
is_auto_end: Optional[bool] = False,
):
"""
End the current session with the AgentOps service.
try:
sess_result = Client().start_session(tags, config, inherited_session_id)
Args:
end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate.
end_state_reason (str, optional): The reason for ending the session.
video (str, optional): URL to a video recording of the session
"""
Client().unsuppress_logs()

set_state("is_initialized", True)
if Client().is_multi_session:
return logger.warning(
"Could not end session - multiple sessions detected. You must use session.end_session() instead of agentops.end_session()"
+ " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management"
)

return sess_result
except Exception:
pass
if not Client().has_sessions:
return logger.warning("Could not end session - no sessions detected")

Client().end_session(
end_state=end_state,
end_state_reason=end_state_reason,
video=video,
is_auto_end=is_auto_end,
)


@check_init
def record(event: Union[Event, ErrorEvent]):
"""
Record an event with the AgentOps service.
Args:
event (Event): The event to record.
"""
Client().unsuppress_logs()

if Client().is_multi_session:
return logger.warning(
"Could not record event - multiple sessions detected. You must use session.record() instead of agentops.record()"
+ " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management"
)

if not Client().has_sessions:
return logger.warning(
"Could not record event - no sessions detected. Create a session by calling agentops.start_session()"
)

Client().record(event)


Expand All @@ -180,6 +219,17 @@ def add_tags(tags: List[str]):
Args:
tags (List[str]): The list of tags to append.
"""
if Client().is_multi_session:
return logger.warning(
"Could not add tags to session - multiple sessions detected. You must use session.add_tags() instead of agentops.add_tags()"
+ " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management"
)

if not Client().has_sessions:
return logger.warning(
"Could not add tags to session - no sessions detected. Create a session by calling agentops.start_session()"
)

Client().add_tags(tags)


Expand All @@ -190,27 +240,71 @@ def set_tags(tags: List[str]):
Args:
tags (List[str]): The list of tags to set.
"""
if Client().is_multi_session:
return logger.warning(
"Could not set tags on session - multiple sessions detected. You must use session.set_tags() instead of agentops.set_tags()"
+ " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management"
)

if not Client().has_sessions:
return logger.warning(
"Could not set tags on session - no sessions detected. Create a session by calling agentops.start_session()"
)

Client().set_tags(tags)


def get_api_key() -> str:
def get_api_key() -> Union[str, None]:
return Client().api_key


def set_parent_key(parent_key):
def set_api_key(api_key: str) -> None:
Client().configure(api_key=api_key)


def set_parent_key(parent_key: str):
"""
Set the parent API key so another organization can view data.
Args:
parent_key (str): The API key of the parent organization to set.
"""
Client().set_parent_key(parent_key)
Client().configure(parent_key=parent_key)


def stop_instrumenting():
Client().stop_instrumenting()
if Client().is_initialized:
Client().stop_instrumenting()


@check_init
def create_agent(name: str, agent_id: Optional[str] = None):
if Client().is_multi_session:
return logger.warning(
"Could not create agent - multiple sessions detected. You must use session.create_agent() instead of agentops.create_agent()"
+ " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management"
)

if not Client().has_sessions:
return logger.warning(
"Could not create agent - no sessions detected. Create a session by calling agentops.start_session()"
)

return Client().create_agent(name=name, agent_id=agent_id)


def get_session(session_id: str):
"""
Get an active (not ended) session from the AgentOps service
Args:
session_id (str): the session id for the session to be retreived
"""
Client().unsuppress_logs()

return Client().get_session(session_id)


# Mostly used for unit testing -
# prevents unexpected sessions on new tests
def end_all_sessions() -> None:
return Client().end_all_sessions()
Loading

0 comments on commit e3ae159

Please sign in to comment.