diff --git a/README.md b/README.md index 453b842ad..36d9dc119 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ the following libraries. - [Distributed Conversation](./examples/distributed_basic) - [Distributed Debate](./examples/distributed_debate) - [Distributed Parallel Search](./examples/distributed_search) + - [Distributed Large Scale Simulation](./examples/distributed_simulation) More models, services and examples are coming soon! diff --git a/README_ZH.md b/README_ZH.md index 970992592..2ed8e2987 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -113,6 +113,7 @@ AgentScope支持使用以下库快速部署本地模型服务。 - [分布式对话](./examples/distributed_basic) - [分布式辩论](./examples/distributed_debate) - [分布式并行搜索](./examples/distributed_search) + - [分布式大规模仿真](./examples/distributed_simulation) 更多模型API、服务和示例即将推出! diff --git a/docs/sphinx_doc/en/source/conf.py b/docs/sphinx_doc/en/source/conf.py index 2025ced67..788bda020 100644 --- a/docs/sphinx_doc/en/source/conf.py +++ b/docs/sphinx_doc/en/source/conf.py @@ -49,6 +49,11 @@ autodoc_member_order = "bysource" +autodoc_default_options = { + "members": True, + "special-members": "__init__", +} + # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] diff --git a/docs/sphinx_doc/en/source/tutorial/201-agent.md b/docs/sphinx_doc/en/source/tutorial/201-agent.md index 7d583331d..dbe2f3e77 100644 --- a/docs/sphinx_doc/en/source/tutorial/201-agent.md +++ b/docs/sphinx_doc/en/source/tutorial/201-agent.md @@ -16,6 +16,8 @@ Each AgentBase derivative is composed of several key characteristics: * `sys_prompt` & `engine`: The system prompt acts as predefined instructions that guide the agent in its interactions; and the `engine` is used to dynamically generate a suitable prompt. For more details about them, we defer to [Prompt Engine](206-prompt). +* `to_dist`: Used to create a distributed version of the agent, to support efficient collaboration among multiple agents. Note that `to_dist` is a reserved field and will be automatically added to the initialization function of any subclass of `AgentBase`. For more details about `to_dist`, please refer to [Distribution](208-distribute). + In addition to these attributes, `AgentBase` endows agents with pivotal methods such as `observe` and `reply`: * `observe()`: Through this method, an agent can take note of *message* without immediately replying, allowing it to update its memory based on the observed *message*. diff --git a/docs/sphinx_doc/en/source/tutorial/207-monitor.md b/docs/sphinx_doc/en/source/tutorial/207-monitor.md index e43f67b4f..76c4d08b1 100644 --- a/docs/sphinx_doc/en/source/tutorial/207-monitor.md +++ b/docs/sphinx_doc/en/source/tutorial/207-monitor.md @@ -35,8 +35,10 @@ Get a monitor instance from `MonitorFactory` to begin monitoring, and note that monitor = MonitorFactory.get_monitor() ``` -> Currently the above code returns a `SqliteMonitor` instance, which is initialized in `agentscope.init`. -> The `SqliteMonitor` class is the default implementation of `MonitorBase` class, which is based on Sqlite3. +Currently the above code returns a `SqliteMonitor` instance, which is initialized in `agentscope.init`. +The `SqliteMonitor` class is the default implementation of `MonitorBase` class, which is based on Sqlite3. + +If you don't want to use monitor, you can set `use_monitor=False` in `agentscope.init` to disable the monitor. 
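+
+For example, a minimal sketch of turning the monitor off at initialization time (the project name and config path here are just placeholders):
+
+```python
+import agentscope
+
+# Initialize AgentScope with the monitor component disabled
+agentscope.init(
+    project="my_project",                        # placeholder project name
+    model_configs="configs/model_configs.json",  # placeholder config path
+    use_monitor=False,
+)
+```
+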
And in this case, the `MonitorFactory.get_monitor` method will return an instance of `DummyMonitor` which has the same interface as the `SqliteMonitor` class, but does nothing inside. ### Basic Usage diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 34321f62c..714f2e05f 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -12,70 +12,158 @@ This tutorial will introduce the implementation and usage of AgentScope distribu ## Usage -In AgentScope, the process that runs the application flow is called the "main process", and all agents will run in separate processes. -According to the different relationships between the main process and the agent process, AgentScope supports two distributed modes: Master-Slave and Peer-to-Peer mode. -In the Master-Slave mode, developers can start all agent processes from the main process, while in the Peer-to-Peer mode, the agent process is independent of the main process and developers need to start the agent service on the corresponding machine. +In AgentScope, the process that runs the application flow is called the **main process**, and each agent can run in a separate process named **agent server process**. +According to the different relationships between the main process and the agent server process, AgentScope supports two modes for each agent: **Child Process** and **Independent Process** mode. -The above concepts may seem complex, but don't worry, for application developers, they only have minor differences when creating agents. Below we introduce how to create distributed agents. +- In the Child Process Mode, agent server processes will be automatically started as sub-processes from the main process. +- While in the Independent Process Mode, the agent server process is independent of the main process and developers need to start the agent server process on the corresponding machine. -### Step 1: Create a Distributed Agent +The above concepts may seem complex, but don't worry, for application developers, you only need to convert your existing agent to its distributed version. -First, the developer's agent must inherit the `agentscope.agents.AgentBase` class. `AgentBase` provides the `to_dist` method to convert the agent into its distributed version. `to_dist` mainly relies on the following parameters to implement the distributed deployment of the agent: +### Step 1: Convert your agent to its distributed version -- `host`: the hostname or IP address of the machine where the agent runs, defaults to `localhost`. -- `port`: the port of this agent's RPC server, defaults to `80`. -- `launch_server`: whether to launch an RPC server locally, defaults to `True`. +All agents in AgentScope can automatically convert to its distributed version by calling its {func}`to_dist` method. +But note that your agent must inherit from the {class}`agentscope.agents.AgentBase` class, because the `to_dist` method is provided by the `AgentBase` class. Suppose there are two agent classes `AgentA` and `AgentB`, both of which inherit from `AgentBase`. -#### Master-Slave Mode +```python +a = AgentA( + name="A" + # ... +) +b = AgentB( + name="B" + # ... +) +``` + +Next we will introduce the conversion details of both modes. -In the Master-Slave mode, since all agent processes depend on the main process, all processes actually run on the same machine. 
-We can start all agent processes from the main process, that is, the default parameters `launch_server=True` and `host="localhost"`, and we can omit the `port` parameter. AgentScope will automatically find an available local port for the agent process. +#### Child Process Mode + +To use this mode, you only need to call each agent's `to_dist()` method without any input parameter. AgentScope will automatically start all agent server processes from the main process. ```python +# Child Process mode a = AgentA( name="A" # ... ).to_dist() +b = AgentB( + name="B" + # ... +).to_dist() ``` -#### Peer-to-Peer Mode +#### Independent Process Mode -In the Peer-to-Peer mode, we need to start the service of the corresponding agent on the target machine first. For example, deploy an instance of `AgentA` on the machine with IP `a.b.c.d`, and its corresponding port is 12001. Run the following code on this target machine: +In the Independent Process Mode, we need to start the agent server process on the target machine first. +For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). +You can run the following code on `Machine1`: ```python -from agentscope.agents import RpcAgentServerLauncher +# import some packages +agentscope.init( + ... +) # Create an agent service process -server_a = RpcAgentServerLauncher( - agent_class=AgentA, - agent_kwargs={ - "name": "A" - ... - }, - host="a.b.c.d", - port=12001, +server = RpcAgentServerLauncher( + host="ip_a", + port=12001, # choose an available port ) # Start the service -server_a.launch() -server_a.wait_until_terminate() +server.launch() +server.wait_until_terminate() ``` -Then, we can connect to the agent service in the main process with the following code. At this time, the object `a` created in the main process can be used as a local proxy for the agent, allowing developers to write the application flow in a centralized way in the main process. +And run the following code on `Machine2`: + +```python +# import some packages + +agentscope.init( + ... +) +# Create an agent service process +server = RpcAgentServerLauncher( + host="ip_b", + port=12002, # choose an available port +) + +# Start the service +server.launch() +server.wait_until_terminate() +``` + +Then, you can connect to the agent servers from the main process with the following code. ```python a = AgentA( name="A", # ... ).to_dist( - host="a.b.c.d", + host="ip_a", port=12001, - launch_server=False, +) +b = AgentB( + name="B", + # ... +).to_dist( + host="ip_b", + port=12002, +) +``` + +The above code will deploy `AgentA` on the agent server process of `Machine1` and `AgentB` on the agent server process of `Machine2`. +And developers just need to write the application flow in a centralized way in the main process. + +#### Advanced Usage of `to_dist` + +All examples described above convert initialized agents into their distributed version through the {func}`to_dist` method, which is equivalent to initialize the agent twice, once in the main process and once in the agent server process. +For agents whose initialization process is time-consuming, the `to_dist` method is inefficient. Therefore, AgentScope also provides a method to convert the Agent instance into its distributed version while initializing it, that is, passing in `to_dist` parameter to the Agent's initialization function. + +In Child Process Mode, just pass `to_dist=True` to the Agent's initialization function. 
+ +```python +# Child Process mode +a = AgentA( + name="A", + # ... + to_dist=True +) +b = AgentB( + name="B", + # ... + to_dist=True +) +``` + +In Independent Process Mode, you need to encapsulate the parameters of the `to_dist()` method in {class}`DistConf` instance and pass it into the `to_dist` field, for example: + +```python +a = AgentA( + name="A", + # ... + to_dist=DistConf( + host="ip_a", + port=12001, + ), +) +b = AgentB( + name="B", + # ... + to_dist=DistConf( + host="ip_b", + port=12002, + ), ) ``` +Compared with the original `to_dist()` function call, this method just initializes the agent once in the agent server process. + ### Step 2: Orchestrate Distributed Application Flow In AgentScope, the orchestration of distributed application flow is exactly the same as non-distributed programs, and developers can write the entire application flow in a centralized way. @@ -83,7 +171,7 @@ At the same time, AgentScope allows the use of a mixture of locally and distribu The following is the complete code for two agents to communicate with each other in different modes. It can be seen that AgentScope supports zero-cost migration of distributed application flow from centralized to distributed. -- All agents are centralized: +- All agents are centralized ```python # Create agent objects @@ -104,7 +192,9 @@ while x is None or x.content == "exit": x = b(x) ``` -- Agents are deployed in a distributed manner (Master-Slave mode): +- Agents are deployed in a distributed manner + - `AgentA` in Child Process mode + - `AgentB` in Independent Process Mode ```python # Create agent objects @@ -116,7 +206,10 @@ a = AgentA( b = AgentB( name="B", # ... -).to_dist() +).to_dist( + host="ip_b", + port=12002, +) # Application flow orchestration x = None @@ -148,9 +241,20 @@ By implementing each Agent as an Actor, an Agent will automatically wait for its #### PlaceHolder -Meanwhile, to support centralized application orchestration, AgentScope introduces the concept of Placeholder. A Placeholder is a special message that contains the address and port number of the agent that generated the Placeholder, which is used to indicate that the input message of the Agent is not ready yet. -When the input message of the Agent is ready, the Placeholder will be replaced by the real message, and then the actual `reply` method will be executed. +Meanwhile, to support centralized application orchestration, AgentScope introduces the concept of {class}`Placeholder`. +A Placeholder is a special message that contains the address and port number of the agent that generated the placeholder, which is used to indicate that the output message of the Agent is not ready yet. +When calling the `reply` method of a distributed agent, a placeholder is returned immediately without blocking the main process. +The interface of placeholder is exactly the same as the message, so that the orchestration flow can be written in a centralized way. +When getting values from a placeholder, the placeholder will send a request to get the real values from the source agent. +A placeholder itself is also a message, and it can be sent to other agents, and let other agents to get the real values, which can avoid sending the real values multiple times. About more detailed technical implementation solutions, please refer to our [paper](https://arxiv.org/abs/2402.14034). +#### Agent Server + +In agentscope, the agent server provides a running platform for various types of agents. 
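+
+As a sketch of what this means in practice (assuming an agent server has already been started on `ip_a:12001` as in the example above, and reusing the placeholder classes `AgentA` and `AgentB`), several agents can simply be attached to the same server:
+
+```python
+from agentscope.agents import DistConf
+
+# Both agents are hosted by the same agent server process on Machine1
+a = AgentA(
+    name="A",
+    # ...
+    to_dist=DistConf(host="ip_a", port=12001),
+)
+b = AgentB(
+    name="B",
+    # ...
+    to_dist=DistConf(host="ip_a", port=12001),
+)
+```
+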
+Multiple agents can run in the same agent server and hold independent memory and other local states but they will share the same computation resources. +As long as the code is not modified, an agent server can provide services for multiple main processes. +This means that when running mutliple applications, you only need to start the agent server for the first time, and it can be reused subsequently. + [[Back to the top]](#208-distribute-en) diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/201-agent.md b/docs/sphinx_doc/zh_CN/source/tutorial/201-agent.md index a14ee55c8..6959bc929 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/201-agent.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/201-agent.md @@ -17,6 +17,8 @@ * `sys_prompt`(系统提示)和`engine`(引擎):系统提示作为预定义的指令,指导agent在其互动中的行为;而engine用于动态生成合适的提示。关于它们的更多细节,我们会在[提示引擎部分](206-prompt)讨论。 +* `to_dist`(分布式):用于创建 agent 的分布式版本,以支持多 agent 的高效协作。请注意`to_dist`是一个保留字段,将自动添加到`AgentBase`所有子类的初始化函数中。关于 `to_dist` 的更多细节,请见[分布式部分](208-distribute)。 + 除了这些属性,`AgentBase` 还为agent提供了一些关键方法,如 `observe` 和 `reply`: * `observe()`:通过这个方法,一个agent可以注意到消息而不立即回复,允许它根据观察到的消息更新它的记忆。 diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/207-monitor.md b/docs/sphinx_doc/zh_CN/source/tutorial/207-monitor.md index 73e0daf4c..dc863b834 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/207-monitor.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/207-monitor.md @@ -35,8 +35,10 @@ monitor = MonitorFactory.get_monitor() ``` -> 目前上述代码返回的是 `SqliteMonitor` 实例,它在 `agentscope.init` 中初始化。 -> `SqliteMonitor` 类是基于Sqlite3的 `MonitorBase` 类的默认实现。 +目前上述代码将会返回一个 `SqliteMonitor` 实例,该实例在 `agentscope.init` 中初始化。 +`SqliteMonitor` 是一个基于 Sqlite3 的 `MonitorBase` 实现,也是当前的默认 Monitor。 + +如果不需要使用 Monitor 的相关功能,可以通过向 `agentscope.init` 中传入 `use_monitor=False` 来关闭 monitor 组件。在这种情况下,`MonitorFactory.get_monitor` 将返回一个 `DummyMonitor` 实例,该实例对外接口与 `SqliteMonitor` 完全相同,但内部不会执行任何操作。 ### 基本使用 diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index d882b7690..ef50f123f 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -12,69 +12,156 @@ AgentScope实现了基于Actor模式的智能体分布式部署和并行优化 ## 使用方法 -AgentScope中,我们将运行应用流程的进程称为“主进程”,而所有的智能体都会运行在独立的进程当中。 -根据主进程和智能体进程之间关系的不同,AgentScope支持两种分布式模式:主从模式(Master-Slave)和对等模式(Peer-to-Peer,P2P)。 -主从模式中,开发者可以从主进程中启动所有的智能体进程,而对等模式中,智能体进程相对主进程来说是独立的,需要在对应的机器上启动智能体的服务。 +AgentScope中,我们将运行应用流程的进程称为**主进程 (Main Process)**,而所有的智能体都会运行在额外的 **智能体服务器进程 (Agent Server Process)** 中。 +根据主进程域智能体服务器进程之间的关系,AgentScope 为每个 Agent 提供了两种启动模式:**子进程模式 (Child)** 和 **独立进程模式 (Indpendent)**。 +子进程模式中,开发者可以从主进程中启动所有的智能体服务器进程,而独立进程模式中,智能体服务器进程相对主进程来说是独立的,需要在对应的机器上启动智能体服务器进程。 -上述概念有些复杂,但是不用担心,对于应用开发者而言,它们仅仅在创建智能体阶段有微小的差别。下面我们介绍如何创建分布式智能体。 +上述概念有些复杂,但是不用担心,对于应用开发者而言,仅需将已有的智能体转化为对应的分布式版本,其余操作都和正常的单机版本完全一致。 -### 步骤1: 创建分布式智能体 +### 步骤1: 转化为分布式版本 -首先,开发者的智能体必须继承`agentscope.agents.AgentBase`类,`AgentBase`提供了`to_dist`方法将该Agent转化为其分布式版本。`to_dist`主要依靠以下的参数实现智能体分布式部署: - -- `host`: 用于部署智能体的机器IP地址,默认为`localhost`。 -- `port`: 智能体的RPC服务器端口,默认为`80`。 -- `launch_server`: 是否在本地启动RPC服务器,默认为`True`。 +AgentScope 中所有智能体都可以通过 {func}`to_dist` 方法转化为对应的分布式版本。 +但需要注意,你的智能体必须继承自 {class}`agentscope.agents.AgentBase` 类,因为是 `AgentBase` 提供了 `to_dist` 方法。 假设有两个智能体类`AgentA`和`AgentB`,它们都继承自 `AgentBase`。 -#### 主从模式 +```python +a = AgentA( + name="A" + # ... +) +b = AgentB( + name="B" + # ... 
+) +``` + +接下来我们将介绍如何将智能体转化到两种分布式模式。 + +#### 子进程模式 -主从模式中,由于所有智能体进程依赖于主进程,因此所有进程实际运行在一台机器上。 -我们可以在主进程中启动所有智能体进程,即默认参数`launch_server=True`和`host="localhost"`,同时我们可以省略`port`参数,AgentScope将会为智能体进程自动寻找空闲的本地端口。 +要使用该模式,你只需要调用各智能体的 `to_dist()` 方法,并且不需要提供任何参数。 +AgentScope 会自动帮你从主进程中启动智能体服务器进程并将智能体部署到对应的子进程上。 ```python +# Subprocess mode a = AgentA( name="A" # ... ).to_dist() +b = AgentB( + name="B" + # ... +).to_dist() ``` -#### 对等模式 +#### 独立进程模式 -对等模式中,我们需要首先在目标机器上启动对应智能体的服务,例如将`AgentA`的实例部署在IP为`a.b.c.d`的机器上,其对应的端口为12001。在这台目标机器上运行以下代码: +在独立进程模式中,需要首先在目标机器上启动智能体服务器进程。 +例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 +你可以先在 `Machine1` 上运行如下代码: ```python -from agentscope.agents import RpcAgentServerLauncher - -# 创建智能体服务进程 -server_a = RpcAgentServerLauncher( - agent_class=AgentA, - agent_kwargs={ - "name": "A" - ... - }, - host="a.b.c.d", - port=12001, +# import some packages + +agentscope.init( + ... ) -# 启动服务 -server_a.launch() -server_a.wait_until_terminate() +# Create an agent service process +server = RpcAgentServerLauncher( + host="ip_a", + port=12001, # choose an available port +) + +# Start the service +server.launch() +server.wait_until_terminate() ``` -然后,我们可以在主进程当中用以下的代码连接智能体服务,此时主进程中创建的对象`a`可以当做智能体的本地代理,允许开发者可以在主进程中采取中心化的方式编写应用流程。 +之后在 `Machine2` 上运行如下代码: + +```python +# import some packages + +agentscope.init( + ... +) +# Create an agent service process +server = RpcAgentServerLauncher( + host="ip_b", + port=12002, # choose an available port +) + +# Start the service +server.launch() +server.wait_until_terminate() +``` + +接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 ```python a = AgentA( name="A", - ... + # ... ).to_dist( - host="a.b.c.d", + host="ip_a", port=12001, - launch_server=False, +) +b = AgentB( + name="B", + # ... +).to_dist( + host="ip_b", + port=12002, +) +``` + +上述代码将会把 `AgentA` 部署到 `Machine1` 的智能体服务器进程上,并将 `AgentB` 部署到 `Machine2` 的智能体服务器进程上。 +开发者在这之后只需要用中心化的方法编排各智能体的交互逻辑即可。 + +#### `to_dist` 进阶用法 + +上面介绍的案例都是将一个已经初始化的 Agent 通过 {func}`to_dist` 方法转化为其分布式版本,相当于要执行两次初始化操作,一次在主进程中,一次在智能体进程中。如果 Agent 的初始化过程耗时较长,直接使用 `to_dist` 方法会严重影响运行效率。为此 AgentScope 也提供了在初始化 Agent 实例的同时将其转化为其分布式版本的方法,即在原 Agent 实例初始化时传入 `to_dist` 参数。 + +子进程模式下,只需要在 Agent 初始化函数中传入 `to_dist=True` 即可: + +```python +# Child Process mode +a = AgentA( + name="A", + # ... + to_dist=True +) +b = AgentB( + name="B", + # ... + to_dist=True +) +``` + +独立进程模式下, 则需要将原来 `to_dist()` 函数的参数以 {class}`DistConf` 实例的形式传入 Agent 初始化函数的 `to_dist` 域: + +```python +a = AgentA( + name="A", + # ... + to_dist=DistConf( + host="ip_a", + port=12001, + ), +) +b = AgentB( + name="B", + # ... + to_dist=DistConf( + host="ip_b", + port=12002, + ), ) ``` +相较于原有的 `to_dist()` 函数调用,该方法只会在智能体进程中初始化一次 Agent,避免了重复初始化现象。 + ### 步骤2: 编排分布式应用流程 在AgentScope中,分布式应用流程的编排和非分布式的程序完全一致,开发者可以用中心化的方式编写全部应用流程。 @@ -103,7 +190,9 @@ while x is None or x.content == "exit": x = b(x) ``` -- 智能体分布式部署(主从模式下): +- 智能体分布式部署 + - `AgentA` 使用子进程模式部署 + - `AgentB` 使用独立进程模式部署 ```python # 创建智能体对象 @@ -115,7 +204,10 @@ a = AgentA( b = AgentB( name="B", # ... 
-).to_dist() +).to_dist( + host="ip_b", + port=12002, +) # 应用流程编排 x = None @@ -148,9 +240,18 @@ D-->F #### PlaceHolder -同时,为了支持中心化的应用编排,AgentScope引入了Placeholder这一概念。Placeholder是一个特殊的消息,它包含了产生该Placeholder的智能体的地址和端口号,用于表示Agent的输入消息还未准备好。 -当Agent的输入消息准备好后,Placeholder会被替换为真实的消息,然后运行实际的`reply`方法 +同时,为了支持中心化的应用编排,AgentScope 引入了 {class}`Placeholder` 这一概念。 +Placeholder 可以理解为消息的指针,指向消息真正产生的位置,其对外接口与传统模式中的消息完全一致,因此可以按照传统中心化的消息使用方式编排应用。 +Placeholder 内部包含了该消息产生方的联络方法,可以通过网络获取到被指向消息的真正值。 +每个分布式部署的 Agent 在收到其他 Agent 发来的消息时都会立即返回一个 Placeholder,从而避免阻塞请求发起方。 +而请求发起方可以借助返回的 Placeholder 在真正需要消息内容时再去向原 Agent 发起请求,请求发起方甚至可以将 Placholder 发送给其他 Agent 让其他 Agent 代为获取消息内容,从而减少消息真实内容的不必要转发。 关于更加详细的技术实现方案,请参考我们的[论文](https://arxiv.org/abs/2402.14034)。 +#### Agent Server + +Agent Server 也就是智能体服务器。在 AgentScope 中,Agent Server 提供了一个让不同 Agent 实例运行的平台。多个不同类型的 Agent 可以运行在同一个 Agent Server 中并保持独立的记忆以及其他本地状态信息,但是他们将共享同一份计算资源。 +只要没有对代码进行修改,一个已经启动的 Agent Server 可以为多个主流程提供服务。 +这意味着在运行多个应用时,只需要在第一次运行前启动 Agent Server,后续这些 Agent Server 进程就可以持续复用。 + [[回到顶部]](#208-distribute-zh) diff --git a/examples/distributed_basic/distributed_dialog.py b/examples/distributed_basic/distributed_dialog.py index d3c99cfa5..e558c54fa 100644 --- a/examples/distributed_basic/distributed_dialog.py +++ b/examples/distributed_basic/distributed_dialog.py @@ -37,13 +37,6 @@ def setup_assistant_server(assistant_host: str, assistant_port: int) -> None: model_configs="configs/model_configs.json", ) assistant_server_launcher = RpcAgentServerLauncher( - agent_class=DialogAgent, - agent_kwargs={ - "name": "Assitant", - "sys_prompt": "You are a helpful assistant.", - "model_config_name": "qwen", - "use_memory": True, - }, host=assistant_host, port=assistant_port, ) @@ -64,7 +57,6 @@ def run_main_process(assistant_host: str, assistant_port: int) -> None: ).to_dist( host=assistant_host, port=assistant_port, - launch_server=False, ) user_agent = UserAgent( name="User", diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index f7bf35db6..dcb73b884 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -2,13 +2,11 @@ """ An example of distributed debate """ import argparse -import json from user_proxy_agent import UserProxyAgent import agentscope from agentscope.msghub import msghub -from agentscope.agents.dialog_agent import DialogAgent from agentscope.agents.rpc_agent import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils import logger @@ -76,29 +74,10 @@ def setup_server(parsed_args: argparse.Namespace) -> None: ) host = getattr(parsed_args, f"{parsed_args.role}_host") port = getattr(parsed_args, f"{parsed_args.role}_port") - if parsed_args.is_human: - agent_class = UserProxyAgent - config = {"name": parsed_args.role} - else: - with open( - "configs/debate_agent_configs.json", - "r", - encoding="utf-8", - ) as f: - configs = json.load(f) - configs = { - "pro": configs[0]["args"], - "con": configs[1]["args"], - "judge": configs[2]["args"], - } - config = configs[parsed_args.role] - agent_class = DialogAgent - server_launcher = RpcAgentServerLauncher( - agent_class=agent_class, - agent_kwargs=config, host=host, port=port, + custom_agents=[UserProxyAgent], ) server_launcher.launch(in_subprocess=False) server_launcher.wait_until_terminate() @@ -113,12 +92,10 @@ def run_main_process(parsed_args: argparse.Namespace) -> None: pro_agent = pro_agent.to_dist( host=parsed_args.pro_host, 
port=parsed_args.pro_port, - launch_server=False, ) con_agent = con_agent.to_dist( host=parsed_args.con_host, port=parsed_args.con_port, - launch_server=False, ) participants = [pro_agent, con_agent, judge_agent] announcements = [ diff --git a/examples/distributed_simulation/README.md b/examples/distributed_simulation/README.md new file mode 100644 index 000000000..50f20fe6c --- /dev/null +++ b/examples/distributed_simulation/README.md @@ -0,0 +1,103 @@ +# Distributed Large Scale Simulation + +> **WARNING:** +> **This example will consume a huge amount of tokens.** +> **Using paid model API with this example can introduce a high cost.** +> **Users with powerful GPUs (A800 or better) can use local inference services (such as vLLM) to run this example,** +> **while CPU inference services such as ollama is not recommended.** + +This example is a large scale simulation to demonstrate the scalability of AgentScope's distributed mode. From this example, you can learn: + +- How to run a large number of agent servers in a GPU cluster. +- How to connect to those agent servers and run a huge number of agents in them. + +> Based on this example, we deploy 64,000 agents evenly on 4 machines, and each machine has 64 CPU cores and 8 A100 GPUs. The running time is about 30s (excluding initialization time). + +## Background + +This example simulates the following scenario: + +A large number of people participate in a game in which the moderator asks each participant to provide a number between 0 and N. The moderator will calculate the average of all numbers and announce it. The person closest to the average will win. + +## Tested Models + +Only vLLM local inference service is tested for this example. + +This example will consume a huge amount of tokens. Please do not use model API that requires payment. + +## Prerequisites + +- The distribute version of AgentScope is installed +- Use MacOS or Linux (Windows requires some modifiations to the scripts) +- [Optional] Have multiple machines with powerful GPUs (A800 or better) and install [vLLM](https://github.com/vllm-project/vllm) + +## How to Run + +### Step 1: start local inference service + +> If you only have one machine and don't have a powerful GPU (A800 or better), you can ignore this step. + +You can use `start_vllm.sh` to start vllm inference services on each of your machines. +Before running the script, please set `gpu_num`, `model_path` and `base_port` properly. + +- `gpu_num`: number of GPUs for this machine. +- `model_path`: the model checkpoint path. +- `base_port`: The starting point of the port number used by the local inference services. + +For example, if `base_port` is `8010` and `gpu_num` is `4`, 4 inference services will be started, and the port numbers are `8010`, `8011`, `8012` and `8013` respectively. + +vLLM inference services start slowly, so you need to wait for these servers to actually start before proceeding to the next step. + +> The above configuration requires that the model checkpoint can be loaded by a single GPU. +> If you need to use a model that must be loaded by multiple GPUs, you need to modify the script. + +### Step 2: start agent server + +> If you only have one machine and don't have a powerful GPU, you can just use the default setting of the scripts. + +You can use `start_all_server.sh` to start multiple agent servers on each of your machine. +Before running the script, please set `base_port`, `host_name` and `moderator_num` properly. 
+ +- `base_port`: The starting point of the port number used by the agent servers. +- `host_name`: The hostname of this machine, and must be accessible to other machines in the cluster (The default value `localhost` is only used for single machine scenario). +- `moderator_num`: Number of moderators. When the number of participants is large, this value needs to be expanded to avoid bottlenecks. + +After setting the above values correctly, you can use the script to start multiple agent server on your machine. The following command will start 10 agent servers on your machine with port numbers starting from `base_port` to `base_port + 9`, and will also start `moderator_num` agent servers for moderators with port numbers starting from `base_port + 10` to `base_port + moderator_num + 9`. + +```shell +#./start_all_server.sh +./start_all_server.sh 10 +``` + +If you have multiple machines, please make sure the `base_port` and `moderator_num` parameters are exactly the same on all machines, and start the same number of agent servers. + +### Step 3: run simulation + +You can use `run_simulation.sh` to start the simulation. +Before running the script, please set the following setting correctly: + +- `base_port`: the base port for agent servers, must be the same as used in Step 2. +- `hosts`: hostnames of all machines. If you only have one machine, use the default value `localhost`. +- `moderator_per_host`: Consistent with `moderator_num` in Step 2. +- `agent_type`: `random` or `llm`. Please use `random` if you don't have local inference service. +- `max_value`: The upper bound of numbers generated in the game. + +The command below will run a simulation with 1000 participant agents and evenly distributed those agents to the 10 agent servers started in Step 2. + +```shell +#./run_simulation.sh +./run_simulation.sh 10 1000 +``` + +The following is sample output from a single-machine (16 CPU cores) simulation scenario: + +```log +2024-04-16 10:31:53.786 | INFO | agentscope.models:read_model_configs:178 - Load configs for model wrapper: model_1, model_2, model_3, model_4, model_5, model_6, model_7, model_8 +2024-04-16 10:31:53.822 | INFO | agentscope.utils.monitor:_create_monitor_table:343 - Init [monitor_metrics] as the monitor table +2024-04-16 10:31:53.822 | INFO | agentscope.utils.monitor:_create_monitor_table:344 - Init [monitor_metrics_quota_exceeded] as the monitor trigger +2024-04-16 10:31:53.822 | INFO | agentscope.utils.monitor:__init__:313 - SqliteMonitor initialization completed at [./runs/run_20240416-103153_h0xuo5/agentscope.db] +2024-04-16 10:31:53.829 | INFO | __main__:run_main_process_new:106 - init 1000 random participant agents... +2024-04-16 10:31:53.829 | INFO | __main__:run_main_process_new:139 - init 4 moderator agents... 
+2024-04-16 10:31:54.211 | INFO | __main__:run_main_process_new:163 - [init takes 0.38274645805358887 s] +Moderator: The average value is 49.561 [takes 4.197571277618408 s] +``` diff --git a/examples/distributed_simulation/configs/model_configs.json b/examples/distributed_simulation/configs/model_configs.json new file mode 100644 index 000000000..327bdd76b --- /dev/null +++ b/examples/distributed_simulation/configs/model_configs.json @@ -0,0 +1,98 @@ +[ + { + "model_type": "openai_chat", + "config_name": "model_1", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8010/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_2", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8011/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_3", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8012/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_4", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8013/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_5", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8014/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_6", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8015/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_7", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8016/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + }, + { + "model_type": "openai_chat", + "config_name": "model_8", + "model_name": "path-to-your-model-dir", + "api_key": "EMPTY", + "client_args": { + "base_url": "http://127.0.0.1:8017/v1/" + }, + "generate_args": { + "temperature": 1.0 + } + } +] \ No newline at end of file diff --git a/examples/distributed_simulation/main.py b/examples/distributed_simulation/main.py new file mode 100644 index 000000000..7fd0cf19b --- /dev/null +++ b/examples/distributed_simulation/main.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +""" A large-scale social simulation experiment """ + +import argparse +import time +from concurrent import futures +from concurrent.futures import as_completed +from loguru import logger + +from participant import Moderator, RandomParticipant, LLMParticipant + +import agentscope +from agentscope.agents import AgentBase +from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.message import Msg + + +def parse_args() -> argparse.Namespace: + """Parse arguments""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--role", + choices=["participant", "main"], + default="main", + ) + parser.add_argument( + "--agent-type", + choices=["random", "llm"], + default="random", + ) + parser.add_argument("--max-value", type=int, default=100) + parser.add_argument("--sleep-time", type=float, default=1.0) + parser.add_argument( + 
"--hosts", + type=str, + nargs="+", + default=["localhost"], + ) + parser.add_argument("--participant-num", type=int, default=100) + parser.add_argument("--base-port", type=int, default=12010) + parser.add_argument( + "--server-per-host", + type=int, + ) + parser.add_argument("--model-per-host", type=int, default=1) + parser.add_argument("--moderator-per-host", type=int, default=1) + return parser.parse_args() + + +def setup_participant_agent_server(host: str, port: int) -> None: + """Set up agent server""" + agentscope.init( + project="simulation", + name="server", + runtime_id=str(port), + save_code=False, + save_api_invoke=False, + model_configs="configs/model_configs.json", + use_monitor=False, + ) + assistant_server_launcher = RpcAgentServerLauncher( + host=host, + port=port, + max_pool_size=16384, + custom_agents=[Moderator, RandomParticipant, LLMParticipant], + ) + assistant_server_launcher.launch(in_subprocess=False) + assistant_server_launcher.wait_until_terminate() + + +def init_moderator( + name: str, + configs: list[dict], + host: str, + port: int, + agent_type: str, + max_value: int, + sleep_time: float, +) -> AgentBase: + """Init moderator""" + return Moderator( # pylint: disable=E1123 + name=name, + part_configs=configs, + agent_type=agent_type, + max_value=max_value, + sleep_time=sleep_time, + to_dist={ + "host": host, + "port": port, + }, + ) + + +def run_main_process( + hosts: list[str], + base_port: int, + server_per_host: int, + model_per_host: int, + participant_num: int, + moderator_per_host: int = 10, + agent_type: str = "random", + max_value: int = 100, + sleep_time: float = 1.0, +) -> None: + """Run main process""" + agentscope.init( + project="simulation", + name="main", + save_code=False, + save_api_invoke=False, + model_configs="configs/model_configs.json", + use_monitor=False, + ) + host_num = len(hosts) + total_agent_server_num = server_per_host * host_num + participant_per_agent_server = participant_num // total_agent_server_num + ist = time.time() + configs = [] + logger.info(f"init {participant_num} {agent_type} participant agents...") + # build init configs of participants + for i in range(participant_num): + idx = i // participant_per_agent_server + host_id = idx // server_per_host + port_id = idx % server_per_host + model_id = i % model_per_host + host = hosts[host_id] + port = base_port + port_id + config_name = f"model_{model_id + 1}" + if agent_type == "random": + configs.append( + { + "name": f"P{i}", + "host": host, + "port": port, + }, + ) + else: + configs.append( + { + "name": f"P{i}", + "model_config_name": config_name, + "host": host, + "port": port, + }, + ) + + mods = [] + moderator_num = moderator_per_host * host_num + participant_per_moderator = participant_num // moderator_num + tasks = [] + + logger.info(f"init {moderator_num} moderator agents...") + # init moderators + with futures.ThreadPoolExecutor(max_workers=None) as executor: + for i in range(moderator_num): + tasks.append( + executor.submit( + init_moderator, + name=f"mod_{i}", + configs=configs[ + i + * participant_per_moderator : (i + 1) # noqa + * participant_per_moderator + ], + host=hosts[i // moderator_per_host], + port=base_port + server_per_host + i % moderator_per_host, + agent_type=agent_type, + max_value=max_value, + sleep_time=sleep_time, + ), + ) + for task in as_completed(tasks): + mods.append(task.result()) + + iet = time.time() + logger.info(f"[init takes {iet - ist} s]") + + # run te + st = time.time() + results = [] + for p in mods: + results.append(p()) + summ = 
0 + cnt = 0 + for r in results: + try: + summ += int(r["content"]["sum"]) + cnt += int(r["content"]["cnt"]) + except Exception: + logger.error(r["content"]) + et = time.time() + logger.chat( + Msg( + name="Moderator", + role="assistant", + content=f"The average value is {summ/cnt} [takes {et-st} s]", + ), + ) + + +if __name__ == "__main__": + args = parse_args() + if args.role == "participant": + setup_participant_agent_server(args.hosts[0], args.base_port) + elif args.role == "main": + run_main_process( + hosts=args.hosts, + base_port=args.base_port, + participant_num=args.participant_num, + server_per_host=args.server_per_host, + model_per_host=args.model_per_host, + moderator_per_host=args.moderator_per_host, + agent_type=args.agent_type, + sleep_time=args.sleep_time, + max_value=args.max_value, + ) diff --git a/examples/distributed_simulation/participant.py b/examples/distributed_simulation/participant.py new file mode 100644 index 000000000..dac3d17bf --- /dev/null +++ b/examples/distributed_simulation/participant.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +"""A general dialog agent.""" +import random +import time +import re +from loguru import logger + +from agentscope.message import Msg +from agentscope.agents import AgentBase + + +class RandomParticipant(AgentBase): + """A fake participant who generates number randomly.""" + + def __init__( + self, + name: str, + max_value: int = 100, + sleep_time: float = 1.0, + ) -> None: + """Initialize the participant.""" + super().__init__( + name=name, + ) + self.max_value = max_value + self.sleep_time = sleep_time + + def generate_random_response(self) -> str: + """generate a random int""" + time.sleep(self.sleep_time) + return str(random.randint(0, self.max_value)) + + def reply(self, x: dict = None) -> dict: + """Generate a random value""" + # generate a response in content + response = self.generate_random_response() + msg = Msg(self.name, content=response) + return msg + + +class LLMParticipant(AgentBase): + """A participant agent who generates number using LLM.""" + + def __init__( + self, + name: str, + model_config_name: str, + max_value: int = 100, + ) -> None: + """Initialize the participant.""" + super().__init__( + name=name, + model_config_name=model_config_name, + use_memory=True, + ) + self.max_value = max_value + self.prompt = Msg( + name="system", + role="system", + content="You are participating in a game where everyone " + f"provides a number between 0 and {max_value}. 
The person " + "closest to the average will win.", + ) + + def parse_value(self, txt: str) -> str: + """Parse the number from the response.""" + numbers = re.findall(r"\d+", txt) + if len(numbers) == 0: + logger.warning( + f"Fail to parse value from [{txt}], use " + f"{self.max_value // 2} instead.", + ) + return str(self.max_value // 2) + else: + return numbers[-1] + + def reply(self, x: dict = None) -> dict: + """Generate a value by LLM""" + if self.memory: + self.memory.add(x) + + # prepare prompt + prompt = self.model.format(self.prompt, self.memory.get_memory()) + + # call llm and generate response + response = self.model(prompt).text + + response = self.parse_value(response) + + msg = Msg(self.name, response, role="assistant") + + # Record the message in memory + if self.memory: + self.memory.add(msg) + + return msg + + +class Moderator(AgentBase): + """A Moderator to collect values from participants.""" + + def __init__( + self, + name: str, + part_configs: list[dict], + agent_type: str = "random", + max_value: int = 100, + sleep_time: float = 1.0, + ) -> None: + super().__init__(name) + self.max_value = max_value + if agent_type == "llm": + self.participants = [ + LLMParticipant( + name=config["name"], + model_config_name=config["model_config_name"], + max_value=max_value, + ).to_dist( + host=config["host"], + port=config["port"], + ) + for config in part_configs + ] + else: + self.participants = [ + RandomParticipant( + name=config["name"], + max_value=max_value, + sleep_time=sleep_time, + ).to_dist( + host=config["host"], + port=config["port"], + ) + for config in part_configs + ] + + def reply(self, x: dict = None) -> dict: + results = [] + msg = Msg( + name="moderator", + role="user", + content=f"Now give a number between 0 and {self.max_value}.", + ) + for p in self.participants: + results.append(p(msg)) + summ = 0 + for r in results: + try: + summ += int(r["content"]) + except Exception as e: + print(e) + return Msg( + name=self.name, + role="assistant", + content={"sum": summ, "cnt": len(self.participants)}, + ) diff --git a/examples/distributed_simulation/run_simlation.sh b/examples/distributed_simulation/run_simlation.sh new file mode 100755 index 000000000..6fac7c4c4 --- /dev/null +++ b/examples/distributed_simulation/run_simlation.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# default values +base_port=12330 +hosts="localhost" # or "server1 server2 server3 ..." +moderator_per_host=4 +model_per_host=8 +agent_type="random" # or "llm" +max_value=100 + +# check server-per-host +if ! [[ "$1" =~ ^[0-9]+$ ]]; then + echo "Usage: $0 " + exit 1 +fi + +# check participant-num +if ! [[ "$2" =~ ^[0-9]+$ ]]; then + echo "Usage: $0 " + exit 1 +fi + +mkdir -p log + +python main.py --role main --hosts ${hosts} --base-port ${base_port} --participant-num $2 --server-per-host $1 --model-per-host ${model_per_host} --moderator-per-host ${moderator_per_host} --agent-type ${agent_type} --max-value ${max_value} diff --git a/examples/distributed_simulation/start_all_server.sh b/examples/distributed_simulation/start_all_server.sh new file mode 100755 index 000000000..1c1f56aea --- /dev/null +++ b/examples/distributed_simulation/start_all_server.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# default values +base_port=12330 +host_name="localhost" +moderator_num=4 + +# get number of server +if ! 
[[ "$1" =~ ^[0-9]+$ ]]; then + echo "Usage: $0 " + exit 1 +fi + +participant_server_num=$1 + +# create files for pid +> .pid +# create log dir +mkdir -p log + +# start all agent servers +for ((i=0; i<(participant_server_num + moderator_num); i++)); do + port=$((base_port + i)) + python main.py --role participant --hosts ${host_name} --base-port ${port} > log/${port}.log 2>&1 & + echo $! >> .pid + echo "Started agent server on ${host_name}:${port} with PID $!" +done + +echo "All servers started" \ No newline at end of file diff --git a/examples/distributed_simulation/start_vllm.sh b/examples/distributed_simulation/start_vllm.sh new file mode 100755 index 000000000..11b92498c --- /dev/null +++ b/examples/distributed_simulation/start_vllm.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# default values +gpu_num=8 +model_path="path-to-your-model-dir" +base_port=8010 + +> .vllm_pid +mkdir -p log + +for ((i=0; i<8; i++)); do + port=$((base_port + i)) + export CUDA_VISIBLE_DEVICES=$i + python -m vllm.entrypoints.openai.api_server --model "${model_path}" --port ${port} --enforce-eager > log/vllm-${port}.log 2>&1 & + echo $! >> .vllm_pid + echo "Started vllm server on port ${port} with PID $!" +done + +echo "All vllm server started" \ No newline at end of file diff --git a/examples/distributed_simulation/stop_all_server.sh b/examples/distributed_simulation/stop_all_server.sh new file mode 100755 index 000000000..a9b72f72f --- /dev/null +++ b/examples/distributed_simulation/stop_all_server.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +if [ ! -f .pid ]; then + echo "PID file not found. Are the servers running?" + exit 1 +fi + +while read pid; do + kill -9 $pid + if [ $? -eq 0 ]; then + echo "Killed server with PID $pid" + else + echo "Failed to kill server with PID $pid" + fi +done < .pid + +rm .pid + +echo "All servers stopped." \ No newline at end of file diff --git a/examples/distributed_simulation/stop_vllm.sh b/examples/distributed_simulation/stop_vllm.sh new file mode 100755 index 000000000..eaefbcfe7 --- /dev/null +++ b/examples/distributed_simulation/stop_vllm.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +if [ ! -f .vllm_pid ]; then + echo "PID file not found. Are the servers running?" + exit 1 +fi + +while read pid; do + kill -9 $pid + if [ $? -eq 0 ]; then + echo "Killed vllm server with PID $pid" + else + echo "Failed to kill vllm server with PID $pid" + fi +done < .vllm_pid + +rm .vllm_pid + +echo "All vllm servers stopped." \ No newline at end of file diff --git a/setup.py b/setup.py index fc1f1a097..ba0293617 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ "grpcio-tools==1.60.0", "protobuf==4.25.0", "expiringdict", + "dill", ] service_requires = [ diff --git a/src/agentscope/_init.py b/src/agentscope/_init.py index eb249ec8e..7d1f44d7b 100644 --- a/src/agentscope/_init.py +++ b/src/agentscope/_init.py @@ -26,7 +26,9 @@ def init( save_log: bool = True, save_code: bool = True, save_api_invoke: bool = True, + use_monitor: bool = True, logger_level: LOG_LEVEL = _DEFAULT_LOG_LEVEL, + runtime_id: Optional[str] = None, agent_configs: Optional[Union[str, list, dict]] = None, ) -> Sequence[AgentBase]: """A unified entry to initialize the package, including model configs, @@ -40,6 +42,9 @@ def init( The project name, which is used to identify the project. name (`Optional[str]`, defaults to `None`): The name for runtime, which is used to identify this runtime. + runtime_id (`Optional[str]`, defaults to `None`): + The id for runtime, which is used to identify this runtime. Use + `None` will generate a random id. 
save_dir (`str`, defaults to `./runs`): The directory to save logs, files, codes, and api invocations. If `dir` is `None`, when saving logs, files, codes, and api @@ -51,6 +56,8 @@ def init( save_api_invoke (`bool`, defaults to `False`): Whether to save api invocations locally, including model and web search invocation. + use_monitor (`bool`, defaults to `True`): + Whether to activate the monitor. logger_level (`LOG_LEVEL`, defaults to `"INFO"`): The logging level of logger. agent_configs (`Optional[Union[str, list, dict]]`, defaults to `None`): @@ -63,9 +70,11 @@ def init( model_configs=model_configs, project=project, name=name, + runtime_id=runtime_id, save_dir=save_dir, save_api_invoke=save_api_invoke, save_log=save_log, + use_monitor=use_monitor, logger_level=logger_level, ) @@ -117,6 +126,7 @@ def init_process( save_dir: str = _DEFAULT_DIR, save_api_invoke: bool = False, save_log: bool = False, + use_monitor: bool = True, logger_level: LOG_LEVEL = _DEFAULT_LOG_LEVEL, ) -> None: """An entry to initialize the package in a process. @@ -139,17 +149,11 @@ def init_process( A sequence of pre-init model configs. save_log (`bool`, defaults to `False`): Whether to save logs locally. + use_monitor (`bool`, defaults to `True`): + Whether to activate the monitor. logger_level (`LOG_LEVEL`, defaults to `"INFO"`): The logging level of logger. """ - # Init logger - dir_log = str(file_manager.dir_log) if save_log else None - setup_logger(dir_log, logger_level) - - # Load model configs if needed - if model_configs is not None: - read_model_configs(model_configs) - # Init the runtime if project is not None: _runtime.project = project @@ -158,8 +162,19 @@ def init_process( if runtime_id is not None: _runtime.runtime_id = runtime_id + # Init logger + dir_log = str(file_manager.dir_log) if save_log else None + setup_logger(dir_log, logger_level) + + # Load model configs if needed + if model_configs is not None: + read_model_configs(model_configs) + # Init file manager and save configs by default file_manager.init(save_dir, save_api_invoke) # Init monitor - _ = MonitorFactory.get_monitor(db_path=file_manager.path_db) + _ = MonitorFactory.get_monitor( + db_path=file_manager.path_db, + impl_type="sqlite" if use_monitor else "dummy", + ) diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py index c61fdbdc3..0d5f6f84d 100644 --- a/src/agentscope/agents/__init__.py +++ b/src/agentscope/agents/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ Import all agent related modules in the package. """ -from .agent import AgentBase +from .agent import AgentBase, DistConf from .operator import Operator from .dialog_agent import DialogAgent from .dict_dialog_agent import DictDialogAgent @@ -18,6 +18,7 @@ "TextToImageAgent", "UserAgent", "ReActAgent", + "DistConf", "RpcAgent", "RpcAgentServerLauncher", ] diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index dc32dcd5c..bdf657df2 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -7,6 +7,7 @@ from typing import Sequence from typing import Union from typing import Any +from typing import Type import uuid from loguru import logger @@ -15,16 +16,116 @@ from agentscope.memory import TemporaryMemory -class _RecordInitSettingMeta(ABCMeta): - """A wrapper to record the init args into `_init_settings` field.""" +class _AgentMeta(ABCMeta): + """The meta-class for agent. + + 1. record the init args into `_init_settings` field. + 2. register class name into `registry` field. 
+ """ + + def __init__(cls, name: Any, bases: Any, attrs: Any) -> None: + if not hasattr(cls, "registry"): + cls._registry = {} + else: + if name in cls._registry: + logger.warning( + f"Agent class with name [{name}] already exists.", + ) + else: + cls._registry[name] = cls + super().__init__(name, bases, attrs) def __call__(cls, *args: tuple, **kwargs: dict) -> Any: + to_dist = kwargs.pop("to_dist", False) + if to_dist is True: + to_dist = DistConf() + if to_dist is not False and to_dist is not None: + from .rpc_agent import RpcAgent + + if cls is not RpcAgent and not issubclass(cls, RpcAgent): + return RpcAgent( + name=( + args[0] + if len(args) > 0 + else kwargs["name"] # type: ignore[arg-type] + ), + host=to_dist.pop( # type: ignore[arg-type] + "host", + "localhost", + ), + port=to_dist.pop("port", None), # type: ignore[arg-type] + max_pool_size=kwargs.pop( # type: ignore[arg-type] + "max_pool_size", + 8192, + ), + max_timeout_seconds=to_dist.pop( # type: ignore[arg-type] + "max_timeout_seconds", + 1800, + ), + local_mode=to_dist.pop( # type: ignore[arg-type] + "local_mode", + True, + ), + lazy_launch=to_dist.pop( # type: ignore[arg-type] + "lazy_launch", + True, + ), + agent_id=cls.generate_agent_id(), + connect_existing=False, + agent_class=cls, + agent_configs={ + "args": args, + "kwargs": kwargs, + "class_name": cls.__name__, + }, + ) instance = super().__call__(*args, **kwargs) - instance._init_settings = {"args": args, "kwargs": kwargs} + instance._init_settings = { + "args": args, + "kwargs": kwargs, + "class_name": cls.__name__, + } return instance -class AgentBase(Operator, metaclass=_RecordInitSettingMeta): +class DistConf(dict): + """Distribution configuration for agents.""" + + def __init__( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + local_mode: bool = True, + lazy_launch: bool = True, + ): + """Init the distributed configuration. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `8192`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + local_mode (`bool`, defaults to `True`): + Whether the started rpc server only listens to local + requests. + lazy_launch (`bool`, defaults to `True`): + Only launch the server when the agent is called. + """ + self["host"] = host + self["port"] = port + self["max_pool_size"] = max_pool_size + self["max_timeout_seconds"] = max_timeout_seconds + self["local_mode"] = local_mode + self["lazy_launch"] = lazy_launch + + +class AgentBase(Operator, metaclass=_AgentMeta): """Base class for all agents. All agents should inherit from this class and implement the `reply` @@ -40,6 +141,7 @@ def __init__( model_config_name: str = None, use_memory: bool = True, memory_config: Optional[dict] = None, + to_dist: Optional[Union[DistConf, bool]] = False, ) -> None: r"""Initialize an agent from the given arguments. @@ -56,6 +158,31 @@ def __init__( Whether the agent has memory. memory_config (`Optional[dict]`): The config of memory. + to_dist (`Optional[Union[DistConf, bool]]`, default to `False`): + The configurations passed to :py:meth:`to_dist` method. Used in + :py:class:`_AgentMeta`, when this parameter is provided, + the agent will automatically be converted into its distributed + version. Below are some examples: + + .. 
code-block:: python + + # run as a sub process + agent = XXXAgent( + # ... other parameters + to_dist=True, + ) + + # connect to an existing agent server + agent = XXXAgent( + # ... other parameters + to_dist=DistConf( + host="", + port=, + # other parameters + ), + ) + + See :doc:`Tutorial` for detail. """ self.name = name self.memory_config = memory_config @@ -78,6 +205,12 @@ def __init__( # The audience of this agent, which means if this agent generates a # response, it will be passed to all agents in the audience. self._audience = None + # convert to distributed agent, conversion is in `_AgentMeta` + if to_dist is not False and to_dist is not None: + logger.info( + f"Convert {self.__class__.__name__}[{self.name}] into" + " a distributed agent.", + ) @classmethod def generate_agent_id(cls) -> str: @@ -85,6 +218,39 @@ def generate_agent_id(cls) -> str: # TODO: change cls.__name__ into a global unique agent_type return f"{cls.__name__}_{uuid.uuid4().hex}" + # todo: add a unique agent_type field to distinguish different agent class + @classmethod + def get_agent_class(cls, agent_class_name: str) -> Type[AgentBase]: + """Get the agent class based on the specific agent class name. + + Args: + agent_class_name (`str`): the name of the agent class. + + Raises: + ValueError: Agent class name not exits. + + Returns: + Type[AgentBase]: the AgentBase sub-class. + """ + if agent_class_name not in cls._registry: + raise ValueError(f"Agent [{agent_class_name}] not found.") + return cls._registry[agent_class_name] # type: ignore[return-value] + + @classmethod + def register_agent_class(cls, agent_class: Type[AgentBase]) -> None: + """Register the agent class into the registry. + + Args: + agent_class (Type[AgentBase]): the agent class to be registered. + """ + agent_class_name = agent_class.__name__ + if agent_class_name in cls._registry: + logger.warning( + f"Agent class with name [{agent_class_name}] already exists.", + ) + else: + cls._registry[agent_class_name] = agent_class + def reply(self, x: dict = None) -> dict: """Define the actions taken by this agent. @@ -206,9 +372,9 @@ def to_dist( port: int = None, max_pool_size: int = 8192, max_timeout_seconds: int = 1800, - launch_server: bool = True, local_mode: bool = True, lazy_launch: bool = True, + launch_server: bool = None, ) -> AgentBase: """Convert current agent instance into a distributed version. @@ -226,6 +392,9 @@ def to_dist( requests. lazy_launch (`bool`, defaults to `True`): Only launch the server when the agent is called. + launch_server(`bool`, defaults to `None`): + This field has been deprecated and will be removed in + future releases. Returns: `AgentBase`: the wrapped agent instance with distributed @@ -235,15 +404,20 @@ def to_dist( if issubclass(self.__class__, RpcAgent): return self + if launch_server is not None: + logger.warning( + "`launch_server` has been deprecated and will be removed in " + "future releases. 
When `host` and `port` is not provided, the " + "agent server will be launched automatically.", + ) return RpcAgent( + name=self.name, agent_class=self.__class__, agent_configs=self._init_settings, - name=self.name, host=host, port=port, max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, - launch_server=launch_server, local_mode=local_mode, lazy_launch=lazy_launch, agent_id=self.agent_id, diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 14b55d7bb..b7c3441bc 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -1,26 +1,27 @@ # -*- coding: utf-8 -*- """ Base class for Rpc Agent """ -from multiprocessing import Process, Event, Pipe, cpu_count +from multiprocessing import Process, Event, Pipe from multiprocessing.synchronize import Event as EventClass import socket import threading import json +import base64 import traceback -from typing import Any, Optional, Union, Type, Sequence +import asyncio +from typing import Any, Type, Optional, Union, Sequence from concurrent import futures from loguru import logger try: + import dill import grpc from grpc import ServicerContext + from expiringdict import ExpiringDict except ImportError: + dill = None grpc = None ServicerContext = Any - -try: - from expiringdict import ExpiringDict -except ImportError: ExpiringDict = None from agentscope._init import init_process, _INIT_SETTINGS @@ -63,32 +64,29 @@ class RpcAgent(AgentBase): def __init__( self, name: str, - agent_class: Type[AgentBase], - agent_configs: Optional[dict] = None, host: str = "localhost", port: int = None, - launch_server: bool = True, + agent_class: Type[AgentBase] = None, + agent_configs: Optional[dict] = None, max_pool_size: int = 8192, max_timeout_seconds: int = 1800, local_mode: bool = True, lazy_launch: bool = True, agent_id: str = None, - create_with_agent_configs: bool = True, + connect_existing: bool = False, ) -> None: """Initialize a RpcAgent instance. Args: - name (`str`): Name of the agent. - agent_class (`Type[AgentBase]`): - The AgentBase subclass encapsulated by this wrapper. - agent_configs (`dict`, defaults to `None`): The args used to - initialize the agent_class. - host (`str`, defaults to `"localhost"`): + name (`str`): the name of the agent. + host (`str`, defaults to `localhost`): Hostname of the rpc agent server. port (`int`, defaults to `None`): Port of the rpc agent server. - launch_server (`bool`, defaults to `True`): - Whether to launch the gRPC agent server. + agent_class (`Type[AgentBase]`): + the AgentBase subclass of the source agent. + agent_configs (`dict`): The args used to + initialize the agent, generated by `_AgentMeta`. max_pool_size (`int`, defaults to `8192`): Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): @@ -101,34 +99,31 @@ def __init__( agent_id (`str`, defaults to `None`): The agent id of this instance. If `None`, it will be generated randomly. - create_with_agent_configs (`bool`, defaults to `True`): - Only takes effect when `agent_configs` is provided. - If true, create the agent instance for the agent with - provided `agent_configs`, otherwise uses the agent server's - default parameters. + connect_existing (`bool`, defaults to `False`): + Set to `True`, if the agent is already running on the agent + server. 
""" super().__init__(name=name) + self.agent_class = agent_class + self.agent_configs = agent_configs self.host = host self.port = port self.server_launcher = None self.client = None + self.connect_existing = connect_existing if agent_id is not None: self._agent_id = agent_id - else: - self._agent_id = agent_class.generate_agent_id() - self.agent_class = agent_class + # if host and port are not provided, launch server locally + launch_server = port is None if launch_server: + self.host = "localhost" self.server_launcher = RpcAgentServerLauncher( - agent_class=agent_class, - agent_args=agent_configs["args"] if agent_configs else None, - agent_kwargs=( - agent_configs["kwargs"] if agent_configs else None - ), - host=host, + host=self.host, port=port, max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, local_mode=local_mode, + custom_agents=[agent_class], ) if not lazy_launch: self._launch_server() @@ -138,9 +133,8 @@ def __init__( port=self.port, agent_id=self.agent_id, ) - self.client.create_agent( - agent_configs if create_with_agent_configs else None, - ) + if not self.connect_existing: + self.client.create_agent(agent_configs) def _launch_server(self) -> None: """Launch a rpc server and update the port and the client""" @@ -151,6 +145,7 @@ def _launch_server(self) -> None: port=self.port, agent_id=self.agent_id, ) + self.client.create_agent(self.agent_configs) def reply(self, x: dict = None) -> dict: if self.client is None: @@ -203,14 +198,14 @@ def clone_instances( # clone instances without agent server for _ in range(generated_instance_number): + new_agent_id = self.client.call_func("_clone_agent") generated_instances.append( RpcAgent( name=self.name, - agent_class=self.agent_class, host=self.host, port=self.port, - launch_server=False, - create_with_agent_configs=False, + agent_id=new_agent_id, + connect_existing=True, ), ) return generated_instances @@ -225,9 +220,6 @@ def __del__(self) -> None: def setup_rpc_agent_server( - agent_class: Type[AgentBase], - agent_args: tuple, - agent_kwargs: dict, host: str, port: int, init_settings: dict = None, @@ -237,16 +229,11 @@ def setup_rpc_agent_server( local_mode: bool = True, max_pool_size: int = 8192, max_timeout_seconds: int = 1800, + custom_agents: list = None, ) -> None: """Setup gRPC server rpc agent. Args: - agent_class (`Type[AgentBase]`): - A subclass of AgentBase. - agent_args (`tuple`): The args tuple used to initialize the - agent_class. - agent_kwargs (`dict`): The args dict used to initialize the - agent_class. host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. port (`int`): @@ -267,36 +254,92 @@ def setup_rpc_agent_server( Max number of task results that the server can accommodate. max_timeout_seconds (`int`, defaults to `1800`): Timeout for task results. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. 
+ """ + asyncio.run( + setup_rpc_agent_server_async( + host=host, + port=port, + init_settings=init_settings, + start_event=start_event, + stop_event=stop_event, + pipe=pipe, + local_mode=local_mode, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + custom_agents=custom_agents, + ), + ) + + +async def setup_rpc_agent_server_async( + host: str, + port: int, + init_settings: dict = None, + start_event: EventClass = None, + stop_event: EventClass = None, + pipe: int = None, + local_mode: bool = True, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + custom_agents: list = None, +) -> None: + """Setup gRPC server rpc agent in an async way. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the rpc agent server. + port (`int`): + The socket port monitored by grpc server. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. + start_event (`EventClass`, defaults to `None`): + An Event instance used to determine whether the child process + has been started. + stop_event (`EventClass`, defaults to `None`): + The stop Event instance used to determine whether the child + process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. + local_mode (`bool`, defaults to `None`): + Only listen to local requests. + max_pool_size (`int`, defaults to `8192`): + Max number of task results that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for task results. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. """ if init_settings is not None: init_process(**init_settings) - servicer = RpcServerSideWrapper( - agent_class, - agent_args, - agent_kwargs, + servicer = AgentPlatform( host=host, port=port, max_pool_size=max_pool_size, max_timeout_seconds=max_timeout_seconds, ) + # update agent registry + if custom_agents is not None: + for agent_class in custom_agents: + AgentBase.register_agent_class(agent_class=agent_class) while True: try: port = check_port(port) servicer.port = port logger.info( - f"Starting rpc server [{agent_class.__name__}] at port" - f" [{port}]...", + f"Starting rpc server at port [{port}]...", ) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=cpu_count()), + server = grpc.aio.server( + futures.ThreadPoolExecutor(max_workers=None), ) add_RpcAgentServicer_to_server(servicer, server) if local_mode: server.add_insecure_port(f"localhost:{port}") else: server.add_insecure_port(f"0.0.0.0:{port}") - server.start() + await server.start() break except OSError: logger.warning( @@ -304,22 +347,21 @@ def setup_rpc_agent_server( f"try another port", ) logger.info( - f"rpc server [{agent_class.__name__}] at port [{port}] started " - "successfully", + f"rpc server at port [{port}] started successfully", ) if start_event is not None: pipe.send(port) start_event.set() - stop_event.wait() + while not stop_event.is_set(): + await asyncio.sleep(1) logger.info( - f"Stopping rpc server [{agent_class.__name__}] at port [{port}]", + f"Stopping rpc server at port [{port}]", ) - server.stop(1.0).wait() + await server.stop(10.0) else: - server.wait_for_termination() + await server.wait_for_termination() logger.info( - f"rpc server [{agent_class.__name__}] at port [{port}] stopped " - "successfully", + f"rpc server at port [{port}] stopped successfully", ) @@ -360,28 +402,23 @@ def check_port(port: Optional[int] = None) -> int: class 
RpcAgentServerLauncher: - """Launcher of rpc agent server.""" + """The launcher of AgentPlatform (formerly RpcAgentServer).""" def __init__( self, - agent_class: Type[AgentBase] = None, - agent_args: tuple = (), - agent_kwargs: dict = None, host: str = "localhost", port: int = None, max_pool_size: int = 8192, max_timeout_seconds: int = 1800, local_mode: bool = False, + custom_agents: list = None, + agent_class: Type[AgentBase] = None, + agent_args: tuple = (), + agent_kwargs: dict = None, ) -> None: """Init a rpc agent server launcher. Args: - agent_class (`Type[AgentBase]`, defaults to `None`): - The AgentBase subclass encapsulated by this wrapper. - agent_args (`tuple`): The args tuple used to initialize the - agent_class. - agent_kwargs (`dict`): The args dict used to initialize the - agent_class. host (`str`, defaults to `"localhost"`): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -393,10 +430,16 @@ def __init__( local_mode (`bool`, defaults to `False`): Whether the started rpc server only listens to local requests. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in + `agentscope.agents`. + agent_class (`Type[AgentBase]`, deprecated): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`, deprecated): The args tuple used to + initialize the agent_class. + agent_kwargs (`dict`, deprecated): The args dict used to + initialize the agent_class. """ - self.agent_class = agent_class - self.agent_args = agent_args - self.agent_kwargs = agent_kwargs self.host = host self.port = check_port(port) self.max_pool_size = max_pool_size @@ -405,28 +448,32 @@ def __init__( self.server = None self.stop_event = None self.parent_con = None + self.custom_agents = custom_agents + if ( + agent_class is not None + or len(agent_args) > 0 + or agent_kwargs is not None + ): + logger.warning( + "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" + " in `RpcAgentServerLauncher`", + ) def _launch_in_main(self) -> None: """Launch gRPC server in main-process""" - server_thread = threading.Thread( - target=setup_rpc_agent_server, - kwargs={ - "agent_class": self.agent_class, - "agent_args": self.agent_args, - "agent_kwargs": self.agent_kwargs, - "host": self.host, - "port": self.port, - "max_pool_size": self.max_pool_size, - "max_timeout_seconds": self.max_timeout_seconds, - "local_mode": self.local_mode, - }, - ) - server_thread.start() logger.info( - f"Launch [{self.agent_class.__name__}] server at " - f"[{self.host}:{self.port}] success", + f"Launching agent server at [{self.host}:{self.port}]...", + ) + asyncio.run( + setup_rpc_agent_server_async( + host=self.host, + port=self.port, + max_pool_size=self.max_pool_size, + max_timeout_seconds=self.max_timeout_seconds, + local_mode=self.local_mode, + custom_agents=self.custom_agents, + ), ) - server_thread.join() def _launch_in_sub(self) -> None: """Launch gRPC server in sub-process.""" @@ -436,9 +483,6 @@ def _launch_in_sub(self) -> None: server_process = Process( target=setup_rpc_agent_server, kwargs={ - "agent_class": self.agent_class, - "agent_args": self.agent_args, - "agent_kwargs": self.agent_kwargs, "host": self.host, "port": self.port, "init_settings": _INIT_SETTINGS, @@ -448,6 +492,7 @@ def _launch_in_sub(self) -> None: "max_pool_size": self.max_pool_size, "max_timeout_seconds": self.max_timeout_seconds, "local_mode": self.local_mode, + "custom_agents": self.custom_agents, }, ) server_process.start() @@ -455,8 +500,7 @@ def _launch_in_sub(self) -> None: 
start_event.wait() self.server = server_process logger.info( - f"Launch [{self.agent_class.__name__}] server at " - f"[{self.host}:{self.port}] success", + f"Launch agent server at [{self.host}:{self.port}] success", ) def launch(self, in_subprocess: bool = True) -> None: @@ -487,34 +531,24 @@ def shutdown(self) -> None: if self.server.is_alive(): self.server.kill() logger.info( - f"Rpc server [{self.agent_class.__name__}] at port" - f" [{self.port}] is killed.", + f"Agent server at port [{self.port}] is killed.", ) self.server = None -class RpcServerSideWrapper(RpcAgentServicer): - """A wrapper to extend an AgentBase into a gRPC Servicer.""" +class AgentPlatform(RpcAgentServicer): + """A platform for agent to run on (formerly RpcServerSideWrapper)""" def __init__( self, - agent_class: Type[AgentBase], - agent_args: tuple, - agent_kwargs: dict, host: str = "localhost", port: int = None, max_pool_size: int = 8192, max_timeout_seconds: int = 1800, ): - """Init the service side wrapper. + """Init the AgentPlatform. Args: - agent_class (`Type[AgentBase]`): The AgentBase subclass - encapsulated by this wrapper. - agent_args (`tuple`): The args tuple used to initialize the - agent_class. - agent_kwargs (`dict`): The args dict used to initialize the - agent_class. host (`str`, defaults to "localhost"): Hostname of the rpc agent server. port (`int`, defaults to `None`): @@ -527,16 +561,13 @@ def __init__( Timeout for task results. Note that expired results will be deleted. """ - self.agent_class = agent_class - self.agent_args = agent_args - self.agent_kwargs = agent_kwargs self.host = host self.port = port self.result_pool = ExpiringDict( max_len=max_pool_size, max_age_seconds=max_timeout_seconds, ) - self.executor = futures.ThreadPoolExecutor(max_workers=cpu_count()) + self.executor = futures.ThreadPoolExecutor(max_workers=None) self.task_id_lock = threading.Lock() self.agent_id_lock = threading.Lock() self.task_id_counter = 0 @@ -548,10 +579,21 @@ def get_task_id(self) -> int: self.task_id_counter += 1 return self.task_id_counter + def agent_exists(self, agent_id: str) -> bool: + """Check whether the agent exists. + + Args: + agent_id (`str`): the agent id. + + Returns: + bool: whether the agent exists. + """ + return agent_id in self.agent_pool + def check_and_generate_agent( self, agent_id: str, - agent_configs: dict = None, + agent_configs: dict, ) -> None: """ Check whether the agent exists, and create new agent instance @@ -559,19 +601,25 @@ def check_and_generate_agent( Args: agent_id (`str`): the agent id. + agent_configs (`dict`): configuration used to initialize the agent, + with three fields (generated in `_AgentMeta`): + + .. 
code-block:: python + + { + "class_name": {name of the agent} + "args": {args in tuple type to init the agent} + "kwargs": {args in dict type to init the agent} + } + """ with self.agent_id_lock: if agent_id not in self.agent_pool: - if agent_configs is not None: - agent_instance = self.agent_class( - *agent_configs["args"], - **agent_configs["kwargs"], - ) - else: - agent_instance = self.agent_class( - *self.agent_args, - **self.agent_kwargs, - ) + agent_class_name = agent_configs["class_name"] + agent_instance = AgentBase.get_agent_class(agent_class_name)( + *agent_configs["args"], + **agent_configs["kwargs"], + ) agent_instance._agent_id = agent_id # pylint: disable=W0212 self.agent_pool[agent_id] = agent_instance logger.info(f"create agent instance [{agent_id}]") @@ -589,21 +637,26 @@ def check_and_delete_agent(self, agent_id: str) -> None: self.agent_pool.pop(agent_id) logger.info(f"delete agent instance [{agent_id}]") - def call_func(self, request: RpcMsg, _: ServicerContext) -> RpcMsg: + def call_func( # pylint: disable=W0236 + self, + request: RpcMsg, + context: ServicerContext, + ) -> RpcMsg: """Call the specific servicer function.""" if hasattr(self, request.target_func): if request.target_func not in ["_create_agent", "_get"]: - self.check_and_generate_agent(request.agent_id) + if not self.agent_exists(request.agent_id): + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Agent [{request.agent_id}] not exists.", + ) return getattr(self, request.target_func)(request) else: # TODO: support other user defined method logger.error(f"Unsupported method {request.target_func}") - return RpcMsg( - value=Msg( - name=self.agent_pool[request.agent_id].name, - content=f"Unsupported method {request.target_func}", - role="assistant", - ).serialize(), + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Unsupported method {request.target_func}", ) def _reply(self, request: RpcMsg) -> RpcMsg: @@ -687,10 +740,38 @@ def _create_agent(self, request: RpcMsg) -> RpcMsg: """ self.check_and_generate_agent( request.agent_id, - agent_configs=json.loads(request.value) if request.value else None, + agent_configs=( + dill.loads(base64.b64decode(request.value)) + if request.value + else None + ), ) return RpcMsg() + def _clone_agent(self, request: RpcMsg) -> RpcMsg: + """Clone a new agent instance from the origin instance. + + Args: + request (RpcMsg): The `agent_id` field is the agent_id of the + agent to be cloned. + + Returns: + `RpcMsg`: The `value` field contains the agent_id of generated + agent. + """ + agent_id = request.agent_id + with self.agent_id_lock: + if agent_id not in self.agent_pool: + raise ValueError(f"Agent [{agent_id}] not exists") + ori_agent = self.agent_pool[agent_id] + new_agent = ori_agent.__class__( + *ori_agent._init_settings["args"], # pylint: disable=W0212 + **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 + ) + with self.agent_id_lock: + self.agent_pool[new_agent.agent_id] = new_agent + return RpcMsg(value=new_agent.agent_id) + def _delete_agent(self, request: RpcMsg) -> RpcMsg: """Delete the agent instance of the specific sesssion_id. 
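The `rpc_agent.py` changes above replace the per-class `RpcServerSideWrapper` with a shared `AgentPlatform` that rebuilds agents from the `class_name`/`args`/`kwargs` triple registered through `AgentBase.register_agent_class`. As a minimal sketch of the resulting Child Process flow, consider the snippet below; `MyEchoAgent` is a hypothetical `AgentBase` subclass introduced only for illustration, and `agentscope.init` is assumed to have been configured roughly as in the tests.

```python
# Sketch only: `MyEchoAgent` is hypothetical; `to_dist`, `DistConf`,
# `to_dist()` and `lazy_launch` are the names introduced by this patch.
import agentscope
from agentscope.agents import AgentBase
from agentscope.message import Msg


class MyEchoAgent(AgentBase):
    """A hypothetical agent that replies with a fixed message."""

    def reply(self, x: dict = None) -> dict:
        return Msg(name=self.name, role="assistant", content="echo")


agentscope.init(project="dist-demo")

# Child Process mode: no host/port is given, so an agent server process is
# started automatically as a sub-process of the main process, and the agent
# instance is recreated there via the `_create_agent` RPC.
agent_a = MyEchoAgent(name="a", to_dist=True)

# Equivalent explicit form; `lazy_launch=False` starts the server immediately
# instead of waiting for the first call.
agent_b = MyEchoAgent(name="b").to_dist(lazy_launch=False)

reply = agent_a(Msg(name="System", content="hello", role="system"))
```

Because the server side no longer needs `agent_class`/`agent_configs` at construction time, one server process can host several different agent classes, which is what the `custom_agents` parameter of `RpcAgentServerLauncher` and the new `_clone_agent` RPC build on.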
diff --git a/src/agentscope/message.py b/src/agentscope/message.py index 9327ca476..30f35fe61 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -286,7 +286,11 @@ def __init__( self._port: int = port self._task_id: int = task_id else: - self._stub = call_in_thread(client, x, "_reply") + self._stub = call_in_thread( + client, + x.serialize() if x is not None else "", + "_reply", + ) self._host = client.host self._port = client.port self._task_id = None @@ -344,7 +348,15 @@ def update_value(self) -> MessageBase: def __update_task_id(self) -> None: if self._stub is not None: - resp = deserialize(self._stub.get_response()) + try: + resp = deserialize(self._stub.get_response()) + except Exception as e: + logger.error( + f"Failed to get task_id: {self._stub.get_response()}", + ) + raise ValueError( + f"Failed to get task_id: {self._stub.get_response()}", + ) from e self._task_id = resp["task_id"] # type: ignore[call-overload] self._stub = None diff --git a/src/agentscope/rpc/rpc_agent_client.py b/src/agentscope/rpc/rpc_agent_client.py index 98b82a6d5..ab9f1a565 100644 --- a/src/agentscope/rpc/rpc_agent_client.py +++ b/src/agentscope/rpc/rpc_agent_client.py @@ -1,15 +1,19 @@ # -*- coding: utf-8 -*- """ Client of rpc agent server """ -import json import threading +import base64 from typing import Any, Optional from loguru import logger try: + import dill import grpc + from grpc import RpcError except ImportError: + dill = None grpc = None + RpcError = None try: from agentscope.rpc.rpc_agent_pb2 import RpcMsg # pylint: disable=E0611 @@ -63,18 +67,14 @@ def call_func( ) return result_msg.value - def create_agent(self, agent_configs: Optional[dict]) -> None: + def create_agent(self, agent_configs: dict) -> None: """Create a new agent for this client.""" try: if self.agent_id is None or len(self.agent_id) == 0: return self.call_func( - func_name="_create_agent", - value=( - None - if agent_configs is None - else json.dumps(agent_configs) - ), + "_create_agent", + base64.b64encode(dill.dumps(agent_configs)).decode("utf-8"), ) except Exception as e: logger.error( @@ -117,14 +117,14 @@ def get_response(self) -> str: def call_in_thread( client: RpcAgentClient, - x: dict, + value: str, func_name: str, ) -> ResponseStub: """Call rpc function in a sub-thread. Args: client (`RpcAgentClient`): the rpc client. - x (`dict`): the value of the reqeust. + x (`str`): the value of the reqeust. func_name (`str`): the name of the function being called. 
Returns: @@ -133,11 +133,15 @@ def call_in_thread( stub = ResponseStub() def wrapper() -> None: - resp = client.call_func( - func_name=func_name, - value=x.serialize() if x is not None else "", - ) - stub.set_response(resp) # type: ignore[arg-type] + try: + resp = client.call_func( + func_name=func_name, + value=value, + ) + stub.set_response(resp) # type: ignore[arg-type] + except RpcError as e: + logger.error(f"Fail to call {func_name} in thread: {e}") + stub.set_response(str(e)) thread = threading.Thread(target=wrapper) thread.start() diff --git a/src/agentscope/utils/monitor.py b/src/agentscope/utils/monitor.py index e1c9e98f4..08b1dc24b 100644 --- a/src/agentscope/utils/monitor.py +++ b/src/agentscope/utils/monitor.py @@ -288,6 +288,59 @@ def sqlite_cursor(db_path: str, timeout: float = 30.0) -> Generator: conn.close() +class DummyMonitor(MonitorBase): + """A monitor that does nothing""" + + def register( + self, + metric_name: str, + metric_unit: Optional[str] = None, + quota: Optional[float] = None, + ) -> bool: + return True + + def exists(self, metric_name: str) -> bool: + return True + + def add(self, metric_name: str, value: float) -> bool: + return True + + def update(self, values: dict, prefix: Optional[str] = None) -> None: + return None + + def clear(self, metric_name: str) -> bool: + return True + + def remove(self, metric_name: str) -> bool: + return True + + def get_value(self, metric_name: str) -> Optional[float]: + return 0.0 + + def get_unit(self, metric_name: str) -> Optional[str]: + return "" + + def get_quota(self, metric_name: str) -> Optional[float]: + return 0.0 + + def set_quota(self, metric_name: str, quota: float) -> bool: + return True + + def get_metric(self, metric_name: str) -> Optional[dict]: + return {} + + def get_metrics(self, filter_regex: Optional[str] = None) -> dict: + return {} + + def register_budget( + self, + model_name: str, + value: float, + prefix: Optional[str] = "local", + ) -> bool: + return True + + class SqliteMonitor(MonitorBase): """A monitor based on sqlite""" @@ -646,6 +699,8 @@ def get_monitor( if cls._instance is None: if impl_type is None or impl_type.lower() == "sqlite": cls._instance = SqliteMonitor(db_path=db_path) + elif impl_type == "dummy": + cls._instance = DummyMonitor() else: raise NotImplementedError( "Monitor with type [{type}] is not implemented.", diff --git a/tests/monitor_test.py b/tests/monitor_test.py index 63ade5f09..aa381927f 100644 --- a/tests/monitor_test.py +++ b/tests/monitor_test.py @@ -6,9 +6,12 @@ import unittest import uuid import os -from agentscope.utils import MonitorBase, QuotaExceededError, MonitorFactory +import shutil +from loguru import logger -from agentscope.utils.monitor import SqliteMonitor +import agentscope +from agentscope.utils import MonitorBase, QuotaExceededError, MonitorFactory +from agentscope.utils.monitor import SqliteMonitor, DummyMonitor class MonitorFactoryTest(unittest.TestCase): @@ -17,10 +20,10 @@ class MonitorFactoryTest(unittest.TestCase): def setUp(self) -> None: MonitorFactory._instance = None # pylint: disable=W0212 self.db_path = f"test-{uuid.uuid4()}.db" - _ = MonitorFactory.get_monitor(db_path=self.db_path) def test_get_monitor(self) -> None: """Test get monitor method of MonitorFactory.""" + _ = MonitorFactory.get_monitor(db_path=self.db_path) monitor1 = MonitorFactory.get_monitor() monitor2 = MonitorFactory.get_monitor() self.assertEqual(monitor1, monitor2) @@ -31,11 +34,59 @@ def test_get_monitor(self) -> None: self.assertTrue(monitor2.remove("token_num")) 
self.assertFalse(monitor1.exists("token_num")) + def test_monitor_type(self) -> None: + """Test get different type of monitor""" + monitor = MonitorFactory.get_monitor(impl_type="dummy") + self.assertTrue(isinstance(monitor, DummyMonitor)) + MonitorFactory._instance = None # pylint: disable=W0212 + monitor = MonitorFactory.get_monitor( + impl_type="sqlite", + db_path=self.db_path, + ) + self.assertTrue(isinstance(monitor, SqliteMonitor)) + def tearDown(self) -> None: MonitorFactory._instance = None # pylint: disable=W0212 os.remove(self.db_path) +class DummyMonitorTest(unittest.TestCase): + """Test class for DummyMonitor""" + + def setUp(self) -> None: + MonitorFactory._instance = None # pylint: disable=W0212 + agentscope.init( + project="test", + name="monitor", + save_dir="./test_runs", + save_log=True, + use_monitor=False, + ) + + def test_dummy_monitor(self) -> None: + """test dummy monitor""" + monitor = MonitorFactory.get_monitor() + self.assertTrue( + monitor.register_budget( + model_name="qwen", + value=100.0, + prefix="xxx", + ), + ) + self.assertTrue( + monitor.register( + "prompt_tokens", + metric_unit="token", + ), + ) + monitor.update({"call_counter": 1}) + + def tearDown(self) -> None: + MonitorFactory._instance = None # pylint: disable=W0212 + logger.remove() + shutil.rmtree("./test_runs") + + class MonitorTestBase(unittest.TestCase): """An abstract test class for MonitorBase interface""" diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index 0319cf204..d010587cd 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -8,7 +8,7 @@ from loguru import logger import agentscope -from agentscope.agents import AgentBase +from agentscope.agents import AgentBase, DistConf from agentscope.agents.rpc_agent import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage @@ -95,6 +95,55 @@ def reply(self, x: dict = None) -> dict: return x +class DemoGeneratorAgent(AgentBase): + """A demo agent to generate a number""" + + def __init__(self, name: str, value: int) -> None: + super().__init__(name) + self.value = value + + def reply(self, _: dict = None) -> dict: + time.sleep(1) + return Msg( + name=self.name, + role="assistant", + content={ + "value": self.value, + }, + ) + + +class DemoGatherAgent(AgentBase): + """A demo agent to gather value""" + + def __init__( + self, + name: str, + agents: list[DemoGeneratorAgent], + to_dist: dict = None, + ) -> None: + super().__init__(name, to_dist=to_dist) + self.agents = agents + + def reply(self, _: dict = None) -> dict: + result = [] + stime = time.time() + for agent in self.agents: + result.append(agent()) + value = 0 + for r in result: + value += r.content["value"] + etime = time.time() + return Msg( + name=self.name, + role="assistant", + content={ + "value": value, + "time": etime - stime, + }, + ) + + class DemoErrorAgent(AgentBase): """A demo Rpc agent that raise Error""" @@ -121,13 +170,9 @@ def tearDown(self) -> None: def test_single_rpc_agent_server(self) -> None: """test setup a single rpc agent""" - host = "localhost" - port = 12001 agent_a = DemoRpcAgent( name="a", - ).to_dist( - host=host, - port=port, + to_dist=True, ) self.assertIsNotNone(agent_a) msg = Msg( @@ -177,13 +222,10 @@ def test_connect_to_an_existing_rpc_server(self) -> None: """test connecting to an existing server""" launcher = RpcAgentServerLauncher( # choose port automatically - agent_class=DemoRpcAgent, - agent_kwargs={ - "name": "a", - }, - local_mode=False, host="127.0.0.1", 
port=12010, + local_mode=False, + custom_agents=[DemoRpcAgent], ) launcher.launch() agent_a = DemoRpcAgent( @@ -191,7 +233,6 @@ def test_connect_to_an_existing_rpc_server(self) -> None: ).to_dist( host="127.0.0.1", port=launcher.port, - launch_server=False, ) msg = Msg( name="System", @@ -226,29 +267,19 @@ def test_connect_to_an_existing_rpc_server(self) -> None: def test_multi_rpc_agent(self) -> None: """test setup multi rpc agent""" - host = "localhost" - port1 = 12001 - port2 = 12002 - port3 = 12003 agent_a = DemoRpcAgentAdd( name="a", ).to_dist( - host=host, - port=port1, lazy_launch=False, ) agent_b = DemoRpcAgentAdd( name="b", ).to_dist( - host=host, - port=port2, lazy_launch=False, ) agent_c = DemoRpcAgentAdd( name="c", ).to_dist( - host=host, - port=port3, lazy_launch=False, ) @@ -292,17 +323,9 @@ def test_multi_rpc_agent(self) -> None: def test_mix_rpc_agent_and_local_agent(self) -> None: """test to use local and rpc agent simultaneously""" - host = "localhost" - # use the same port, agents should choose available ports - # automatically - port1 = 12001 - port2 = 12001 - # rpc agent a agent_a = DemoRpcAgentAdd( name="a", ).to_dist( - host=host, - port=port1, lazy_launch=False, ) # local agent b @@ -310,12 +333,11 @@ def test_mix_rpc_agent_and_local_agent(self) -> None: name="b", ) # rpc agent c - agent_c = DemoRpcAgentAdd( + agent_c = DemoRpcAgentAdd( # pylint: disable=E1123 name="c", - ).to_dist( - host=host, - port=port2, - lazy_launch=False, + to_dist=DistConf( + lazy_launch=False, + ), ) msg = Msg( name="System", @@ -339,7 +361,8 @@ def test_msghub_compatibility(self) -> None: ).to_dist() agent_c = DemoRpcAgentWithMemory( name="c", - ).to_dist() + to_dist=True, + ) participants = [agent_a, agent_b, agent_c] annonuncement_msgs = [ Msg(name="System", content="Announcement 1", role="system"), @@ -368,24 +391,16 @@ def test_standalone_multiprocess_init(self) -> None: """test compatibility with agentscope.init""" monitor = MonitorFactory.get_monitor() monitor.register("msg_num", quota=10) - host = "localhost" - # automatically - port1 = 12001 - port2 = 12002 # rpc agent a agent_a = DemoRpcAgentWithMonitor( name="a", ).to_dist( - host=host, - port=port1, lazy_launch=False, ) # local agent b agent_b = DemoRpcAgentWithMonitor( name="b", ).to_dist( - host=host, - port=port2, lazy_launch=False, ) msg = Msg(name="System", content={"msg_num": 0}, role="system") @@ -403,17 +418,13 @@ def test_standalone_multiprocess_init(self) -> None: logger.chat(msg) self.assertTrue(msg["content"]["quota_exceeded"]) - def test_multi_agent(self) -> None: + def test_multi_agent_in_same_server(self) -> None: """test agent server with multi agent""" launcher = RpcAgentServerLauncher( - # choose port automatically - agent_class=DemoRpcAgentWithMemory, - agent_kwargs={ - "name": "a", - }, - local_mode=False, host="127.0.0.1", port=12010, + local_mode=False, + custom_agents=[DemoRpcAgentWithMemory], ) launcher.launch() # although agent1 and agent2 connect to the same server @@ -425,16 +436,15 @@ def test_multi_agent(self) -> None: agent1 = agent1.to_dist( host="127.0.0.1", port=launcher.port, - launch_server=False, ) self.assertEqual(oid, agent1.agent_id) self.assertEqual(oid, agent1.client.agent_id) - agent2 = DemoRpcAgentWithMemory( + agent2 = DemoRpcAgentWithMemory( # pylint: disable=E1123 name="a", - ).to_dist( - host="127.0.0.1", - port=launcher.port, - launch_server=False, + to_dist={ + "host": "127.0.0.1", + "port": launcher.port, + }, ) # agent3 has the same agent id as agent1 # so it share the same 
memory with agent1 @@ -443,7 +453,6 @@ def test_multi_agent(self) -> None: ).to_dist( host="127.0.0.1", port=launcher.port, - launch_server=False, ) agent3._agent_id = agent1.agent_id # pylint: disable=W0212 agent3.client.agent_id = agent1.client.agent_id @@ -463,7 +472,7 @@ def test_multi_agent(self) -> None: agent2.client.delete_agent() msg2 = Msg(name="System", content="First Msg for agent2") res2 = agent2(msg2) - self.assertEqual(res2.content["mem_size"], 1) + self.assertRaises(ValueError, res2.__getattr__, "content") # should override remote default parameter(e.g. name field) agent4 = DemoRpcAgentWithMemory( @@ -471,7 +480,6 @@ def test_multi_agent(self) -> None: ).to_dist( host="127.0.0.1", port=launcher.port, - launch_server=False, ) msg5 = Msg(name="System", content="Second Msg for agent4") res5 = agent4(msg5) @@ -523,9 +531,76 @@ def test_clone_instances(self) -> None: self.assertNotEqual(agent4.agent_id, agent.agent_id) self.assertIsNone(agent3.server_launcher) self.assertIsNone(agent4.server_launcher) + msg3 = Msg(name="System", content="First Msg for agent3") + res3 = agent3(msg3) + self.assertEqual(res1.content["mem_size"], 1) + msg4 = Msg(name="System", content="First Msg for agent4") + res4 = agent4(msg4) + self.assertEqual(res3.content["mem_size"], 1) + self.assertEqual(res4.content["mem_size"], 1) def test_error_handling(self) -> None: """Test error handling""" agent = DemoErrorAgent(name="a").to_dist() x = agent() self.assertRaises(RuntimeError, x.__getattr__, "content") + + def test_agent_nesting(self) -> None: + """Test agent nesting""" + host = "localhost" + launcher1 = RpcAgentServerLauncher( + # choose port automatically + host=host, + port=12010, + local_mode=False, + custom_agents=[DemoGatherAgent, DemoGeneratorAgent], + ) + launcher2 = RpcAgentServerLauncher( + # choose port automatically + host=host, + port=12011, + local_mode=False, + custom_agents=[DemoGatherAgent, DemoGeneratorAgent], + ) + launcher1.launch() + launcher2.launch() + agents = [] + for i in range(8): + if i % 2: + agents.append( + DemoGeneratorAgent(name=f"a_{i}", value=i).to_dist( + host=host, + port=launcher1.port, + ), + ) + else: + agents.append( + DemoGeneratorAgent(name=f"a_{i}", value=i).to_dist( + host=host, + port=launcher2.port, + ), + ) + gather1 = DemoGatherAgent( # pylint: disable=E1123 + name="g1", + agents=agents[:4], + to_dist=DistConf( + host=host, + port=launcher1.port, + ), + ) + gather2 = DemoGatherAgent( # pylint: disable=E1123 + name="g2", + agents=agents[4:], + to_dist={ + "host": host, + "port": launcher2.port, + }, + ) + r1 = gather1() + r2 = gather2() + self.assertEqual(r1.content["value"], 6) + self.assertEqual(r2.content["value"], 22) + self.assertTrue(0.5 < r1.content["time"] < 2) + self.assertTrue(0.5 < r2.content["time"] < 2) + launcher1.shutdown() + launcher2.shutdown()
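Taken together, the new tests exercise the Independent Process mode: an agent server is launched separately with `RpcAgentServerLauncher(custom_agents=[...])`, and the main process attaches to it through `to_dist(host, port)`, a `DistConf`, or a plain dict. The sketch below condenses `test_connect_to_an_existing_rpc_server` and `test_agent_nesting` into one minimal flow; `MyEchoAgent` and the host/port values are placeholders, and `agentscope.init` is assumed to have been called as in the test `setUp`.

```python
# Illustrative sketch of the Independent Process mode; `MyEchoAgent` and the
# host/port values are placeholders, not part of the patch itself.
from agentscope.agents import AgentBase, DistConf
from agentscope.agents.rpc_agent import RpcAgentServerLauncher
from agentscope.message import Msg


class MyEchoAgent(AgentBase):
    """A hypothetical agent used only to illustrate the connection flow."""

    def reply(self, x: dict = None) -> dict:
        return Msg(name=self.name, role="assistant", content="echo")


# On the server side: custom agent classes must now be passed explicitly,
# because the launcher no longer binds to a single `agent_class`.
launcher = RpcAgentServerLauncher(
    host="127.0.0.1",
    port=12010,
    local_mode=False,
    custom_agents=[MyEchoAgent],
)
launcher.launch()

# In the main process: attach to the running server either via to_dist(...)
# or via the reserved `to_dist` argument with a DistConf.
agent_a = MyEchoAgent(name="a").to_dist(host="127.0.0.1", port=launcher.port)
agent_b = MyEchoAgent(
    name="b",
    to_dist=DistConf(host="127.0.0.1", port=launcher.port),
)

reply = agent_a(Msg(name="System", content="hello", role="system"))
launcher.shutdown()
```

Calls into `agent_a` and `agent_b` return `PlaceholderMessage` objects, so errors raised on the server side (as in `test_error_handling`) only surface when an attribute such as `content` is accessed.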