From 306a3b56e2f4f885d592aebed48ef21e00a73a4c Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sun, 22 Dec 2024 07:08:25 +0530 Subject: [PATCH] overhauled handler code --- agentops/partners/taskweaver_event_handler.py | 242 ++++++++++-------- 1 file changed, 141 insertions(+), 101 deletions(-) diff --git a/agentops/partners/taskweaver_event_handler.py b/agentops/partners/taskweaver_event_handler.py index 0f4d7d7e..73c219d7 100755 --- a/agentops/partners/taskweaver_event_handler.py +++ b/agentops/partners/taskweaver_event_handler.py @@ -1,156 +1,196 @@ from taskweaver.module.event_emitter import ( SessionEventHandlerBase, - TaskWeaverEvent, - EventScope, SessionEventType, RoundEventType, PostEventType, ) import agentops -from typing import Dict, Optional, Any -from uuid import UUID +from agentops.event import ActionEvent, ErrorEvent, ToolEvent +from datetime import datetime, timezone +from typing import Dict, Any +import logger + +ATTACHMENT_TOOLS = [ + "thought", + "reply_type", + "reply_content", + "verification", + "code_error", + "execution_status", + "execution_result", + "artifact_paths", + "revise_message", + "function", + "web_exploring_plan", + "web_exploring_screenshot", + "web_exploring_link", +] class TaskWeaverEventHandler(SessionEventHandlerBase): def __init__(self): super().__init__() - self.current_round_id: Optional[str] = None - self.agent_sessions: Dict[str, Any] = {} - self._message_buffer: Dict[str, str] = {} + self._message_buffer: Dict[str, Dict[str, Any]] = {} self._attachment_buffer: Dict[str, Dict[str, Any]] = {} - self._active_agents: Dict[str, str] = {} # Maps role_round_id to agent_id + self._active_agents: Dict[str, str] = {} - def _get_or_create_agent(self, role: str) -> str: + def _get_or_create_agent(self, role: str): """Get existing agent ID or create new agent for role+round combination""" - agent_key = f"{role}" - if agent_key not in self._active_agents: + if role not in self._active_agents: agent_id = agentops.create_agent(name=role) - if agent_id: # Only store if agent creation was successful - self._active_agents[agent_key] = agent_id - return self._active_agents.get(agent_key) + if agent_id: + self._active_agents[role] = agent_id + return self._active_agents.get(role) def handle_session(self, type: SessionEventType, msg: str, extra: Any, **kwargs: Any): agentops.record( - agentops.ActionEvent(action_type=type.value, params={"message": msg}, returns=str(extra) if extra else None) + ActionEvent(action_type=type.value, params={"extra": extra, "message": msg}) ) def handle_round(self, type: RoundEventType, msg: str, extra: Any, round_id: str, **kwargs: Any): - if type == RoundEventType.round_start: - self.current_round_id = round_id - agentops.record(agentops.ActionEvent(action_type="round_start", params={"round_id": round_id}, returns=msg)) - - elif type == RoundEventType.round_error: + if type == RoundEventType.round_error: agentops.record( - agentops.ErrorEvent(error_type="round_error", details={"message": msg, "round_id": round_id}) + ErrorEvent( + error_type=type.value, + details={"round_id": round_id, "message": msg, "extra": extra} + ) ) - - elif type == RoundEventType.round_end: - agentops.record(agentops.ActionEvent(action_type="round_end", params={"round_id": round_id}, returns=msg)) - self.current_round_id = None + logger.error(f"Could not record the Round event: {msg}") + self.cleanup_round() + else: + agentops.record( + ActionEvent( + action_type=type.value, + params={"round_id": round_id, "extra": extra}, + returns=msg, + ) + ) + if type == RoundEventType.round_end: + self.cleanup_round() def handle_post(self, type: PostEventType, msg: str, extra: Any, post_id: str, round_id: str, **kwargs: Any): role = extra.get("role", "Planner") agent_id = self._get_or_create_agent(role=role) - if type == PostEventType.post_start: + if type == PostEventType.post_error: agentops.record( - agentops.ActionEvent( - action_type="post_start", - params={ - "post_id": post_id, - "round_id": round_id, - "agent_id": agent_id, - }, - returns=msg, + ErrorEvent( + error_type=type.value, + details={"post_id": post_id, "round_id": round_id, "message": msg, "extra": extra}, ) ) + logger.error(f"Could not record the Post event: {msg}") - elif type == PostEventType.post_message_update: - is_end = extra.get("is_end", False) - if not is_end: - self._message_buffer[post_id] = self._message_buffer.get(post_id, "") + msg - else: - agentops.record( - agentops.ActionEvent( - action_type="post_message_update", - params={ - "post_id": post_id, - "round_id": round_id, - "agent_id": agent_id, - "is_end": is_end, - "model": extra.get("model", None), - }, - returns=self._message_buffer.get(post_id, ""), - ) + elif type == PostEventType.post_start or type == PostEventType.post_end: + agentops.record( + ActionEvent( + action_type=type.value, + params={"post_id": post_id, "round_id": round_id, "extra": extra}, + returns=msg, + agent_id=agent_id, ) + ) - if is_end: - self._message_buffer.pop(post_id, None) - + elif type == PostEventType.post_status_update: + agentops.record( + ActionEvent( + action_type=type.value, + params={"post_id": post_id, "round_id": round_id, "extra": extra}, + returns=msg, + agent_id=agent_id, + ) + ) + elif type == PostEventType.post_attachment_update: - attachment_id = extra.get("id", "") - attachment_type = extra.get("type", "") - is_end = extra.get("is_end", False) + attachment_id = extra["id"] + attachment_type = extra["type"].value + is_end = extra["is_end"] if attachment_id not in self._attachment_buffer: self._attachment_buffer[attachment_id] = { - "type": attachment_type, - "content": "", - "post_id": post_id, - "round_id": round_id, - "agent_id": agent_id, + "role": attachment_type, + "content": [], + "init_timestamp": datetime.now(timezone.utc).isoformat(), + "end_timestamp": None, } - agentops.record( - agentops.ActionEvent( - action_type="attachment_stream_start", - params={ - "attachment_id": attachment_id, - "attachment_type": str(attachment_type), - "post_id": post_id, - "round_id": round_id, - "agent_id": agent_id, - }, + self._attachment_buffer[attachment_id]["content"].append(str(msg)) + + if is_end: + self._attachment_buffer[attachment_id]["end_timestamp"] = datetime.now(timezone.utc).isoformat() + complete_message = "".join(self._attachment_buffer[attachment_id]["content"]) + + if attachment_type in ATTACHMENT_TOOLS: + agentops.record( + ToolEvent( + name=type.value, + init_timestamp=self._attachment_buffer[attachment_id]["init_timestamp"], + end_timestamp=self._attachment_buffer[attachment_id]["end_timestamp"], + params={ + "post_id": post_id, + "round_id": round_id, + "attachment_id": attachment_id, + "attachment_type": self._attachment_buffer[attachment_id]["role"], + "extra": extra, + }, + returns=complete_message, + agent_id=agent_id, + ) ) - ) + else: + agentops.record( + ActionEvent( + action_type=type.value, + init_timestamp=self._attachment_buffer[attachment_id]["init_timestamp"], + end_timestamp=self._attachment_buffer[attachment_id]["end_timestamp"], + params={ + "post_id": post_id, + "round_id": round_id, + "attachment_id": attachment_id, + "attachment_type": self._attachment_buffer[attachment_id]["role"], + "extra": extra, + }, + returns=complete_message, + agent_id=agent_id, + ) + ) + + self._attachment_buffer.pop(attachment_id, None) + + elif type == PostEventType.post_message_update: + is_end = extra["is_end"] - self._attachment_buffer[attachment_id]["content"] += str(msg) + if post_id not in self._message_buffer: + self._message_buffer[post_id] = { + "content": [], + "init_timestamp": datetime.now(timezone.utc).isoformat(), + "end_timestamp": None, + } + self._message_buffer[post_id]["content"].append(str(msg)) + if is_end: - buffer = self._attachment_buffer[attachment_id] + self._message_buffer[post_id]["end_timestamp"] = datetime.now(timezone.utc).isoformat() + complete_message = "".join(self._message_buffer[post_id]["content"]) agentops.record( - agentops.ToolEvent( - name=str(buffer["type"]), + ActionEvent( + action_type=type.value, + init_timestamp=self._message_buffer[post_id]["init_timestamp"], + end_timestamp=self._message_buffer[post_id]["end_timestamp"], params={ - "post_id": buffer["post_id"], - "round_id": buffer["round_id"], - "attachment_id": attachment_id, + "post_id": post_id, + "round_id": round_id, + "extra": extra, }, - returns=buffer["content"], - agent_id=buffer["agent_id"], + returns=complete_message, + agent_id=agent_id, ) ) - self._attachment_buffer.pop(attachment_id) - - elif type == PostEventType.post_error: - agentops.record( - agentops.ErrorEvent( - error_type="post_error", - details={"message": msg, "post_id": post_id, "round_id": round_id}, - ) - ) - elif type == PostEventType.post_end: - agentops.record( - agentops.ActionEvent( - action_type="post_end", - params={"post_id": post_id, "round_id": round_id, "agent_id": agent_id}, - returns=msg, - ) - ) + self._message_buffer.pop(post_id, None) - def cleanup_round(self, round_id: str): + def cleanup_round(self): """Cleanup agents and buffers for a completed round""" - self._active_agents = {k: v for k, v in self._active_agents.items() if not k.endswith(round_id)} + self._active_agents.clear() self._message_buffer.clear() self._attachment_buffer.clear()