Skip to content

Commit

Permalink
add as_server into setup and support graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
pan-x-c committed May 10, 2024
1 parent dbf5b18 commit 7997f7a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 7 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
"console_scripts": [
"as_studio=agentscope.web.studio.studio:run_app",
"as_workflow=agentscope.web.workstation.workflow:main",
"as_server=agentscope.server.launcher:launch",
],
},
)
73 changes: 66 additions & 7 deletions src/agentscope/server/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from multiprocessing.synchronize import Event as EventClass
import socket
import asyncio
import signal
import argparse
from typing import Any, Type, Optional
from concurrent import futures
from loguru import logger
Expand Down Expand Up @@ -130,13 +132,24 @@ async def setup_agent_server_async(
if custom_agents is not None:
for agent_class in custom_agents:
AgentBase.register_agent_class(agent_class=agent_class)

async def shutdown_signal_handler() -> None:
logger.info(
f"Received shutdown signal. Gracefully stopping the server at "
f"[{host}:{port}].",
)
await server.stop(grace=5)

loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown_signal_handler()),
)
while True:
try:
port = check_port(port)
servicer.port = port
logger.info(
f"Starting rpc server at port [{port}]...",
)
server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=None),
)
Expand All @@ -149,25 +162,25 @@ async def setup_agent_server_async(
break
except OSError:
logger.warning(
f"Failed to start rpc server at port [{port}]"
f"Failed to start agent server at port [{port}]"
f"try another port",
)
logger.info(
f"rpc server at port [{port}] started successfully",
f"agent server at [{host}:{port}] started successfully",
)
if start_event is not None:
pipe.send(port)
start_event.set()
while not stop_event.is_set():
await asyncio.sleep(1)
logger.info(
f"Stopping rpc server at port [{port}]",
f"Stopping agent server at [{host}:{port}]",
)
await server.stop(10.0)
else:
await server.wait_for_termination()
logger.info(
f"rpc server at port [{port}] stopped successfully",
f"agent server at [{host}:{port}] stopped successfully",
)


Expand Down Expand Up @@ -342,3 +355,49 @@ def shutdown(self) -> None:
f"Agent server at port [{self.port}] is killed.",
)
self.server = None


def launch() -> None:
"""Launch an agent server"""

parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
type=str,
default="localhost",
help="hostname of the server",
)
parser.add_argument(
"--port",
type=int,
default=12310,
help="socket port of the server",
)
parser.add_argument(
"--max_pool_size",
type=int,
default=8192,
help="max number of task results that the server can accommodate",
)
parser.add_argument(
"--max_timeout_seconds",
type=int,
default=1800,
help="max timeout for task results",
)
parser.add_argument(
"--local_mode",
type=bool,
default=False,
help="whether the started rpc server only listens to local requests",
)
args = parser.parse_args()
launcher = AgentServerLauncher(
host=args.host,
port=args.port,
max_pool_size=args.max_pool_size,
max_timeout_seconds=args.max_timeout_seconds,
local_mode=args.local_mode,
)
launcher.launch(in_subprocess=False)
launcher.wait_until_terminate()

0 comments on commit 7997f7a

Please sign in to comment.