Skip to content

Commit

Permalink
overhauled handler code
Browse files Browse the repository at this point in the history
  • Loading branch information
the-praxs committed Dec 22, 2024
1 parent 676e6ca commit 306a3b5
Showing 1 changed file with 141 additions and 101 deletions.
242 changes: 141 additions & 101 deletions agentops/partners/taskweaver_event_handler.py
Original file line number Diff line number Diff line change
@@ -1,156 +1,196 @@
from taskweaver.module.event_emitter import (
SessionEventHandlerBase,
TaskWeaverEvent,
EventScope,
SessionEventType,
RoundEventType,
PostEventType,
)
import agentops
from typing import Dict, Optional, Any
from uuid import UUID
from agentops.event import ActionEvent, ErrorEvent, ToolEvent
from datetime import datetime, timezone
from typing import Dict, Any
import logger

ATTACHMENT_TOOLS = [
"thought",
"reply_type",
"reply_content",
"verification",
"code_error",
"execution_status",
"execution_result",
"artifact_paths",
"revise_message",
"function",
"web_exploring_plan",
"web_exploring_screenshot",
"web_exploring_link",
]


class TaskWeaverEventHandler(SessionEventHandlerBase):
def __init__(self):
super().__init__()
self.current_round_id: Optional[str] = None
self.agent_sessions: Dict[str, Any] = {}
self._message_buffer: Dict[str, str] = {}
self._message_buffer: Dict[str, Dict[str, Any]] = {}
self._attachment_buffer: Dict[str, Dict[str, Any]] = {}
self._active_agents: Dict[str, str] = {} # Maps role_round_id to agent_id
self._active_agents: Dict[str, str] = {}

def _get_or_create_agent(self, role: str) -> str:
def _get_or_create_agent(self, role: str):
"""Get existing agent ID or create new agent for role+round combination"""
agent_key = f"{role}"
if agent_key not in self._active_agents:
if role not in self._active_agents:
agent_id = agentops.create_agent(name=role)
if agent_id: # Only store if agent creation was successful
self._active_agents[agent_key] = agent_id
return self._active_agents.get(agent_key)
if agent_id:
self._active_agents[role] = agent_id
return self._active_agents.get(role)

def handle_session(self, type: SessionEventType, msg: str, extra: Any, **kwargs: Any):
agentops.record(
agentops.ActionEvent(action_type=type.value, params={"message": msg}, returns=str(extra) if extra else None)
ActionEvent(action_type=type.value, params={"extra": extra, "message": msg})
)

def handle_round(self, type: RoundEventType, msg: str, extra: Any, round_id: str, **kwargs: Any):
if type == RoundEventType.round_start:
self.current_round_id = round_id
agentops.record(agentops.ActionEvent(action_type="round_start", params={"round_id": round_id}, returns=msg))

elif type == RoundEventType.round_error:
if type == RoundEventType.round_error:
agentops.record(
agentops.ErrorEvent(error_type="round_error", details={"message": msg, "round_id": round_id})
ErrorEvent(
error_type=type.value,
details={"round_id": round_id, "message": msg, "extra": extra}
)
)

elif type == RoundEventType.round_end:
agentops.record(agentops.ActionEvent(action_type="round_end", params={"round_id": round_id}, returns=msg))
self.current_round_id = None
logger.error(f"Could not record the Round event: {msg}")
self.cleanup_round()
else:
agentops.record(
ActionEvent(
action_type=type.value,
params={"round_id": round_id, "extra": extra},
returns=msg,
)
)
if type == RoundEventType.round_end:
self.cleanup_round()

def handle_post(self, type: PostEventType, msg: str, extra: Any, post_id: str, round_id: str, **kwargs: Any):
role = extra.get("role", "Planner")
agent_id = self._get_or_create_agent(role=role)

if type == PostEventType.post_start:
if type == PostEventType.post_error:
agentops.record(
agentops.ActionEvent(
action_type="post_start",
params={
"post_id": post_id,
"round_id": round_id,
"agent_id": agent_id,
},
returns=msg,
ErrorEvent(
error_type=type.value,
details={"post_id": post_id, "round_id": round_id, "message": msg, "extra": extra},
)
)
logger.error(f"Could not record the Post event: {msg}")

elif type == PostEventType.post_message_update:
is_end = extra.get("is_end", False)
if not is_end:
self._message_buffer[post_id] = self._message_buffer.get(post_id, "") + msg
else:
agentops.record(
agentops.ActionEvent(
action_type="post_message_update",
params={
"post_id": post_id,
"round_id": round_id,
"agent_id": agent_id,
"is_end": is_end,
"model": extra.get("model", None),
},
returns=self._message_buffer.get(post_id, ""),
)
elif type == PostEventType.post_start or type == PostEventType.post_end:
agentops.record(
ActionEvent(
action_type=type.value,
params={"post_id": post_id, "round_id": round_id, "extra": extra},
returns=msg,
agent_id=agent_id,
)
)

if is_end:
self._message_buffer.pop(post_id, None)

elif type == PostEventType.post_status_update:
agentops.record(
ActionEvent(
action_type=type.value,
params={"post_id": post_id, "round_id": round_id, "extra": extra},
returns=msg,
agent_id=agent_id,
)
)

elif type == PostEventType.post_attachment_update:
attachment_id = extra.get("id", "")
attachment_type = extra.get("type", "")
is_end = extra.get("is_end", False)
attachment_id = extra["id"]
attachment_type = extra["type"].value
is_end = extra["is_end"]

if attachment_id not in self._attachment_buffer:
self._attachment_buffer[attachment_id] = {
"type": attachment_type,
"content": "",
"post_id": post_id,
"round_id": round_id,
"agent_id": agent_id,
"role": attachment_type,
"content": [],
"init_timestamp": datetime.now(timezone.utc).isoformat(),
"end_timestamp": None,
}

agentops.record(
agentops.ActionEvent(
action_type="attachment_stream_start",
params={
"attachment_id": attachment_id,
"attachment_type": str(attachment_type),
"post_id": post_id,
"round_id": round_id,
"agent_id": agent_id,
},
self._attachment_buffer[attachment_id]["content"].append(str(msg))

if is_end:
self._attachment_buffer[attachment_id]["end_timestamp"] = datetime.now(timezone.utc).isoformat()
complete_message = "".join(self._attachment_buffer[attachment_id]["content"])

if attachment_type in ATTACHMENT_TOOLS:
agentops.record(
ToolEvent(
name=type.value,
init_timestamp=self._attachment_buffer[attachment_id]["init_timestamp"],
end_timestamp=self._attachment_buffer[attachment_id]["end_timestamp"],
params={
"post_id": post_id,
"round_id": round_id,
"attachment_id": attachment_id,
"attachment_type": self._attachment_buffer[attachment_id]["role"],
"extra": extra,
},
returns=complete_message,
agent_id=agent_id,
)
)
)
else:
agentops.record(
ActionEvent(
action_type=type.value,
init_timestamp=self._attachment_buffer[attachment_id]["init_timestamp"],
end_timestamp=self._attachment_buffer[attachment_id]["end_timestamp"],
params={
"post_id": post_id,
"round_id": round_id,
"attachment_id": attachment_id,
"attachment_type": self._attachment_buffer[attachment_id]["role"],
"extra": extra,
},
returns=complete_message,
agent_id=agent_id,
)
)

self._attachment_buffer.pop(attachment_id, None)

elif type == PostEventType.post_message_update:
is_end = extra["is_end"]

self._attachment_buffer[attachment_id]["content"] += str(msg)
if post_id not in self._message_buffer:
self._message_buffer[post_id] = {
"content": [],
"init_timestamp": datetime.now(timezone.utc).isoformat(),
"end_timestamp": None,
}

self._message_buffer[post_id]["content"].append(str(msg))

if is_end:
buffer = self._attachment_buffer[attachment_id]
self._message_buffer[post_id]["end_timestamp"] = datetime.now(timezone.utc).isoformat()
complete_message = "".join(self._message_buffer[post_id]["content"])
agentops.record(
agentops.ToolEvent(
name=str(buffer["type"]),
ActionEvent(
action_type=type.value,
init_timestamp=self._message_buffer[post_id]["init_timestamp"],
end_timestamp=self._message_buffer[post_id]["end_timestamp"],
params={
"post_id": buffer["post_id"],
"round_id": buffer["round_id"],
"attachment_id": attachment_id,
"post_id": post_id,
"round_id": round_id,
"extra": extra,
},
returns=buffer["content"],
agent_id=buffer["agent_id"],
returns=complete_message,
agent_id=agent_id,
)
)
self._attachment_buffer.pop(attachment_id)

elif type == PostEventType.post_error:
agentops.record(
agentops.ErrorEvent(
error_type="post_error",
details={"message": msg, "post_id": post_id, "round_id": round_id},
)
)

elif type == PostEventType.post_end:
agentops.record(
agentops.ActionEvent(
action_type="post_end",
params={"post_id": post_id, "round_id": round_id, "agent_id": agent_id},
returns=msg,
)
)
self._message_buffer.pop(post_id, None)

def cleanup_round(self, round_id: str):
def cleanup_round(self):
"""Cleanup agents and buffers for a completed round"""
self._active_agents = {k: v for k, v in self._active_agents.items() if not k.endswith(round_id)}
self._active_agents.clear()
self._message_buffer.clear()
self._attachment_buffer.clear()

0 comments on commit 306a3b5

Please sign in to comment.