From 51ab7a88106df7e571749faedc2dc0da1bc81aa1 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 13:14:13 +0800 Subject: [PATCH 01/13] Split the rpc_agent module into multiple submodules --- .../distributed_basic/distributed_dialog.py | 2 +- .../distributed_debate/distributed_debate.py | 2 +- examples/distributed_simulation/main.py | 2 +- src/agentscope/agents/__init__.py | 3 +- src/agentscope/agents/rpc_agent.py | 622 +----------------- src/agentscope/message.py | 2 +- src/agentscope/server/__init__.py | 9 + src/agentscope/server/launcher.py | 344 ++++++++++ src/agentscope/server/servicer.py | 305 +++++++++ tests/rpc_agent_test.py | 2 +- 10 files changed, 667 insertions(+), 626 deletions(-) create mode 100644 src/agentscope/server/__init__.py create mode 100644 src/agentscope/server/launcher.py create mode 100644 src/agentscope/server/servicer.py diff --git a/examples/distributed_basic/distributed_dialog.py b/examples/distributed_basic/distributed_dialog.py index e558c54fa..ab0de4235 100644 --- a/examples/distributed_basic/distributed_dialog.py +++ b/examples/distributed_basic/distributed_dialog.py @@ -7,7 +7,7 @@ import agentscope from agentscope.agents.user_agent import UserAgent from agentscope.agents.dialog_agent import DialogAgent -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher def parse_args() -> argparse.Namespace: diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index f5813e6f2..a4e0a4287 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -8,7 +8,7 @@ import agentscope from agentscope.agents import DialogAgent from agentscope.msghub import msghub -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils 
import logger diff --git a/examples/distributed_simulation/main.py b/examples/distributed_simulation/main.py index 7fd0cf19b..bb26fe533 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -11,7 +11,7 @@ import agentscope from agentscope.agents import AgentBase -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py index 0d5f6f84d..7bc5f83e5 100644 --- a/src/agentscope/agents/__init__.py +++ b/src/agentscope/agents/__init__.py @@ -6,7 +6,7 @@ from .dict_dialog_agent import DictDialogAgent from .user_agent import UserAgent from .text_to_image_agent import TextToImageAgent -from .rpc_agent import RpcAgent, RpcAgentServerLauncher +from .rpc_agent import RpcAgent from .react_agent import ReActAgent @@ -20,5 +20,4 @@ "ReActAgent", "DistConf", "RpcAgent", - "RpcAgentServerLauncher", ] diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index b7c3441bc..e4274f700 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -1,43 +1,15 @@ # -*- coding: utf-8 -*- """ Base class for Rpc Agent """ - -from multiprocessing import Process, Event, Pipe -from multiprocessing.synchronize import Event as EventClass -import socket -import threading -import json -import base64 -import traceback -import asyncio -from typing import Any, Type, Optional, Union, Sequence -from concurrent import futures +from typing import Type, Optional, Union, Sequence from loguru import logger -try: - import dill - import grpc - from grpc import ServicerContext - from expiringdict import ExpiringDict -except ImportError: - dill = None - grpc = None - ServicerContext = Any - ExpiringDict = None - -from agentscope._init import init_process, _INIT_SETTINGS from agentscope.agents.agent import AgentBase from 
agentscope.message import ( - Msg, PlaceholderMessage, - deserialize, serialize, ) -from agentscope.rpc import ( - RpcAgentClient, - RpcMsg, - RpcAgentServicer, - add_RpcAgentServicer_to_server, -) +from agentscope.rpc import RpcAgentClient +from agentscope.server.launcher import RpcAgentServerLauncher def rpc_servicer_method( # type: ignore[no-untyped-def] @@ -217,591 +189,3 @@ def stop(self) -> None: def __del__(self) -> None: self.stop() - - -def setup_rpc_agent_server( - host: str, - port: int, - init_settings: dict = None, - start_event: EventClass = None, - stop_event: EventClass = None, - pipe: int = None, - local_mode: bool = True, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - custom_agents: list = None, -) -> None: - """Setup gRPC server rpc agent. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`): - The socket port monitored by grpc server. - init_settings (`dict`, defaults to `None`): - Init settings for agentscope.init. - start_event (`EventClass`, defaults to `None`): - An Event instance used to determine whether the child process - has been started. - stop_event (`EventClass`, defaults to `None`): - The stop Event instance used to determine whether the child - process has been stopped. - pipe (`int`, defaults to `None`): - A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): - Only listen to local requests. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. 
- """ - asyncio.run( - setup_rpc_agent_server_async( - host=host, - port=port, - init_settings=init_settings, - start_event=start_event, - stop_event=stop_event, - pipe=pipe, - local_mode=local_mode, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - custom_agents=custom_agents, - ), - ) - - -async def setup_rpc_agent_server_async( - host: str, - port: int, - init_settings: dict = None, - start_event: EventClass = None, - stop_event: EventClass = None, - pipe: int = None, - local_mode: bool = True, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - custom_agents: list = None, -) -> None: - """Setup gRPC server rpc agent in an async way. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`): - The socket port monitored by grpc server. - init_settings (`dict`, defaults to `None`): - Init settings for agentscope.init. - start_event (`EventClass`, defaults to `None`): - An Event instance used to determine whether the child process - has been started. - stop_event (`EventClass`, defaults to `None`): - The stop Event instance used to determine whether the child - process has been stopped. - pipe (`int`, defaults to `None`): - A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): - Only listen to local requests. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. 
- """ - - if init_settings is not None: - init_process(**init_settings) - servicer = AgentPlatform( - host=host, - port=port, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - ) - # update agent registry - if custom_agents is not None: - for agent_class in custom_agents: - AgentBase.register_agent_class(agent_class=agent_class) - while True: - try: - port = check_port(port) - servicer.port = port - logger.info( - f"Starting rpc server at port [{port}]...", - ) - server = grpc.aio.server( - futures.ThreadPoolExecutor(max_workers=None), - ) - add_RpcAgentServicer_to_server(servicer, server) - if local_mode: - server.add_insecure_port(f"localhost:{port}") - else: - server.add_insecure_port(f"0.0.0.0:{port}") - await server.start() - break - except OSError: - logger.warning( - f"Failed to start rpc server at port [{port}]" - f"try another port", - ) - logger.info( - f"rpc server at port [{port}] started successfully", - ) - if start_event is not None: - pipe.send(port) - start_event.set() - while not stop_event.is_set(): - await asyncio.sleep(1) - logger.info( - f"Stopping rpc server at port [{port}]", - ) - await server.stop(10.0) - else: - await server.wait_for_termination() - logger.info( - f"rpc server at port [{port}] stopped successfully", - ) - - -def find_available_port() -> int: - """Get an unoccupied socket port number.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - -def check_port(port: Optional[int] = None) -> int: - """Check if the port is available. - - Args: - port (`int`): - the port number being checked. - - Returns: - `int`: the port number that passed the check. If the port is found - to be occupied, an available port number will be automatically - returned. 
- """ - if port is None: - new_port = find_available_port() - logger.warning( - "gRpc server port is not provided, automatically select " - f"[{new_port}] as the port number.", - ) - return new_port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: - new_port = find_available_port() - logger.warning( - f"Port [{port}] is occupied, use [{new_port}] instead", - ) - return new_port - return port - - -class RpcAgentServerLauncher: - """The launcher of AgentPlatform (formerly RpcAgentServer).""" - - def __init__( - self, - host: str = "localhost", - port: int = None, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - local_mode: bool = False, - custom_agents: list = None, - agent_class: Type[AgentBase] = None, - agent_args: tuple = (), - agent_kwargs: dict = None, - ) -> None: - """Init a rpc agent server launcher. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`, defaults to `None`): - Port of the rpc agent server. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - local_mode (`bool`, defaults to `False`): - Whether the started rpc server only listens to local - requests. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in - `agentscope.agents`. - agent_class (`Type[AgentBase]`, deprecated): - The AgentBase subclass encapsulated by this wrapper. - agent_args (`tuple`, deprecated): The args tuple used to - initialize the agent_class. - agent_kwargs (`dict`, deprecated): The args dict used to - initialize the agent_class. 
- """ - self.host = host - self.port = check_port(port) - self.max_pool_size = max_pool_size - self.max_timeout_seconds = max_timeout_seconds - self.local_mode = local_mode - self.server = None - self.stop_event = None - self.parent_con = None - self.custom_agents = custom_agents - if ( - agent_class is not None - or len(agent_args) > 0 - or agent_kwargs is not None - ): - logger.warning( - "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" - " in `RpcAgentServerLauncher`", - ) - - def _launch_in_main(self) -> None: - """Launch gRPC server in main-process""" - logger.info( - f"Launching agent server at [{self.host}:{self.port}]...", - ) - asyncio.run( - setup_rpc_agent_server_async( - host=self.host, - port=self.port, - max_pool_size=self.max_pool_size, - max_timeout_seconds=self.max_timeout_seconds, - local_mode=self.local_mode, - custom_agents=self.custom_agents, - ), - ) - - def _launch_in_sub(self) -> None: - """Launch gRPC server in sub-process.""" - self.stop_event = Event() - self.parent_con, child_con = Pipe() - start_event = Event() - server_process = Process( - target=setup_rpc_agent_server, - kwargs={ - "host": self.host, - "port": self.port, - "init_settings": _INIT_SETTINGS, - "start_event": start_event, - "stop_event": self.stop_event, - "pipe": child_con, - "max_pool_size": self.max_pool_size, - "max_timeout_seconds": self.max_timeout_seconds, - "local_mode": self.local_mode, - "custom_agents": self.custom_agents, - }, - ) - server_process.start() - self.port = self.parent_con.recv() - start_event.wait() - self.server = server_process - logger.info( - f"Launch agent server at [{self.host}:{self.port}] success", - ) - - def launch(self, in_subprocess: bool = True) -> None: - """launch a rpc agent server. - - Args: - in_subprocess (bool, optional): launch the server in subprocess. - Defaults to True. For agents that need to obtain command line - input, such as UserAgent, please set this value to False. 
- """ - if in_subprocess: - self._launch_in_sub() - else: - self._launch_in_main() - - def wait_until_terminate(self) -> None: - """Wait for server process""" - if self.server is not None: - self.server.join() - - def shutdown(self) -> None: - """Shutdown the rpc agent server.""" - if self.server is not None: - if self.stop_event is not None: - self.stop_event.set() - self.stop_event = None - self.server.join() - if self.server.is_alive(): - self.server.kill() - logger.info( - f"Agent server at port [{self.port}] is killed.", - ) - self.server = None - - -class AgentPlatform(RpcAgentServicer): - """A platform for agent to run on (formerly RpcServerSideWrapper)""" - - def __init__( - self, - host: str = "localhost", - port: int = None, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - ): - """Init the AgentPlatform. - - Args: - host (`str`, defaults to "localhost"): - Hostname of the rpc agent server. - port (`int`, defaults to `None`): - Port of the rpc agent server. - max_pool_size (`int`, defaults to `8192`): - The max number of task results that the server can - accommodate. Note that the oldest result will be deleted - after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. Note that expired results will be - deleted. - """ - self.host = host - self.port = port - self.result_pool = ExpiringDict( - max_len=max_pool_size, - max_age_seconds=max_timeout_seconds, - ) - self.executor = futures.ThreadPoolExecutor(max_workers=None) - self.task_id_lock = threading.Lock() - self.agent_id_lock = threading.Lock() - self.task_id_counter = 0 - self.agent_pool: dict[str, AgentBase] = {} - - def get_task_id(self) -> int: - """Get the auto-increment task id.""" - with self.task_id_lock: - self.task_id_counter += 1 - return self.task_id_counter - - def agent_exists(self, agent_id: str) -> bool: - """Check whether the agent exists. - - Args: - agent_id (`str`): the agent id. 
- - Returns: - bool: whether the agent exists. - """ - return agent_id in self.agent_pool - - def check_and_generate_agent( - self, - agent_id: str, - agent_configs: dict, - ) -> None: - """ - Check whether the agent exists, and create new agent instance - for new agent. - - Args: - agent_id (`str`): the agent id. - agent_configs (`dict`): configuration used to initialize the agent, - with three fields (generated in `_AgentMeta`): - - .. code-block:: python - - { - "class_name": {name of the agent} - "args": {args in tuple type to init the agent} - "kwargs": {args in dict type to init the agent} - } - - """ - with self.agent_id_lock: - if agent_id not in self.agent_pool: - agent_class_name = agent_configs["class_name"] - agent_instance = AgentBase.get_agent_class(agent_class_name)( - *agent_configs["args"], - **agent_configs["kwargs"], - ) - agent_instance._agent_id = agent_id # pylint: disable=W0212 - self.agent_pool[agent_id] = agent_instance - logger.info(f"create agent instance [{agent_id}]") - - def check_and_delete_agent(self, agent_id: str) -> None: - """ - Check whether the agent exists, and delete the agent instance - for the agent_id. - - Args: - agent_id (`str`): the agent id. 
- """ - with self.agent_id_lock: - if agent_id in self.agent_pool: - self.agent_pool.pop(agent_id) - logger.info(f"delete agent instance [{agent_id}]") - - def call_func( # pylint: disable=W0236 - self, - request: RpcMsg, - context: ServicerContext, - ) -> RpcMsg: - """Call the specific servicer function.""" - if hasattr(self, request.target_func): - if request.target_func not in ["_create_agent", "_get"]: - if not self.agent_exists(request.agent_id): - return context.abort( - grpc.StatusCode.INVALID_ARGUMENT, - f"Agent [{request.agent_id}] not exists.", - ) - return getattr(self, request.target_func)(request) - else: - # TODO: support other user defined method - logger.error(f"Unsupported method {request.target_func}") - return context.abort( - grpc.StatusCode.INVALID_ARGUMENT, - f"Unsupported method {request.target_func}", - ) - - def _reply(self, request: RpcMsg) -> RpcMsg: - """Call function of RpcAgentService - - Args: - request (`RpcMsg`): - Message containing input parameters or input parameter - placeholders. - - Returns: - `RpcMsg`: A serialized Msg instance with attributes name, host, - port and task_id - """ - if request.value: - msg = deserialize(request.value) - else: - msg = None - task_id = self.get_task_id() - self.result_pool[task_id] = threading.Condition() - self.executor.submit( - self.process_messages, - task_id, - request.agent_id, - msg, # type: ignore[arg-type] - ) - return RpcMsg( - value=Msg( - name=self.agent_pool[request.agent_id].name, - content=None, - task_id=task_id, - ).serialize(), - ) - - def _get(self, request: RpcMsg) -> RpcMsg: - """Get function of RpcAgentService - - Args: - request (`RpcMsg`): - Identifier of message, with json format:: - - { - 'task_id': int - } - - Returns: - `RpcMsg`: Concrete values of the specific message (or part of it). 
- """ - msg = json.loads(request.value) - while True: - result = self.result_pool.get(msg["task_id"]) - if isinstance(result, threading.Condition): - with result: - result.wait(timeout=1) - else: - break - return RpcMsg(value=result.serialize()) - - def _observe(self, request: RpcMsg) -> RpcMsg: - """Observe function of RpcAgentService - - Args: - request (`RpcMsg`): - The serialized input to be observed. - - Returns: - `RpcMsg`: Empty RpcMsg. - """ - msgs = deserialize(request.value) - for msg in msgs: - if isinstance(msg, PlaceholderMessage): - msg.update_value() - self.agent_pool[request.agent_id].observe(msgs) - return RpcMsg() - - def _create_agent(self, request: RpcMsg) -> RpcMsg: - """Create a new agent instance for the agent_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. - """ - self.check_and_generate_agent( - request.agent_id, - agent_configs=( - dill.loads(base64.b64decode(request.value)) - if request.value - else None - ), - ) - return RpcMsg() - - def _clone_agent(self, request: RpcMsg) -> RpcMsg: - """Clone a new agent instance from the origin instance. - - Args: - request (RpcMsg): The `agent_id` field is the agent_id of the - agent to be cloned. - - Returns: - `RpcMsg`: The `value` field contains the agent_id of generated - agent. - """ - agent_id = request.agent_id - with self.agent_id_lock: - if agent_id not in self.agent_pool: - raise ValueError(f"Agent [{agent_id}] not exists") - ori_agent = self.agent_pool[agent_id] - new_agent = ori_agent.__class__( - *ori_agent._init_settings["args"], # pylint: disable=W0212 - **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 - ) - with self.agent_id_lock: - self.agent_pool[new_agent.agent_id] = new_agent - return RpcMsg(value=new_agent.agent_id) - - def _delete_agent(self, request: RpcMsg) -> RpcMsg: - """Delete the agent instance of the specific sesssion_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. 
- """ - self.check_and_delete_agent(request.agent_id) - return RpcMsg() - - def process_messages( - self, - task_id: int, - agent_id: str, - task_msg: dict = None, - ) -> None: - """Task processing.""" - if isinstance(task_msg, PlaceholderMessage): - task_msg.update_value() - cond = self.result_pool[task_id] - try: - result = self.agent_pool[agent_id].reply(task_msg) - self.result_pool[task_id] = result - except Exception: - error_msg = traceback.format_exc() - logger.error(f"Error in agent [{agent_id}]:\n{error_msg}") - self.result_pool[task_id] = Msg( - name="ERROR", - role="assistant", - __status="ERROR", - content=f"Error in agent [{agent_id}]:\n{error_msg}", - ) - with cond: - cond.notify_all() diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 30f35fe61..4a1df7b91 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -391,7 +391,7 @@ def serialize(self) -> str: } -def deserialize(s: str) -> Union[MessageBase, Sequence]: +def deserialize(s: Union[str, bytes]) -> Union[MessageBase, Sequence]: """Deserialize json string into MessageBase""" js_msg = json.loads(s) msg_type = js_msg.pop("__type") diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py new file mode 100644 index 000000000..18b2ad790 --- /dev/null +++ b/src/agentscope/server/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +"""Import all server related modules in the package.""" +from .launcher import RpcAgentServerLauncher +from .servicer import AgentPlatform + +__all__ = [ + "RpcAgentServerLauncher", + "AgentPlatform", +] diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py new file mode 100644 index 000000000..387552115 --- /dev/null +++ b/src/agentscope/server/launcher.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +""" Server of distributed agent""" +from multiprocessing import Process, Event, Pipe +from multiprocessing.synchronize import Event as EventClass +import socket +import asyncio +from 
typing import Any, Type, Optional +from concurrent import futures +from loguru import logger + +try: + import grpc +except ImportError: + grpc = None + +from .servicer import AgentPlatform +from ..agents.agent import AgentBase + +try: + from ..rpc.rpc_agent_pb2_grpc import ( + add_RpcAgentServicer_to_server, + ) +except ModuleNotFoundError: + add_RpcAgentServicer_to_server = Any + + +def setup_rpc_agent_server( + host: str, + port: int, + init_settings: dict = None, + start_event: EventClass = None, + stop_event: EventClass = None, + pipe: int = None, + local_mode: bool = True, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + custom_agents: list = None, +) -> None: + """Setup gRPC server rpc agent. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`): + The socket port monitored by grpc server. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. + start_event (`EventClass`, defaults to `None`): + An Event instance used to determine whether the child process + has been started. + stop_event (`EventClass`, defaults to `None`): + The stop Event instance used to determine whether the child + process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. + local_mode (`bool`, defaults to `None`): + Only listen to local requests. + max_pool_size (`int`, defaults to `8192`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. 
+ """ + asyncio.run( + setup_rpc_agent_server_async( + host=host, + port=port, + init_settings=init_settings, + start_event=start_event, + stop_event=stop_event, + pipe=pipe, + local_mode=local_mode, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + custom_agents=custom_agents, + ), + ) + + +async def setup_rpc_agent_server_async( + host: str, + port: int, + init_settings: dict = None, + start_event: EventClass = None, + stop_event: EventClass = None, + pipe: int = None, + local_mode: bool = True, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + custom_agents: list = None, +) -> None: + """Setup gRPC server rpc agent in an async way. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`): + The socket port monitored by grpc server. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. + start_event (`EventClass`, defaults to `None`): + An Event instance used to determine whether the child process + has been started. + stop_event (`EventClass`, defaults to `None`): + The stop Event instance used to determine whether the child + process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. + local_mode (`bool`, defaults to `None`): + Only listen to local requests. + max_pool_size (`int`, defaults to `8192`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. 
+ """ + from agentscope._init import init_process + + if init_settings is not None: + init_process(**init_settings) + servicer = AgentPlatform( + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + ) + # update agent registry + if custom_agents is not None: + for agent_class in custom_agents: + AgentBase.register_agent_class(agent_class=agent_class) + while True: + try: + port = check_port(port) + servicer.port = port + logger.info( + f"Starting rpc server at port [{port}]...", + ) + server = grpc.aio.server( + futures.ThreadPoolExecutor(max_workers=None), + ) + add_RpcAgentServicer_to_server(servicer, server) + if local_mode: + server.add_insecure_port(f"localhost:{port}") + else: + server.add_insecure_port(f"0.0.0.0:{port}") + await server.start() + break + except OSError: + logger.warning( + f"Failed to start rpc server at port [{port}]" + f"try another port", + ) + logger.info( + f"rpc server at port [{port}] started successfully", + ) + if start_event is not None: + pipe.send(port) + start_event.set() + while not stop_event.is_set(): + await asyncio.sleep(1) + logger.info( + f"Stopping rpc server at port [{port}]", + ) + await server.stop(10.0) + else: + await server.wait_for_termination() + logger.info( + f"rpc server at port [{port}] stopped successfully", + ) + + +def find_available_port() -> int: + """Get an unoccupied socket port number.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def check_port(port: Optional[int] = None) -> int: + """Check if the port is available. + + Args: + port (`int`): + the port number being checked. + + Returns: + `int`: the port number that passed the check. If the port is found + to be occupied, an available port number will be automatically + returned. 
+ """ + if port is None: + new_port = find_available_port() + logger.warning( + "gRpc server port is not provided, automatically select " + f"[{new_port}] as the port number.", + ) + return new_port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + new_port = find_available_port() + logger.warning( + f"Port [{port}] is occupied, use [{new_port}] instead", + ) + return new_port + return port + + +class RpcAgentServerLauncher: + """The launcher of AgentPlatform (formerly RpcAgentServer).""" + + def __init__( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + local_mode: bool = False, + custom_agents: list = None, + agent_class: Type[AgentBase] = None, + agent_args: tuple = (), + agent_kwargs: dict = None, + ) -> None: + """Init a rpc agent server launcher. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `8192`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `False`): + Whether the started rpc server only listens to local + requests. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in + `agentscope.agents`. + agent_class (`Type[AgentBase]`, deprecated): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`, deprecated): The args tuple used to + initialize the agent_class. + agent_kwargs (`dict`, deprecated): The args dict used to + initialize the agent_class. 
+ """ + self.host = host + self.port = check_port(port) + self.max_pool_size = max_pool_size + self.max_timeout_seconds = max_timeout_seconds + self.local_mode = local_mode + self.server = None + self.stop_event = None + self.parent_con = None + self.custom_agents = custom_agents + if ( + agent_class is not None + or len(agent_args) > 0 + or agent_kwargs is not None + ): + logger.warning( + "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" + " in `RpcAgentServerLauncher`", + ) + + def _launch_in_main(self) -> None: + """Launch gRPC server in main-process""" + logger.info( + f"Launching agent server at [{self.host}:{self.port}]...", + ) + asyncio.run( + setup_rpc_agent_server_async( + host=self.host, + port=self.port, + max_pool_size=self.max_pool_size, + max_timeout_seconds=self.max_timeout_seconds, + local_mode=self.local_mode, + custom_agents=self.custom_agents, + ), + ) + + def _launch_in_sub(self) -> None: + """Launch gRPC server in sub-process.""" + from agentscope._init import _INIT_SETTINGS + + self.stop_event = Event() + self.parent_con, child_con = Pipe() + start_event = Event() + server_process = Process( + target=setup_rpc_agent_server, + kwargs={ + "host": self.host, + "port": self.port, + "init_settings": _INIT_SETTINGS, + "start_event": start_event, + "stop_event": self.stop_event, + "pipe": child_con, + "max_pool_size": self.max_pool_size, + "max_timeout_seconds": self.max_timeout_seconds, + "local_mode": self.local_mode, + "custom_agents": self.custom_agents, + }, + ) + server_process.start() + self.port = self.parent_con.recv() + start_event.wait() + self.server = server_process + logger.info( + f"Launch agent server at [{self.host}:{self.port}] success", + ) + + def launch(self, in_subprocess: bool = True) -> None: + """launch a rpc agent server. + + Args: + in_subprocess (bool, optional): launch the server in subprocess. + Defaults to True. 
For agents that need to obtain command line + input, such as UserAgent, please set this value to False. + """ + if in_subprocess: + self._launch_in_sub() + else: + self._launch_in_main() + + def wait_until_terminate(self) -> None: + """Wait for server process""" + if self.server is not None: + self.server.join() + + def shutdown(self) -> None: + """Shutdown the rpc agent server.""" + if self.server is not None: + if self.stop_event is not None: + self.stop_event.set() + self.stop_event = None + self.server.join() + if self.server.is_alive(): + self.server.kill() + logger.info( + f"Agent server at port [{self.port}] is killed.", + ) + self.server = None diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py new file mode 100644 index 000000000..b178720b5 --- /dev/null +++ b/src/agentscope/server/servicer.py @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- +""" Server of distributed agent""" +import threading +import base64 +import json +import traceback +from typing import Any +from concurrent import futures +from loguru import logger + +try: + import dill + import grpc + from grpc import ServicerContext + from expiringdict import ExpiringDict +except ImportError: + dill = None + grpc = None + ServicerContext = Any + ExpiringDict = None + +from ..agents.agent import AgentBase +from ..message import ( + Msg, + PlaceholderMessage, + deserialize, +) + +try: + from ..rpc.rpc_agent_pb2 import RpcMsg # pylint: disable=E0611 + from ..rpc.rpc_agent_pb2_grpc import RpcAgentServicer +except ModuleNotFoundError: + RpcMsg = Any # type: ignore[misc] + RpcAgentServicer = Any + + +class AgentPlatform(RpcAgentServicer): + """A platform for agent to run on (formerly RpcServerSideWrapper)""" + + def __init__( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + ): + """Init the AgentPlatform. + + Args: + host (`str`, defaults to "localhost"): + Hostname of the rpc agent server. 
+ port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `8192`): + The max number of task results that the server can + accommodate. Note that the oldest result will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. Note that expired results will be + deleted. + """ + self.host = host + self.port = port + self.result_pool = ExpiringDict( + max_len=max_pool_size, + max_age_seconds=max_timeout_seconds, + ) + self.executor = futures.ThreadPoolExecutor(max_workers=None) + self.task_id_lock = threading.Lock() + self.agent_id_lock = threading.Lock() + self.task_id_counter = 0 + self.agent_pool: dict[str, AgentBase] = {} + + def get_task_id(self) -> int: + """Get the auto-increment task id.""" + with self.task_id_lock: + self.task_id_counter += 1 + return self.task_id_counter + + def agent_exists(self, agent_id: str) -> bool: + """Check whether the agent exists. + + Args: + agent_id (`str`): the agent id. + + Returns: + bool: whether the agent exists. + """ + return agent_id in self.agent_pool + + def check_and_generate_agent( + self, + agent_id: str, + agent_configs: dict, + ) -> None: + """ + Check whether the agent exists, and create new agent instance + for new agent. + + Args: + agent_id (`str`): the agent id. + agent_configs (`dict`): configuration used to initialize the agent, + with three fields (generated in `_AgentMeta`): + + .. 
code-block:: python + + { + "class_name": {name of the agent} + "args": {args in tuple type to init the agent} + "kwargs": {args in dict type to init the agent} + } + + """ + with self.agent_id_lock: + if agent_id not in self.agent_pool: + agent_class_name = agent_configs["class_name"] + agent_instance = AgentBase.get_agent_class(agent_class_name)( + *agent_configs["args"], + **agent_configs["kwargs"], + ) + agent_instance._agent_id = agent_id # pylint: disable=W0212 + self.agent_pool[agent_id] = agent_instance + logger.info(f"create agent instance [{agent_id}]") + + def check_and_delete_agent(self, agent_id: str) -> None: + """ + Check whether the agent exists, and delete the agent instance + for the agent_id. + + Args: + agent_id (`str`): the agent id. + """ + with self.agent_id_lock: + if agent_id in self.agent_pool: + self.agent_pool.pop(agent_id) + logger.info(f"delete agent instance [{agent_id}]") + + def call_func( # pylint: disable=W0236 + self, + request: RpcMsg, + context: ServicerContext, + ) -> RpcMsg: + """Call the specific servicer function.""" + if hasattr(self, request.target_func): + if request.target_func not in ["_create_agent", "_get"]: + if not self.agent_exists(request.agent_id): + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Agent [{request.agent_id}] not exists.", + ) + return getattr(self, request.target_func)(request) + else: + # TODO: support other user defined method + logger.error(f"Unsupported method {request.target_func}") + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Unsupported method {request.target_func}", + ) + + def _reply(self, request: RpcMsg) -> RpcMsg: + """Call function of RpcAgentService + + Args: + request (`RpcMsg`): + Message containing input parameters or input parameter + placeholders. 
+ + Returns: + `RpcMsg`: A serialized Msg instance with attributes name, host, + port and task_id + """ + if request.value: + msg = deserialize(request.value) + else: + msg = None + task_id = self.get_task_id() + self.result_pool[task_id] = threading.Condition() + self.executor.submit( + self.process_messages, + task_id, + request.agent_id, + msg, # type: ignore[arg-type] + ) + return RpcMsg( + value=Msg( # type: ignore[arg-type] + name=self.agent_pool[request.agent_id].name, + content=None, + task_id=task_id, + ).serialize(), + ) + + def _get(self, request: RpcMsg) -> RpcMsg: + """Get function of RpcAgentService + + Args: + request (`RpcMsg`): + Identifier of message, with json format:: + + { + 'task_id': int + } + + Returns: + `RpcMsg`: Concrete values of the specific message (or part of it). + """ + msg = json.loads(request.value) + while True: + result = self.result_pool.get(msg["task_id"]) + if isinstance(result, threading.Condition): + with result: + result.wait(timeout=1) + else: + break + return RpcMsg(value=result.serialize()) + + def _observe(self, request: RpcMsg) -> RpcMsg: + """Observe function of RpcAgentService + + Args: + request (`RpcMsg`): + The serialized input to be observed. + + Returns: + `RpcMsg`: Empty RpcMsg. + """ + msgs = deserialize(request.value) + for msg in msgs: + if isinstance(msg, PlaceholderMessage): + msg.update_value() + self.agent_pool[request.agent_id].observe(msgs) + return RpcMsg() + + def _create_agent(self, request: RpcMsg) -> RpcMsg: + """Create a new agent instance for the agent_id. + + Args: + request (RpcMsg): request message with a `agent_id` field. + """ + self.check_and_generate_agent( + request.agent_id, + agent_configs=( + dill.loads(base64.b64decode(request.value)) + if request.value + else None + ), + ) + return RpcMsg() + + def _clone_agent(self, request: RpcMsg) -> RpcMsg: + """Clone a new agent instance from the origin instance. 
+ + Args: + request (RpcMsg): The `agent_id` field is the agent_id of the + agent to be cloned. + + Returns: + `RpcMsg`: The `value` field contains the agent_id of generated + agent. + """ + agent_id = request.agent_id + with self.agent_id_lock: + if agent_id not in self.agent_pool: + raise ValueError(f"Agent [{agent_id}] not exists") + ori_agent = self.agent_pool[agent_id] + new_agent = ori_agent.__class__( + *ori_agent._init_settings["args"], # pylint: disable=W0212 + **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 + ) + with self.agent_id_lock: + self.agent_pool[new_agent.agent_id] = new_agent + return RpcMsg(value=new_agent.agent_id) # type: ignore[arg-type] + + def _delete_agent(self, request: RpcMsg) -> RpcMsg: + """Delete the agent instance of the specific sesssion_id. + + Args: + request (RpcMsg): request message with a `agent_id` field. + """ + self.check_and_delete_agent(request.agent_id) + return RpcMsg() + + def process_messages( + self, + task_id: int, + agent_id: str, + task_msg: dict = None, + ) -> None: + """Task processing.""" + if isinstance(task_msg, PlaceholderMessage): + task_msg.update_value() + cond = self.result_pool[task_id] + try: + result = self.agent_pool[agent_id].reply(task_msg) + self.result_pool[task_id] = result + except Exception: + error_msg = traceback.format_exc() + logger.error(f"Error in agent [{agent_id}]:\n{error_msg}") + self.result_pool[task_id] = Msg( + name="ERROR", + role="assistant", + __status="ERROR", + content=f"Error in agent [{agent_id}]:\n{error_msg}", + ) + with cond: + cond.notify_all() diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index d010587cd..d70613268 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -9,7 +9,7 @@ import agentscope from agentscope.agents import AgentBase, DistConf -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message 
import PlaceholderMessage from agentscope.message import deserialize From dbf5b18a7cf32c0b4d97ca8f8263d838124a0559 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 14:08:12 +0800 Subject: [PATCH 02/13] rename RpcAgentServerLauncher to AgentServerLauncher --- .../en/source/tutorial/208-distribute.md | 4 ++-- .../zh_CN/source/tutorial/208-distribute.md | 4 ++-- .../distributed_basic/distributed_dialog.py | 4 ++-- .../distributed_debate/distributed_debate.py | 4 ++-- examples/distributed_simulation/main.py | 4 ++-- src/agentscope/agents/agent.py | 4 ++-- src/agentscope/agents/rpc_agent.py | 4 ++-- src/agentscope/server/__init__.py | 4 ++-- src/agentscope/server/launcher.py | 20 +++++++++---------- tests/rpc_agent_test.py | 10 +++++----- 10 files changed, 31 insertions(+), 31 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 714f2e05f..5574276c4 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -69,7 +69,7 @@ agentscope.init( ... ) # Create an agent service process -server = RpcAgentServerLauncher( +server = AgentServerLauncher( host="ip_a", port=12001, # choose an available port ) @@ -88,7 +88,7 @@ agentscope.init( ... ) # Create an agent service process -server = RpcAgentServerLauncher( +server = AgentServerLauncher( host="ip_b", port=12002, # choose an available port ) diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index ef50f123f..3506e6641 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -68,7 +68,7 @@ agentscope.init( ... ) # Create an agent service process -server = RpcAgentServerLauncher( +server = AgentServerLauncher( host="ip_a", port=12001, # choose an available port ) @@ -87,7 +87,7 @@ agentscope.init( ... 
) # Create an agent service process -server = RpcAgentServerLauncher( +server = AgentServerLauncher( host="ip_b", port=12002, # choose an available port ) diff --git a/examples/distributed_basic/distributed_dialog.py b/examples/distributed_basic/distributed_dialog.py index ab0de4235..c101fd2f6 100644 --- a/examples/distributed_basic/distributed_dialog.py +++ b/examples/distributed_basic/distributed_dialog.py @@ -7,7 +7,7 @@ import agentscope from agentscope.agents.user_agent import UserAgent from agentscope.agents.dialog_agent import DialogAgent -from agentscope.server import RpcAgentServerLauncher +from agentscope.server import AgentServerLauncher def parse_args() -> argparse.Namespace: @@ -36,7 +36,7 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: agentscope.init( model_configs="configs/model_configs.json", ) - assistant_server_launcher = RpcAgentServerLauncher( + assistant_server_launcher = AgentServerLauncher( host=assistant_host, port=assistant_port, ) diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index a4e0a4287..7fc84024b 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -8,7 +8,7 @@ import agentscope from agentscope.agents import DialogAgent from agentscope.msghub import msghub -from agentscope.server import RpcAgentServerLauncher +from agentscope.server import AgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils import logger @@ -75,7 +75,7 @@ def setup_server(parsed_args: argparse.Namespace) -> None: ) host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") - server_launcher = RpcAgentServerLauncher( + server_launcher = AgentServerLauncher( host=host, port=port, custom_agents=[UserProxyAgent, DialogAgent], diff --git a/examples/distributed_simulation/main.py 
b/examples/distributed_simulation/main.py index bb26fe533..130527d78 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -11,7 +11,7 @@ import agentscope from agentscope.agents import AgentBase -from agentscope.server import RpcAgentServerLauncher +from agentscope.server import AgentServerLauncher from agentscope.message import Msg @@ -58,7 +58,7 @@ def setup_participant_agent_server(host: str, port: int) -> None: model_configs="configs/model_configs.json", use_monitor=False, ) - assistant_server_launcher = RpcAgentServerLauncher( + assistant_server_launcher = AgentServerLauncher( host=host, port=port, max_pool_size=16384, diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index bdf657df2..91f45527a 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -24,7 +24,7 @@ class _AgentMeta(ABCMeta): """ def __init__(cls, name: Any, bases: Any, attrs: Any) -> None: - if not hasattr(cls, "registry"): + if not hasattr(cls, "_registry"): cls._registry = {} else: if name in cls._registry: @@ -245,7 +245,7 @@ def register_agent_class(cls, agent_class: Type[AgentBase]) -> None: """ agent_class_name = agent_class.__name__ if agent_class_name in cls._registry: - logger.warning( + logger.info( f"Agent class with name [{agent_class_name}] already exists.", ) else: diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index e4274f700..3332b00d2 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -9,7 +9,7 @@ serialize, ) from agentscope.rpc import RpcAgentClient -from agentscope.server.launcher import RpcAgentServerLauncher +from agentscope.server.launcher import AgentServerLauncher def rpc_servicer_method( # type: ignore[no-untyped-def] @@ -89,7 +89,7 @@ def __init__( launch_server = port is None if launch_server: self.host = "localhost" - self.server_launcher = RpcAgentServerLauncher( + self.server_launcher 
= AgentServerLauncher( host=self.host, port=port, max_pool_size=max_pool_size, diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py index 18b2ad790..0ce9e8d3d 100644 --- a/src/agentscope/server/__init__.py +++ b/src/agentscope/server/__init__.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- """Import all server related modules in the package.""" -from .launcher import RpcAgentServerLauncher +from .launcher import AgentServerLauncher from .servicer import AgentPlatform __all__ = [ - "RpcAgentServerLauncher", + "AgentServerLauncher", "AgentPlatform", ] diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 387552115..78afb4f0f 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -24,7 +24,7 @@ add_RpcAgentServicer_to_server = Any -def setup_rpc_agent_server( +def setup_agent_server( host: str, port: int, init_settings: dict = None, @@ -36,7 +36,7 @@ def setup_rpc_agent_server( max_timeout_seconds: int = 1800, custom_agents: list = None, ) -> None: - """Setup gRPC server rpc agent. + """Setup agent server. Args: host (`str`, defaults to `"localhost"`): @@ -63,7 +63,7 @@ def setup_rpc_agent_server( A list of custom agent classes that are not in `agentscope.agents`. """ asyncio.run( - setup_rpc_agent_server_async( + setup_agent_server_async( host=host, port=port, init_settings=init_settings, @@ -78,7 +78,7 @@ def setup_rpc_agent_server( ) -async def setup_rpc_agent_server_async( +async def setup_agent_server_async( host: str, port: int, init_settings: dict = None, @@ -90,7 +90,7 @@ async def setup_rpc_agent_server_async( max_timeout_seconds: int = 1800, custom_agents: list = None, ) -> None: - """Setup gRPC server rpc agent in an async way. + """Setup agent server in an async way. 
Args: host (`str`, defaults to `"localhost"`): @@ -207,7 +207,7 @@ def check_port(port: Optional[int] = None) -> int: return port -class RpcAgentServerLauncher: +class AgentServerLauncher: """The launcher of AgentPlatform (formerly RpcAgentServer).""" def __init__( @@ -222,7 +222,7 @@ def __init__( agent_args: tuple = (), agent_kwargs: dict = None, ) -> None: - """Init a rpc agent server launcher. + """Init a launcher of agent server. Args: host (`str`, defaults to `"localhost"`): @@ -262,7 +262,7 @@ def __init__( ): logger.warning( "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" - " in `RpcAgentServerLauncher`", + " in `AgentServerLauncher`", ) def _launch_in_main(self) -> None: @@ -271,7 +271,7 @@ def _launch_in_main(self) -> None: f"Launching agent server at [{self.host}:{self.port}]...", ) asyncio.run( - setup_rpc_agent_server_async( + setup_agent_server_async( host=self.host, port=self.port, max_pool_size=self.max_pool_size, @@ -289,7 +289,7 @@ def _launch_in_sub(self) -> None: self.parent_con, child_con = Pipe() start_event = Event() server_process = Process( - target=setup_rpc_agent_server, + target=setup_agent_server, kwargs={ "host": self.host, "port": self.port, diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index d70613268..bc861824f 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -9,7 +9,7 @@ import agentscope from agentscope.agents import AgentBase, DistConf -from agentscope.server import RpcAgentServerLauncher +from agentscope.server import AgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage from agentscope.message import deserialize @@ -220,7 +220,7 @@ def test_single_rpc_agent_server(self) -> None: def test_connect_to_an_existing_rpc_server(self) -> None: """test connecting to an existing server""" - launcher = RpcAgentServerLauncher( + launcher = AgentServerLauncher( # choose port automatically host="127.0.0.1", port=12010, @@ -420,7 +420,7 @@ 
def test_standalone_multiprocess_init(self) -> None: def test_multi_agent_in_same_server(self) -> None: """test agent server with multi agent""" - launcher = RpcAgentServerLauncher( + launcher = AgentServerLauncher( host="127.0.0.1", port=12010, local_mode=False, @@ -548,14 +548,14 @@ def test_error_handling(self) -> None: def test_agent_nesting(self) -> None: """Test agent nesting""" host = "localhost" - launcher1 = RpcAgentServerLauncher( + launcher1 = AgentServerLauncher( # choose port automatically host=host, port=12010, local_mode=False, custom_agents=[DemoGatherAgent, DemoGeneratorAgent], ) - launcher2 = RpcAgentServerLauncher( + launcher2 = AgentServerLauncher( # choose port automatically host=host, port=12011, From 7997f7a83641f2b93851866a916e998bf73d82c7 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 15:04:13 +0800 Subject: [PATCH 03/13] add as_server into setup and support graceful shutdown --- setup.py | 1 + src/agentscope/server/launcher.py | 73 ++++++++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 7dca2181b..955bb47d2 100644 --- a/setup.py +++ b/setup.py @@ -121,6 +121,7 @@ "console_scripts": [ "as_studio=agentscope.web.studio.studio:run_app", "as_workflow=agentscope.web.workstation.workflow:main", + "as_server=agentscope.server.launcher:launch", ], }, ) diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 78afb4f0f..6328fe865 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -4,6 +4,8 @@ from multiprocessing.synchronize import Event as EventClass import socket import asyncio +import signal +import argparse from typing import Any, Type, Optional from concurrent import futures from loguru import logger @@ -130,13 +132,24 @@ async def setup_agent_server_async( if custom_agents is not None: for agent_class in custom_agents: AgentBase.register_agent_class(agent_class=agent_class) + + async def 
shutdown_signal_handler() -> None: + logger.info( + f"Received shutdown signal. Gracefully stopping the server at " + f"[{host}:{port}].", + ) + await server.stop(grace=5) + + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown_signal_handler()), + ) while True: try: port = check_port(port) servicer.port = port - logger.info( - f"Starting rpc server at port [{port}]...", - ) server = grpc.aio.server( futures.ThreadPoolExecutor(max_workers=None), ) @@ -149,11 +162,11 @@ async def setup_agent_server_async( break except OSError: logger.warning( - f"Failed to start rpc server at port [{port}]" + f"Failed to start agent server at port [{port}]" f"try another port", ) logger.info( - f"rpc server at port [{port}] started successfully", + f"agent server at [{host}:{port}] started successfully", ) if start_event is not None: pipe.send(port) @@ -161,13 +174,13 @@ async def setup_agent_server_async( while not stop_event.is_set(): await asyncio.sleep(1) logger.info( - f"Stopping rpc server at port [{port}]", + f"Stopping agent server at [{host}:{port}]", ) await server.stop(10.0) else: await server.wait_for_termination() logger.info( - f"rpc server at port [{port}] stopped successfully", + f"agent server at [{host}:{port}] stopped successfully", ) @@ -342,3 +355,49 @@ def shutdown(self) -> None: f"Agent server at port [{self.port}] is killed.", ) self.server = None + + +def launch() -> None: + """Launch an agent server""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + type=str, + default="localhost", + help="hostname of the server", + ) + parser.add_argument( + "--port", + type=int, + default=12310, + help="socket port of the server", + ) + parser.add_argument( + "--max_pool_size", + type=int, + default=8192, + help="max number of task results that the server can accommodate", + ) + parser.add_argument( + "--max_timeout_seconds", + type=int, + 
default=1800, + help="max timeout for task results", + ) + parser.add_argument( + "--local_mode", + type=bool, + default=False, + help="whether the started rpc server only listens to local requests", + ) + args = parser.parse_args() + launcher = AgentServerLauncher( + host=args.host, + port=args.port, + max_pool_size=args.max_pool_size, + max_timeout_seconds=args.max_timeout_seconds, + local_mode=args.local_mode, + ) + launcher.launch(in_subprocess=False) + launcher.wait_until_terminate() From a92eac2959baff923d3df7fc36e9c6f800fe7766 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 15:37:57 +0800 Subject: [PATCH 04/13] add server_id --- src/agentscope/server/launcher.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 6328fe865..f2e2be15b 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -17,6 +17,7 @@ from .servicer import AgentPlatform from ..agents.agent import AgentBase +from ..utils.tools import _get_timestamp try: from ..rpc.rpc_agent_pb2_grpc import ( @@ -29,6 +30,7 @@ def setup_agent_server( host: str, port: int, + server_id: str, init_settings: dict = None, start_event: EventClass = None, stop_event: EventClass = None, @@ -45,6 +47,8 @@ def setup_agent_server( Hostname of the rpc agent server. port (`int`): The socket port monitored by grpc server. + server_id (`str`): + The id of the server. init_settings (`dict`, defaults to `None`): Init settings for agentscope.init. 
start_event (`EventClass`, defaults to `None`): @@ -68,6 +72,7 @@ def setup_agent_server( setup_agent_server_async( host=host, port=port, + server_id=server_id, init_settings=init_settings, start_event=start_event, stop_event=stop_event, @@ -83,6 +88,7 @@ def setup_agent_server( async def setup_agent_server_async( host: str, port: int, + server_id: str, init_settings: dict = None, start_event: EventClass = None, stop_event: EventClass = None, @@ -99,6 +105,8 @@ async def setup_agent_server_async( Hostname of the rpc agent server. port (`int`): The socket port monitored by grpc server. + server_id (`str`): + The id of the server. init_settings (`dict`, defaults to `None`): Init settings for agentscope.init. start_event (`EventClass`, defaults to `None`): @@ -166,7 +174,7 @@ async def shutdown_signal_handler() -> None: f"try another port", ) logger.info( - f"agent server at [{host}:{port}] started successfully", + f"agent server [{server_id}] at {host}:{port} started successfully", ) if start_event is not None: pipe.send(port) @@ -180,7 +188,7 @@ async def shutdown_signal_handler() -> None: else: await server.wait_for_termination() logger.info( - f"agent server at [{host}:{port}] stopped successfully", + f"agent server [{server_id}] at {host}:{port} stopped successfully", ) @@ -231,6 +239,7 @@ def __init__( max_timeout_seconds: int = 1800, local_mode: bool = False, custom_agents: list = None, + server_id: str = None, agent_class: Type[AgentBase] = None, agent_args: tuple = (), agent_kwargs: dict = None, @@ -252,6 +261,9 @@ def __init__( custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in `agentscope.agents`. + server_id (`str`, defaults to `None`): + The id of the agent server. If not specified, a random id + will be generated. agent_class (`Type[AgentBase]`, deprecated): The AgentBase subclass encapsulated by this wrapper. 
agent_args (`tuple`, deprecated): The args tuple used to @@ -268,6 +280,9 @@ def __init__( self.stop_event = None self.parent_con = None self.custom_agents = custom_agents + self.server_id = ( + self.generate_server_id() if server_id is None else server_id + ) if ( agent_class is not None or len(agent_args) > 0 @@ -278,6 +293,10 @@ def __init__( " in `AgentServerLauncher`", ) + def generate_server_id(self) -> str: + """Generate server id""" + return f"{self.host}:{self.port}-{_get_timestamp('%y%m%d-%H:%M:%S')}" + def _launch_in_main(self) -> None: """Launch gRPC server in main-process""" logger.info( @@ -287,6 +306,7 @@ def _launch_in_main(self) -> None: setup_agent_server_async( host=self.host, port=self.port, + server_id=self.server_id, max_pool_size=self.max_pool_size, max_timeout_seconds=self.max_timeout_seconds, local_mode=self.local_mode, @@ -306,6 +326,7 @@ def _launch_in_sub(self) -> None: kwargs={ "host": self.host, "port": self.port, + "server_id": self.server_id, "init_settings": _INIT_SETTINGS, "start_event": start_event, "stop_event": self.stop_event, From 9163241265c42ef4eec513f4f28e96169f032d20 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 16:17:12 +0800 Subject: [PATCH 05/13] update tutorial for agent server --- docs/sphinx_doc/en/source/index.rst | 1 + .../en/source/tutorial/208-distribute.md | 15 ++++++ .../zh_CN/source/tutorial/208-distribute.md | 15 ++++++ setup.py | 2 +- src/agentscope/server/__init__.py | 3 +- src/agentscope/server/launcher.py | 50 +++++++++++++------ 6 files changed, 69 insertions(+), 17 deletions(-) diff --git a/docs/sphinx_doc/en/source/index.rst b/docs/sphinx_doc/en/source/index.rst index fb81e2e64..1aad67356 100644 --- a/docs/sphinx_doc/en/source/index.rst +++ b/docs/sphinx_doc/en/source/index.rst @@ -38,6 +38,7 @@ AgentScope Documentation agentscope.pipelines agentscope.service agentscope.rpc + agentscope.server agentscope.web agentscope.prompt agentscope.utils diff --git 
a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 5574276c4..24fbc3acf 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -79,6 +79,12 @@ server.launch() server.wait_until_terminate() ``` +> For simplicity, you can run the following command in your terminal rather than the above code: +> +> ```shell +> as_server --host ip_a --port 12001 +> ``` + And run the following code on `Machine2`: ```python @@ -98,6 +104,12 @@ server.launch() server.wait_until_terminate() ``` +> Similarly, you can run the following command in your terminal to set up the agent server: +> +> ```shell +> as_server --host ip_b --port 12002 +> ``` + Then, you can connect to the agent servers from the main process with the following code. ```python @@ -254,6 +266,9 @@ About more detailed technical implementation solutions, please refer to our [pap In agentscope, the agent server provides a running platform for various types of agents. Multiple agents can run in the same agent server and hold independent memory and other local states but they will share the same computation resources. + +After installing the distributed version of AgentScope, you can use the `as_server` command to start the agent server, and the detailed startup arguments can be found in the documentation of the {func}`as_server` function. + As long as the code is not modified, an agent server can provide services for multiple main processes. This means that when running mutliple applications, you only need to start the agent server for the first time, and it can be reused subsequently.
diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 3506e6641..fe840aa00 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -78,6 +78,12 @@ server.launch() server.wait_until_terminate() ``` +> 为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: +> +> ```shell +> as_server --host ip_a --port 12001 +> ``` + 之后在 `Machine2` 上运行如下代码: ```python @@ -97,6 +103,12 @@ server.launch() server.wait_until_terminate() ``` +> 这里也同样可以用如下指令来代替上面的代码。 +> +> ```shell +> as_server --host ip_b --port 12002 +> ``` + 接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 ```python @@ -251,6 +263,9 @@ Placeholder 内部包含了该消息产生方的联络方法,可以通过网 #### Agent Server Agent Server 也就是智能体服务器。在 AgentScope 中,Agent Server 提供了一个让不同 Agent 实例运行的平台。多个不同类型的 Agent 可以运行在同一个 Agent Server 中并保持独立的记忆以及其他本地状态信息,但是他们将共享同一份计算资源。 + +在安装 AgentScope 的分布式版本后就可以通过 `as_server` 命令来启动 Agent Server,具体的启动参数在 {func}`as_server` 函数文档中可以找到。 + 只要没有对代码进行修改,一个已经启动的 Agent Server 可以为多个主流程提供服务。 这意味着在运行多个应用时,只需要在第一次运行前启动 Agent Server,后续这些 Agent Server 进程就可以持续复用。 diff --git a/setup.py b/setup.py index 955bb47d2..60e9c9ddd 100644 --- a/setup.py +++ b/setup.py @@ -121,7 +121,7 @@ "console_scripts": [ "as_studio=agentscope.web.studio.studio:run_app", "as_workflow=agentscope.web.workstation.workflow:main", - "as_server=agentscope.server.launcher:launch", + "as_server=agentscope.server.launcher:as_server", ], }, ) diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py index 0ce9e8d3d..9dbfe3faa 100644 --- a/src/agentscope/server/__init__.py +++ b/src/agentscope/server/__init__.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- """Import all server related modules in the package.""" -from .launcher import AgentServerLauncher +from .launcher import AgentServerLauncher, as_server from .servicer import AgentPlatform __all__ = [ "AgentServerLauncher", "AgentPlatform", + "as_server", ] diff --git 
a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index f2e2be15b..39d30bf92 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -44,9 +44,9 @@ def setup_agent_server( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`): - The socket port monitored by grpc server. + The socket port monitored by the agent server. server_id (`str`): The id of the server. init_settings (`dict`, defaults to `None`): @@ -102,9 +102,9 @@ async def setup_agent_server_async( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`): - The socket port monitored by grpc server. + The socket port monitored by the agent server. server_id (`str`): The id of the server. init_settings (`dict`, defaults to `None`): @@ -214,7 +214,7 @@ def check_port(port: Optional[int] = None) -> int: if port is None: new_port = find_available_port() logger.warning( - "gRpc server port is not provided, automatically select " + "agent server port is not provided, automatically select " f"[{new_port}] as the port number.", ) return new_port @@ -248,15 +248,15 @@ def __init__( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`, defaults to `None`): - Port of the rpc agent server. + Socket port of the agent server. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): Timeout for task results. local_mode (`bool`, defaults to `False`): - Whether the started rpc server only listens to local + Whether the started server only listens to local requests. 
custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in @@ -298,7 +298,7 @@ def generate_server_id(self) -> str: return f"{self.host}:{self.port}-{_get_timestamp('%y%m%d-%H:%M:%S')}" def _launch_in_main(self) -> None: - """Launch gRPC server in main-process""" + """Launch agent server in main-process""" logger.info( f"Launching agent server at [{self.host}:{self.port}]...", ) @@ -315,7 +315,7 @@ def _launch_in_main(self) -> None: ) def _launch_in_sub(self) -> None: - """Launch gRPC server in sub-process.""" + """Launch an agent server in sub-process.""" from agentscope._init import _INIT_SETTINGS self.stop_event = Event() @@ -346,7 +346,7 @@ def _launch_in_sub(self) -> None: ) def launch(self, in_subprocess: bool = True) -> None: - """launch a rpc agent server. + """launch an agent server. Args: in_subprocess (bool, optional): launch the server in subprocess. @@ -364,7 +364,7 @@ def wait_until_terminate(self) -> None: self.server.join() def shutdown(self) -> None: - """Shutdown the rpc agent server.""" + """Shutdown the agent server.""" if self.server is not None: if self.stop_event is not None: self.stop_event.set() @@ -378,8 +378,28 @@ def shutdown(self) -> None: self.server = None -def launch() -> None: - """Launch an agent server""" +def as_server() -> None: + """Launch an agent server with terminal command. + + Note: + + The arguments of `as_server` are listed as follows: + + * `\-\-host`: the hostname of the server. + * `\-\-port`: the socket port of the server. + * `\-\-max_pool_size`: max number of task results that the server can + accommodate. + * `\-\-max_timeout_seconds`: max timeout seconds of a task. + * `\-\-local_mode`: whether the started agent server only listens to + local requests. + + In most cases, you only need to specify the `\-\-host` and `\-\-port`. + + .. 
code-block:: shell + + as_server --host localhost --port 12345 + + """ parser = argparse.ArgumentParser() parser.add_argument( @@ -410,7 +430,7 @@ def launch() -> None: "--local_mode", type=bool, default=False, - help="whether the started rpc server only listens to local requests", + help="whether the started agent server only listens to local requests", ) args = parser.parse_args() launcher = AgentServerLauncher( From 31228b64b3eff70bb9e08f948fab2cf3bb4f3eb0 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 16:17:12 +0800 Subject: [PATCH 06/13] update tutorial for agent server --- docs/sphinx_doc/en/source/index.rst | 1 + .../en/source/tutorial/208-distribute.md | 15 ++++++ .../zh_CN/source/tutorial/208-distribute.md | 15 ++++++ setup.py | 2 +- src/agentscope/server/__init__.py | 3 +- src/agentscope/server/launcher.py | 50 +++++++++++++------ 6 files changed, 69 insertions(+), 17 deletions(-) diff --git a/docs/sphinx_doc/en/source/index.rst b/docs/sphinx_doc/en/source/index.rst index fb81e2e64..1aad67356 100644 --- a/docs/sphinx_doc/en/source/index.rst +++ b/docs/sphinx_doc/en/source/index.rst @@ -38,6 +38,7 @@ AgentScope Documentation agentscope.pipelines agentscope.service agentscope.rpc + agentscope.server agentscope.web agentscope.prompt agentscope.utils diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 5574276c4..24fbc3acf 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -79,6 +79,12 @@ server.launch() server.wait_until_terminate() ``` +> For similarity, you can run the following command in your terminal rather than the above code: +> +> ```shell +> as_server --host ip_a --port 12001 +> ``` + And run the following code on `Machine2`: ```python @@ -98,6 +104,12 @@ server.launch() server.wait_until_terminate() ``` +> Similarly, you can run the following command in your terminal to setup 
the agent server: +> +> ```shell +> as_server --host ip_b --port 12002 +> ``` + Then, you can connect to the agent servers from the main process with the following code. ```python @@ -254,6 +266,9 @@ About more detailed technical implementation solutions, please refer to our [pap In agentscope, the agent server provides a running platform for various types of agents. Multiple agents can run in the same agent server and hold independent memory and other local states but they will share the same computation resources. + +After installing the distributed version of AgentScope, you can use the `as_server` command to start the agent server, and the detailed startup arguments can be found in the documentation of the {func}`as_server` function. + As long as the code is not modified, an agent server can provide services for multiple main processes. This means that when running mutliple applications, you only need to start the agent server for the first time, and it can be reused subsequently. diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 3506e6641..fe840aa00 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -78,6 +78,12 @@ server.launch() server.wait_until_terminate() ``` +> 为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: +> +> ```shell +> as_server --host ip_a --port 12001 +> ``` + 之后在 `Machine2` 上运行如下代码: ```python @@ -97,6 +103,12 @@ server.launch() server.wait_until_terminate() ``` +> 这里也同样可以用如下指令来代替上面的代码。 +> +> ```shell +> as_server --host ip_b --port 12002 +> ``` + 接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 ```python @@ -251,6 +263,9 @@ Placeholder 内部包含了该消息产生方的联络方法,可以通过网 #### Agent Server Agent Server 也就是智能体服务器。在 AgentScope 中,Agent Server 提供了一个让不同 Agent 实例运行的平台。多个不同类型的 Agent 可以运行在同一个 Agent Server 中并保持独立的记忆以及其他本地状态信息,但是他们将共享同一份计算资源。 + +在安装 AgentScope 的分布式版本后就可以通过 `as_server` 命令来启动 Agent Server,具体的启动参数在 {func}`as_server` 
函数文档中可以找到。 + 只要没有对代码进行修改,一个已经启动的 Agent Server 可以为多个主流程提供服务。 这意味着在运行多个应用时,只需要在第一次运行前启动 Agent Server,后续这些 Agent Server 进程就可以持续复用。 diff --git a/setup.py b/setup.py index 955bb47d2..60e9c9ddd 100644 --- a/setup.py +++ b/setup.py @@ -121,7 +121,7 @@ "console_scripts": [ "as_studio=agentscope.web.studio.studio:run_app", "as_workflow=agentscope.web.workstation.workflow:main", - "as_server=agentscope.server.launcher:launch", + "as_server=agentscope.server.launcher:as_server", ], }, ) diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py index 0ce9e8d3d..9dbfe3faa 100644 --- a/src/agentscope/server/__init__.py +++ b/src/agentscope/server/__init__.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- """Import all server related modules in the package.""" -from .launcher import AgentServerLauncher +from .launcher import AgentServerLauncher, as_server from .servicer import AgentPlatform __all__ = [ "AgentServerLauncher", "AgentPlatform", + "as_server", ] diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index f2e2be15b..648c4b2a5 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -44,9 +44,9 @@ def setup_agent_server( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`): - The socket port monitored by grpc server. + The socket port monitored by the agent server. server_id (`str`): The id of the server. init_settings (`dict`, defaults to `None`): @@ -102,9 +102,9 @@ async def setup_agent_server_async( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`): - The socket port monitored by grpc server. + The socket port monitored by the agent server. server_id (`str`): The id of the server. 
init_settings (`dict`, defaults to `None`): @@ -214,7 +214,7 @@ def check_port(port: Optional[int] = None) -> int: if port is None: new_port = find_available_port() logger.warning( - "gRpc server port is not provided, automatically select " + "agent server port is not provided, automatically select " f"[{new_port}] as the port number.", ) return new_port @@ -248,15 +248,15 @@ def __init__( Args: host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. + Hostname of the agent server. port (`int`, defaults to `None`): - Port of the rpc agent server. + Socket port of the agent server. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): Timeout for task results. local_mode (`bool`, defaults to `False`): - Whether the started rpc server only listens to local + Whether the started server only listens to local requests. custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in @@ -298,7 +298,7 @@ def generate_server_id(self) -> str: return f"{self.host}:{self.port}-{_get_timestamp('%y%m%d-%H:%M:%S')}" def _launch_in_main(self) -> None: - """Launch gRPC server in main-process""" + """Launch agent server in main-process""" logger.info( f"Launching agent server at [{self.host}:{self.port}]...", ) @@ -315,7 +315,7 @@ def _launch_in_main(self) -> None: ) def _launch_in_sub(self) -> None: - """Launch gRPC server in sub-process.""" + """Launch an agent server in sub-process.""" from agentscope._init import _INIT_SETTINGS self.stop_event = Event() @@ -346,7 +346,7 @@ def _launch_in_sub(self) -> None: ) def launch(self, in_subprocess: bool = True) -> None: - """launch a rpc agent server. + """launch an agent server. Args: in_subprocess (bool, optional): launch the server in subprocess. 
@@ -364,7 +364,7 @@ def wait_until_terminate(self) -> None: self.server.join() def shutdown(self) -> None: - """Shutdown the rpc agent server.""" + """Shutdown the agent server.""" if self.server is not None: if self.stop_event is not None: self.stop_event.set() @@ -378,8 +378,28 @@ def shutdown(self) -> None: self.server = None -def launch() -> None: - """Launch an agent server""" +def as_server() -> None: + """Launch an agent server with terminal command. + + Note: + + The arguments of `as_server` are listed as follows: + + * `--host`: the hostname of the server. + * `--port`: the socket port of the server. + * `--max_pool_size`: max number of task results that the server can + accommodate. + * `--max_timeout_seconds`: max timeout seconds of a task. + * `--local_mode`: whether the started agent server only listens to + local requests. + + In most cases, you only need to specify the `--host` and `--port`. + + .. code-block:: shell + + as_server --host localhost --port 12345 + + """ # noqa parser = argparse.ArgumentParser() parser.add_argument( @@ -410,7 +430,7 @@ def launch() -> None: "--local_mode", type=bool, default=False, - help="whether the started rpc server only listens to local requests", + help="whether the started agent server only listens to local requests", ) args = parser.parse_args() launcher = AgentServerLauncher( From 918c75097ea6bf0afb55e63f293287ac424228ff Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 16:45:53 +0800 Subject: [PATCH 07/13] fix windows add_signal_handler --- src/agentscope/server/launcher.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 272dacd34..f7feb0c34 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """ Server of distributed agent""" +import os from multiprocessing import Process, Event, Pipe from 
multiprocessing.synchronize import Event as EventClass import socket @@ -149,11 +150,13 @@ async def shutdown_signal_handler() -> None: await server.stop(grace=5) loop = asyncio.get_running_loop() - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler( - sig, - lambda: asyncio.create_task(shutdown_signal_handler()), - ) + if os.name != "nt": + # windows does not support add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown_signal_handler()), + ) while True: try: port = check_port(port) @@ -184,7 +187,7 @@ async def shutdown_signal_handler() -> None: logger.info( f"Stopping agent server at [{host}:{port}]", ) - await server.stop(10.0) + await server.stop(grace=10.0) else: await server.wait_for_termination() logger.info( From 8f3eaa29a96b7bb05c2ee2eae7cd95a7996ee718 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 17:32:08 +0800 Subject: [PATCH 08/13] rename agentserverlauncher --- docs/sphinx_doc/en/source/tutorial/208-distribute.md | 4 ++-- docs/sphinx_doc/zh_CN/source/index.rst | 1 + .../zh_CN/source/tutorial/208-distribute.md | 4 ++-- examples/distributed_basic/distributed_dialog.py | 4 ++-- examples/distributed_debate/distributed_debate.py | 4 ++-- examples/distributed_simulation/main.py | 4 ++-- src/agentscope/agents/rpc_agent.py | 4 ++-- src/agentscope/server/__init__.py | 8 ++++---- src/agentscope/server/launcher.py | 12 ++++++------ src/agentscope/server/servicer.py | 6 +++--- tests/rpc_agent_test.py | 10 +++++----- 11 files changed, 31 insertions(+), 30 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 24fbc3acf..e5a16d977 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -69,7 +69,7 @@ agentscope.init( ... 
) # Create an agent service process -server = AgentServerLauncher( +server = RpcAgentServerLauncher( host="ip_a", port=12001, # choose an available port ) @@ -94,7 +94,7 @@ agentscope.init( ... ) # Create an agent service process -server = AgentServerLauncher( +server = RpcAgentServerLauncher( host="ip_b", port=12002, # choose an available port ) diff --git a/docs/sphinx_doc/zh_CN/source/index.rst b/docs/sphinx_doc/zh_CN/source/index.rst index 7f6c48275..662fb267c 100644 --- a/docs/sphinx_doc/zh_CN/source/index.rst +++ b/docs/sphinx_doc/zh_CN/source/index.rst @@ -38,6 +38,7 @@ AgentScope 文档 agentscope.pipelines agentscope.service agentscope.rpc + agentscope.server agentscope.web agentscope.prompt agentscope.utils diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index fe840aa00..ab4f24520 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -68,7 +68,7 @@ agentscope.init( ... ) # Create an agent service process -server = AgentServerLauncher( +server = RpcAgentServerLauncher( host="ip_a", port=12001, # choose an available port ) @@ -93,7 +93,7 @@ agentscope.init( ... 
) # Create an agent service process -server = AgentServerLauncher( +server = RpcAgentServerLauncher( host="ip_b", port=12002, # choose an available port ) diff --git a/examples/distributed_basic/distributed_dialog.py b/examples/distributed_basic/distributed_dialog.py index c101fd2f6..ab0de4235 100644 --- a/examples/distributed_basic/distributed_dialog.py +++ b/examples/distributed_basic/distributed_dialog.py @@ -7,7 +7,7 @@ import agentscope from agentscope.agents.user_agent import UserAgent from agentscope.agents.dialog_agent import DialogAgent -from agentscope.server import AgentServerLauncher +from agentscope.server import RpcAgentServerLauncher def parse_args() -> argparse.Namespace: @@ -36,7 +36,7 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: agentscope.init( model_configs="configs/model_configs.json", ) - assistant_server_launcher = AgentServerLauncher( + assistant_server_launcher = RpcAgentServerLauncher( host=assistant_host, port=assistant_port, ) diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index 7fc84024b..a4e0a4287 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -8,7 +8,7 @@ import agentscope from agentscope.agents import DialogAgent from agentscope.msghub import msghub -from agentscope.server import AgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils import logger @@ -75,7 +75,7 @@ def setup_server(parsed_args: argparse.Namespace) -> None: ) host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") - server_launcher = AgentServerLauncher( + server_launcher = RpcAgentServerLauncher( host=host, port=port, custom_agents=[UserProxyAgent, DialogAgent], diff --git a/examples/distributed_simulation/main.py 
b/examples/distributed_simulation/main.py index 130527d78..bb26fe533 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -11,7 +11,7 @@ import agentscope from agentscope.agents import AgentBase -from agentscope.server import AgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg @@ -58,7 +58,7 @@ def setup_participant_agent_server(host: str, port: int) -> None: model_configs="configs/model_configs.json", use_monitor=False, ) - assistant_server_launcher = AgentServerLauncher( + assistant_server_launcher = RpcAgentServerLauncher( host=host, port=port, max_pool_size=16384, diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 3332b00d2..e4274f700 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -9,7 +9,7 @@ serialize, ) from agentscope.rpc import RpcAgentClient -from agentscope.server.launcher import AgentServerLauncher +from agentscope.server.launcher import RpcAgentServerLauncher def rpc_servicer_method( # type: ignore[no-untyped-def] @@ -89,7 +89,7 @@ def __init__( launch_server = port is None if launch_server: self.host = "localhost" - self.server_launcher = AgentServerLauncher( + self.server_launcher = RpcAgentServerLauncher( host=self.host, port=port, max_pool_size=max_pool_size, diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py index 9dbfe3faa..8b69a542a 100644 --- a/src/agentscope/server/__init__.py +++ b/src/agentscope/server/__init__.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- """Import all server related modules in the package.""" -from .launcher import AgentServerLauncher, as_server -from .servicer import AgentPlatform +from .launcher import RpcAgentServerLauncher, as_server +from .servicer import AgentServerServicer __all__ = [ - "AgentServerLauncher", - "AgentPlatform", + "RpcAgentServerLauncher", + "AgentServerServicer", "as_server", ] 
diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index f7feb0c34..34c09f1e7 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -16,7 +16,7 @@ except ImportError: grpc = None -from .servicer import AgentPlatform +from .servicer import AgentServerServicer from ..agents.agent import AgentBase from ..utils.tools import _get_timestamp @@ -131,7 +131,7 @@ async def setup_agent_server_async( if init_settings is not None: init_process(**init_settings) - servicer = AgentPlatform( + servicer = AgentServerServicer( host=host, port=port, max_pool_size=max_pool_size, @@ -231,8 +231,8 @@ def check_port(port: Optional[int] = None) -> int: return port -class AgentServerLauncher: - """The launcher of AgentPlatform (formerly RpcAgentServer).""" +class RpcAgentServerLauncher: + """The launcher of AgentServer.""" def __init__( self, @@ -293,7 +293,7 @@ def __init__( ): logger.warning( "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" - " in `AgentServerLauncher`", + " in `RpcAgentServerLauncher`", ) def generate_server_id(self) -> str: @@ -436,7 +436,7 @@ def as_server() -> None: help="whether the started agent server only listens to local requests", ) args = parser.parse_args() - launcher = AgentServerLauncher( + launcher = RpcAgentServerLauncher( host=args.host, port=args.port, max_pool_size=args.max_pool_size, diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index b178720b5..055e8d7ab 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -34,8 +34,8 @@ RpcAgentServicer = Any -class AgentPlatform(RpcAgentServicer): - """A platform for agent to run on (formerly RpcServerSideWrapper)""" +class AgentServerServicer(RpcAgentServicer): + """A Servicer for agent to run on (formerly RpcServerSideWrapper)""" def __init__( self, @@ -44,7 +44,7 @@ def __init__( max_pool_size: int = 8192, max_timeout_seconds: int = 1800, ): - """Init the 
AgentPlatform. + """Init the AgentServerServicer. Args: host (`str`, defaults to "localhost"): diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index bc861824f..d70613268 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -9,7 +9,7 @@ import agentscope from agentscope.agents import AgentBase, DistConf -from agentscope.server import AgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage from agentscope.message import deserialize @@ -220,7 +220,7 @@ def test_single_rpc_agent_server(self) -> None: def test_connect_to_an_existing_rpc_server(self) -> None: """test connecting to an existing server""" - launcher = AgentServerLauncher( + launcher = RpcAgentServerLauncher( # choose port automatically host="127.0.0.1", port=12010, @@ -420,7 +420,7 @@ def test_standalone_multiprocess_init(self) -> None: def test_multi_agent_in_same_server(self) -> None: """test agent server with multi agent""" - launcher = AgentServerLauncher( + launcher = RpcAgentServerLauncher( host="127.0.0.1", port=12010, local_mode=False, @@ -548,14 +548,14 @@ def test_error_handling(self) -> None: def test_agent_nesting(self) -> None: """Test agent nesting""" host = "localhost" - launcher1 = AgentServerLauncher( + launcher1 = RpcAgentServerLauncher( # choose port automatically host=host, port=12010, local_mode=False, custom_agents=[DemoGatherAgent, DemoGeneratorAgent], ) - launcher2 = AgentServerLauncher( + launcher2 = RpcAgentServerLauncher( # choose port automatically host=host, port=12011, From 49c5dcf0784379303b0189fb2133566de07d6229 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 18:25:40 +0800 Subject: [PATCH 09/13] add model_config_path and update tutorial --- .../en/source/tutorial/208-distribute.md | 15 ++++--- .../zh_CN/source/tutorial/208-distribute.md | 16 ++++--- src/agentscope/server/launcher.py | 43 +++++++++++++------ 3 files 
changed, 47 insertions(+), 27 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index e5a16d977..3b9f7a605 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -59,14 +59,16 @@ b = AgentB( #### Independent Process Mode In the Independent Process Mode, we need to start the agent server process on the target machine first. +When starting the agent server process, you need to specify a model config file, which contains the models which can be used in the agent server, the IP address and port of the agent server process For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). -You can run the following code on `Machine1`: +You can run the following code on `Machine1`, and make sure you have put your model config file in `model_config_path_a`. ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_a, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -82,16 +84,17 @@ server.wait_until_terminate() > For similarity, you can run the following command in your terminal rather than the above code: > > ```shell -> as_server --host ip_a --port 12001 +> as_server --host ip_a --port 12001 --model-config-path model_config_path_a > ``` -And run the following code on `Machine2`: +And put your model config file accordingly in `model_config_path_b` and run the following code on `Machine2`. ```python # import some packages +# register models which can be used in the server agentscope.init( - ... 
+ model_configs=model_config_path_b, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -107,7 +110,7 @@ server.wait_until_terminate() > Similarly, you can run the following command in your terminal to setup the agent server: > > ```shell -> as_server --host ip_b --port 12002 +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b > ``` Then, you can connect to the agent servers from the main process with the following code. diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index ab4f24520..0a576d228 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -57,15 +57,16 @@ b = AgentB( #### 独立进程模式 -在独立进程模式中,需要首先在目标机器上启动智能体服务器进程。 +在独立进程模式中,需要首先在目标机器上启动智能体服务器进程,启动时需要提供该服务器能够使用的模型的配置信息,以及服务器的 IP 和端口号。 例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 -你可以先在 `Machine1` 上运行如下代码: +你可以先在 `Machine1` 上运行如下代码,运行之前请确保已经将模型配置文件放置在 `model_config_path_a` 位置。: ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_a, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -81,16 +82,17 @@ server.wait_until_terminate() > 为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: > > ```shell -> as_server --host ip_a --port 12001 +> as_server --host ip_a --port 12001 --model-config-path model_config_path_a > ``` -之后在 `Machine2` 上运行如下代码: +在 `Machine2` 上运行如下代码,这里同样要确保已经将模型配置文件放置在 `model_config_path_b` 位置。 ```python # import some packages +# register models which can be used in the server agentscope.init( - ... 
+ model_configs=model_config_path_b, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -106,7 +108,7 @@ server.wait_until_terminate() > 这里也同样可以用如下指令来代替上面的代码。 > > ```shell -> as_server --host ip_b --port 12002 +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b > ``` 接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 34c09f1e7..abd13a24d 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -16,12 +16,13 @@ except ImportError: grpc = None -from .servicer import AgentServerServicer -from ..agents.agent import AgentBase -from ..utils.tools import _get_timestamp +import agentscope +from agentscope.server.servicer import AgentServerServicer +from agentscope.agents.agent import AgentBase +from agentscope.utils.tools import _get_timestamp try: - from ..rpc.rpc_agent_pb2_grpc import ( + from agentscope.rpc.rpc_agent_pb2_grpc import ( add_RpcAgentServicer_to_server, ) except ModuleNotFoundError: @@ -390,20 +391,21 @@ def as_server() -> None: * `--host`: the hostname of the server. * `--port`: the socket port of the server. - * `--max_pool_size`: max number of task results that the server can + * `--max-pool-size`: max number of task results that the server can accommodate. - * `--max_timeout_seconds`: max timeout seconds of a task. - * `--local_mode`: whether the started agent server only listens to + * `--max-timeout-seconds`: max timeout seconds of a task. + * `--local-mode`: whether the started agent server only listens to local requests. + * `--model-config-path`: the path to the model config json file - In most cases, you only need to specify the `--host` and `--port`. + In most cases, you only need to specify the `--host`, `--port` and + `--model-config-path`. .. 
code-block:: shell - as_server --host localhost --port 12345 - - """ + as_server --host localhost --port 12345 --model-config-path config.json + """ # noqa parser = argparse.ArgumentParser() parser.add_argument( "--host", @@ -418,24 +420,37 @@ def as_server() -> None: help="socket port of the server", ) parser.add_argument( - "--max_pool_size", + "--max-pool-size", type=int, default=8192, help="max number of task results that the server can accommodate", ) parser.add_argument( - "--max_timeout_seconds", + "--max-timeout-seconds", type=int, default=1800, help="max timeout for task results", ) parser.add_argument( - "--local_mode", + "--local-mode", type=bool, default=False, help="whether the started agent server only listens to local requests", ) + parser.add_argument( + "--model-config-path", + type=str, + help="path to the model config json file", + ) args = parser.parse_args() + agentscope.init( + project="agent_server", + name=f"server_{args.host}:{args.port}", + runtime_id=_get_timestamp( + "server_{}_{}_%y%m%d-%H%M%S", + ).format(args.host, args.port), + model_configs=args.model_config_path, + ) launcher = RpcAgentServerLauncher( host=args.host, port=args.port, From b635e35def0409c43befe5b361d787415b14f338 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Mon, 13 May 2024 10:32:02 +0800 Subject: [PATCH 10/13] update tutorial --- docs/sphinx_doc/en/source/tutorial/208-distribute.md | 2 +- docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 3b9f7a605..6aa7c205c 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -61,7 +61,7 @@ b = AgentB( In the Independent Process Mode, we need to start the agent server process on the target machine first. 
When starting the agent server process, you need to specify a model config file, which contains the models which can be used in the agent server, the IP address and port of the agent server process For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). -You can run the following code on `Machine1`, and make sure you have put your model config file in `model_config_path_a`. +You can run the following code on `Machine1`, and make sure you have put your model config file in `model_config_path_a`. The example model config file instances are located under `examples/model_configs_template`. ```python # import some packages diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 0a576d228..ecbe50a6a 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -59,7 +59,7 @@ b = AgentB( 在独立进程模式中,需要首先在目标机器上启动智能体服务器进程,启动时需要提供该服务器能够使用的模型的配置信息,以及服务器的 IP 和端口号。 例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 -你可以先在 `Machine1` 上运行如下代码,运行之前请确保已经将模型配置文件放置在 `model_config_path_a` 位置。: +你可以先在 `Machine1` 上运行如下代码,运行之前请确保已经将模型配置文件放置在 `model_config_path_a` 位置,模型配置文件样例可参考 `examples/model_configs_template`。 ```python # import some packages From c96156d788385217fe3b1004cd60a70fad431f75 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Wed, 15 May 2024 10:18:13 +0800 Subject: [PATCH 11/13] fix pre-commit --- src/agentscope/server/servicer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index e6b4749cd..0b3dd179b 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -21,7 +21,10 @@ grpc = ImportErrorReporter(import_error, "distribute") ServicerContext = ImportErrorReporter(import_error, 
"distribute") ExpiringDict = ImportErrorReporter(import_error, "distribute") - RpcMsg = ImportErrorReporter(import_error, "distribute") + RpcMsg = ImportErrorReporter( # type: ignore[misc] + import_error, + "distribute", + ) RpcAgentServicer = ImportErrorReporter(import_error, "distribute") from ..agents.agent import AgentBase From bf86cc5995ae94c90098af840c7e8b04036a8de2 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Thu, 16 May 2024 20:56:00 +0800 Subject: [PATCH 12/13] fix comments --- .../en/source/tutorial/208-distribute.md | 4 +- .../zh_CN/source/tutorial/208-distribute.md | 4 +- src/agentscope/server/launcher.py | 54 ++++--------------- src/agentscope/server/servicer.py | 2 +- src/agentscope/utils/tools.py | 39 +++++++++++++- 5 files changed, 53 insertions(+), 50 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 6aa7c205c..0381a13f1 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -61,7 +61,7 @@ b = AgentB( In the Independent Process Mode, we need to start the agent server process on the target machine first. When starting the agent server process, you need to specify a model config file, which contains the models which can be used in the agent server, the IP address and port of the agent server process For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). -You can run the following code on `Machine1`, and make sure you have put your model config file in `model_config_path_a`. The example model config file instances are located under `examples/model_configs_template`. 
+You can run the following code on `Machine1`. Before running, make sure that the machine has access to all models that are used in your application. Specifically, you need to put your model config file in `model_config_path_a` and set environment variables such as your model API key correctly in `Machine1`. The example model config file instances are located under `examples/model_configs_template`. ```python # import some packages @@ -87,7 +87,7 @@ server.wait_until_terminate() > as_server --host ip_a --port 12001 --model-config-path model_config_path_a > ``` -And put your model config file accordingly in `model_config_path_b` and run the following code on `Machine2`. +Then put your model config file accordingly in `model_config_path_b`, set environment variables, and run the following code on `Machine2`. ```python # import some packages diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index ecbe50a6a..a185bd5da 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -59,7 +59,7 @@ b = AgentB( 在独立进程模式中,需要首先在目标机器上启动智能体服务器进程,启动时需要提供该服务器能够使用的模型的配置信息,以及服务器的 IP 和端口号。 例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 -你可以先在 `Machine1` 上运行如下代码,运行之前请确保已经将模型配置文件放置在 `model_config_path_a` 位置,模型配置文件样例可参考 `examples/model_configs_template`。 +你可以在 `Machine1` 上运行如下代码。在运行之前请确保该机器能够正确访问到应用中所使用的所有模型。具体来讲,需要将用到的所有模型的配置信息放置在 `model_config_path_a` 文件中,并检查API key 等环境变量是否正确设置,模型配置文件样例可参考 `examples/model_configs_template`。 ```python # import some packages @@ -85,7 +85,7 @@ server.wait_until_terminate() > as_server --host ip_a --port 12001 --model-config-path model_config_path_a > ``` -在 `Machine2` 上运行如下代码,这里同样要确保已经将模型配置文件放置在 `model_config_path_b` 位置。 +在 `Machine2` 上运行如下代码,这里同样要确保已经将模型配置文件放置在 `model_config_path_b` 位置并设置环境变量,从而确保运行在该机器上的 Agent 能够正常访问到模型。 ```python # import some packages diff --git
a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index c5c17ed30..fda6ffa19 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -3,11 +3,10 @@ import os from multiprocessing import Process, Event, Pipe from multiprocessing.synchronize import Event as EventClass -import socket import asyncio import signal import argparse -from typing import Type, Optional +from typing import Type from concurrent import futures from loguru import logger @@ -28,10 +27,13 @@ import agentscope from agentscope.server.servicer import AgentServerServicer from agentscope.agents.agent import AgentBase -from agentscope.utils.tools import _get_timestamp +from agentscope.utils.tools import ( + _get_timestamp, + check_port, +) -def setup_agent_server( +def _setup_agent_server( host: str, port: int, server_id: str, @@ -73,7 +75,7 @@ def setup_agent_server( A list of custom agent classes that are not in `agentscope.agents`. """ asyncio.run( - setup_agent_server_async( + _setup_agent_server_async( host=host, port=port, server_id=server_id, @@ -89,7 +91,7 @@ def setup_agent_server( ) -async def setup_agent_server_async( +async def _setup_agent_server_async( host: str, port: int, server_id: str, @@ -198,42 +200,6 @@ async def shutdown_signal_handler() -> None: ) -def find_available_port() -> int: - """Get an unoccupied socket port number.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - -def check_port(port: Optional[int] = None) -> int: - """Check if the port is available. - - Args: - port (`int`): - the port number being checked. - - Returns: - `int`: the port number that passed the check. If the port is found - to be occupied, an available port number will be automatically - returned. 
- """ - if port is None: - new_port = find_available_port() - logger.warning( - "agent server port is not provided, automatically select " - f"[{new_port}] as the port number.", - ) - return new_port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: - new_port = find_available_port() - logger.warning( - f"Port [{port}] is occupied, use [{new_port}] instead", - ) - return new_port - return port - - class RpcAgentServerLauncher: """The launcher of AgentServer.""" @@ -309,7 +275,7 @@ def _launch_in_main(self) -> None: f"Launching agent server at [{self.host}:{self.port}]...", ) asyncio.run( - setup_agent_server_async( + _setup_agent_server_async( host=self.host, port=self.port, server_id=self.server_id, @@ -328,7 +294,7 @@ def _launch_in_sub(self) -> None: self.parent_con, child_con = Pipe() start_event = Event() server_process = Process( - target=setup_agent_server, + target=_setup_agent_server, kwargs={ "host": self.host, "port": self.port, diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index 0b3dd179b..8f6134f81 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -36,7 +36,7 @@ class AgentServerServicer(RpcAgentServicer): - """A Servicer for agent to run on (formerly RpcServerSideWrapper)""" + """A Servicer for RPC Agent Server (formerly RpcServerSideWrapper)""" def __init__( self, diff --git a/src/agentscope/utils/tools.py b/src/agentscope/utils/tools.py index 8ebd23777..8888d99e6 100644 --- a/src/agentscope/utils/tools.py +++ b/src/agentscope/utils/tools.py @@ -6,7 +6,8 @@ import os.path import secrets import string -from typing import Any, Literal, List +import socket +from typing import Any, Literal, List, Optional from urllib.parse import urlparse @@ -61,6 +62,42 @@ def to_dialog_str(item: dict) -> str: return f"{speaker}: {content}" +def find_available_port() -> int: + """Get an unoccupied socket port number.""" + with 
socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def check_port(port: Optional[int] = None) -> int: + """Check if the port is available. + + Args: + port (`int`): + the port number being checked. + + Returns: + `int`: the port number that passed the check. If the port is found + to be occupied, an available port number will be automatically + returned. + """ + if port is None: + new_port = find_available_port() + logger.warning( + "agent server port is not provided, automatically select " + f"[{new_port}] as the port number.", + ) + return new_port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + new_port = find_available_port() + logger.warning( + f"Port [{port}] is occupied, use [{new_port}] instead", + ) + return new_port + return port + + def _guess_type_by_extension( url: str, ) -> Literal["image", "audio", "video", "file"]: From c9eaf2df3a167ee9980d95bf97f290d02e8a2951 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 17 May 2024 11:44:27 +0800 Subject: [PATCH 13/13] fix comments --- src/agentscope/agents/agent.py | 16 +++++++--- src/agentscope/server/launcher.py | 49 +++++++++++++++++++++---------- src/agentscope/server/servicer.py | 29 +++++++++++------- 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index 91f45527a..4341952d9 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -384,14 +384,22 @@ def to_dist( port (`int`, defaults to `None`): Port of the rpc agent server. max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. + Only takes effect when `host` and `port` are not filled in. + The max number of agent reply messages that the started agent + server can accommodate. Note that the oldest message will be + deleted after exceeding the pool size. 
max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. + Only takes effect when `host` and `port` are not filled in. + Maximum time for reply messages to be cached in the launched + agent server. Note that expired messages will be deleted. local_mode (`bool`, defaults to `True`): - Whether the started rpc server only listens to local + Only takes effect when `host` and `port` are not filled in. + Whether the started agent server only listens to local requests. lazy_launch (`bool`, defaults to `True`): - Only launch the server when the agent is called. + Only takes effect when `host` and `port` are not filled in. + If `True`, launch the agent server when the agent is called, + otherwise, launch the agent server immediately. launch_server(`bool`, defaults to `None`): This field has been deprecated and will be removed in future releases. diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index fda6ffa19..ed5ed7f67 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -68,9 +68,9 @@ def _setup_agent_server( local_mode (`bool`, defaults to `None`): Only listen to local requests. max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. + Max number of agent replies that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. + Timeout for agent replies. custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in `agentscope.agents`. """ @@ -124,11 +124,15 @@ async def _setup_agent_server_async( pipe (`int`, defaults to `None`): A pipe instance used to pass the actual port of the server. local_mode (`bool`, defaults to `None`): - Only listen to local requests. + If `True`, only listen to requests from "localhost", otherwise, + listen to requests from all hosts. 
max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted + after exceeding the pool size. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in `agentscope.agents`. """ @@ -224,12 +228,15 @@ def __init__( port (`int`, defaults to `None`): Socket port of the agent server. max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted + after exceeding the pool size. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. local_mode (`bool`, defaults to `False`): - Whether the started server only listens to local - requests. + If `True`, only listen to requests from "localhost", otherwise, + listen to requests from all hosts. custom_agents (`list`, defaults to `None`): A list of custom agent classes that are not in `agentscope.agents`. @@ -359,9 +366,11 @@ def as_server() -> None: * `--host`: the hostname of the server. * `--port`: the socket port of the server. - * `--max-pool-size`: max number of task results that the server can - accommodate. - * `--max-timeout-seconds`: max timeout seconds of a task. + * `--max-pool-size`: max number of agent reply messages that the server + can accommodate. Note that the oldest message will be deleted + after exceeding the pool size. + * `--max-timeout-seconds`: max time for reply messages to be cached + in the server. 
Note that expired messages will be deleted. * `--local-mode`: whether the started agent server only listens to local requests. * `--model-config-path`: the path to the model config json file @@ -391,19 +400,29 @@ def as_server() -> None: "--max-pool-size", type=int, default=8192, - help="max number of task results that the server can accommodate", + help=( + "max number of agent reply messages that the server " + "can accommodate. Note that the oldest message will be deleted " + "after exceeding the pool size." + ), ) parser.add_argument( "--max-timeout-seconds", type=int, default=1800, - help="max timeout for task results", + help=( + "max time for agent reply messages to be cached " + "in the server. Note that expired messages will be deleted." + ), ) parser.add_argument( "--local-mode", type=bool, default=False, - help="whether the started agent server only listens to local requests", + help=( + "If `True`, only listen to requests from 'localhost', otherwise, " + "listen to requests from all hosts." + ), ) parser.add_argument( "--model-config-path", diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index 8f6134f81..53c63425f 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -53,12 +53,12 @@ def __init__( port (`int`, defaults to `None`): Port of the rpc agent server. max_pool_size (`int`, defaults to `8192`): - The max number of task results that the server can - accommodate. Note that the oldest result will be deleted + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted after exceeding the pool size. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. Note that expired results will be - deleted. + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. 
""" self.host = host self.port = port @@ -73,7 +73,8 @@ def __init__( self.agent_pool: dict[str, AgentBase] = {} def get_task_id(self) -> int: - """Get the auto-increment task id.""" + """Get the auto-increment task id. + Each reply call will get a unique task id.""" with self.task_id_lock: self.task_id_counter += 1 return self.task_id_counter @@ -191,11 +192,11 @@ def _reply(self, request: RpcMsg) -> RpcMsg: ) def _get(self, request: RpcMsg) -> RpcMsg: - """Get function of RpcAgentService + """Get a reply message with specific task_id. Args: request (`RpcMsg`): - Identifier of message, with json format:: + The task id that generated this message, with json format:: { 'task_id': int @@ -215,7 +216,7 @@ def _get(self, request: RpcMsg) -> RpcMsg: return RpcMsg(value=result.serialize()) def _observe(self, request: RpcMsg) -> RpcMsg: - """Observe function of RpcAgentService + """Observe function of the original agent. Args: request (`RpcMsg`): @@ -232,7 +233,7 @@ def _observe(self, request: RpcMsg) -> RpcMsg: return RpcMsg() def _create_agent(self, request: RpcMsg) -> RpcMsg: - """Create a new agent instance for the agent_id. + """Create a new agent instance with the given agent_id. Args: request (RpcMsg): request message with a `agent_id` field. @@ -272,7 +273,7 @@ def _clone_agent(self, request: RpcMsg) -> RpcMsg: return RpcMsg(value=new_agent.agent_id) # type: ignore[arg-type] def _delete_agent(self, request: RpcMsg) -> RpcMsg: - """Delete the agent instance of the specific sesssion_id. + """Delete the agent instance of the specific agent_id. Args: request (RpcMsg): request message with a `agent_id` field. @@ -286,7 +287,13 @@ def process_messages( agent_id: str, task_msg: dict = None, ) -> None: - """Task processing.""" + """Processing an input message and generate its reply message. + + Args: + task_id (`int`): task id of the input message, . + agent_id (`str`): the id of the agent that accepted the message. + task_msg (`dict`): the input message. 
+ """ if isinstance(task_msg, PlaceholderMessage): task_msg.update_value() cond = self.result_pool[task_id]