From 7997f7a83641f2b93851866a916e998bf73d82c7 Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 10 May 2024 15:04:13 +0800 Subject: [PATCH] add as_server into setup and support graceful shutdown --- setup.py | 1 + src/agentscope/server/launcher.py | 73 ++++++++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 7dca2181b..955bb47d2 100644 --- a/setup.py +++ b/setup.py @@ -121,6 +121,7 @@ "console_scripts": [ "as_studio=agentscope.web.studio.studio:run_app", "as_workflow=agentscope.web.workstation.workflow:main", + "as_server=agentscope.server.launcher:launch", ], }, ) diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index 78afb4f0f..6328fe865 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -4,6 +4,8 @@ from multiprocessing.synchronize import Event as EventClass import socket import asyncio +import signal +import argparse from typing import Any, Type, Optional from concurrent import futures from loguru import logger @@ -130,13 +132,24 @@ async def setup_agent_server_async( if custom_agents is not None: for agent_class in custom_agents: AgentBase.register_agent_class(agent_class=agent_class) + + async def shutdown_signal_handler() -> None: + logger.info( + f"Received shutdown signal. Gracefully stopping the server at " + f"[{host}:{port}].", + ) + await server.stop(grace=5) + + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown_signal_handler()), + ) while True: try: port = check_port(port) servicer.port = port - logger.info( - f"Starting rpc server at port [{port}]...", - ) server = grpc.aio.server( futures.ThreadPoolExecutor(max_workers=None), ) @@ -149,11 +162,11 @@ async def setup_agent_server_async( break except OSError: logger.warning( - f"Failed to start rpc server at port [{port}]" + f"Failed to start agent server at port [{port}]" f"try another port", ) logger.info( - f"rpc server at port [{port}] started successfully", + f"agent server at [{host}:{port}] started successfully", ) if start_event is not None: pipe.send(port) @@ -161,13 +174,13 @@ async def setup_agent_server_async( while not stop_event.is_set(): await asyncio.sleep(1) logger.info( - f"Stopping rpc server at port [{port}]", + f"Stopping agent server at [{host}:{port}]", ) await server.stop(10.0) else: await server.wait_for_termination() logger.info( - f"rpc server at port [{port}] stopped successfully", + f"agent server at [{host}:{port}] stopped successfully", ) @@ -342,3 +355,49 @@ def shutdown(self) -> None: f"Agent server at port [{self.port}] is killed.", ) self.server = None + + +def launch() -> None: + """Launch an agent server""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + type=str, + default="localhost", + help="hostname of the server", + ) + parser.add_argument( + "--port", + type=int, + default=12310, + help="socket port of the server", + ) + parser.add_argument( + "--max_pool_size", + type=int, + default=8192, + help="max number of task results that the server can accommodate", + ) + parser.add_argument( + "--max_timeout_seconds", + type=int, + default=1800, + help="max timeout for task results", + ) + parser.add_argument( + "--local_mode", + type=bool, + default=False, + help="whether the started rpc server only listens to local requests", + ) + args = parser.parse_args() + launcher = AgentServerLauncher( + host=args.host, + port=args.port, + max_pool_size=args.max_pool_size, + max_timeout_seconds=args.max_timeout_seconds, + local_mode=args.local_mode, + ) + launcher.launch(in_subprocess=False) + launcher.wait_until_terminate()