From d0f475bee761dc0594527227f29eb0aea12939fe Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Mon, 21 Oct 2024 11:45:01 +0800 Subject: [PATCH 1/9] Create di_multi_agent.py update update update updated Update di_multiagent.py update update Create README.md Update README.md undo unnecessary changes undo unnecessary changes Update .pre-commit-config.yaml Update .pre-commit-config.yaml Update common.py remove changes in other files Update README.md update updated README and directory name remove Update di_multiagent.py --- .flake8 | 5 +- .../data_interpreter_mulit-agent/README.md | 156 +++++++ .../data_interpreter_mulit-agent/di_agents.py | 402 ++++++++++++++++ .../di_multiagent.py | 433 ++++++++++++++++++ 4 files changed, 995 insertions(+), 1 deletion(-) create mode 100644 examples/data_interpreter_mulit-agent/README.md create mode 100644 examples/data_interpreter_mulit-agent/di_agents.py create mode 100644 examples/data_interpreter_mulit-agent/di_multiagent.py diff --git a/.flake8 b/.flake8 index b44183ed2..1e233f4da 100644 --- a/.flake8 +++ b/.flake8 @@ -9,4 +9,7 @@ ignore = F401 F403 W503 - E731 \ No newline at end of file + E731 +per-file-ignores = + examples/data_interpreter_mulit-agent/di_agents.py:E501 + examples/data_interpreter_mulit-agent/di_multiagent.py:E501 \ No newline at end of file diff --git a/examples/data_interpreter_mulit-agent/README.md b/examples/data_interpreter_mulit-agent/README.md new file mode 100644 index 000000000..6675ec29f --- /dev/null +++ b/examples/data_interpreter_mulit-agent/README.md @@ -0,0 +1,156 @@ +# Multi-Agent Pipeline for Complex Task Solving + +This example will show: + +- How to decompose a complex task into manageable subtasks using a Planner Agent. +- How to iteratively solve, verify, and replan subtasks using Solver, Verifier, and Replanning Agents. +- How to synthesize subtask results into a final answer using a Synthesizer Agent. + +## Background + +In complex problem-solving, it's often necessary to break down tasks into smaller, more manageable subtasks. A multi-agent system can handle this by assigning specialized agents to different aspects of the problem-solving process. This example demonstrates how to implement such a pipeline using specialized agents for planning, solving, verifying, replanning, and synthesizing tasks. + +The pipeline consists of the following agents: + +- **PlannerAgent**: Decomposes the overall task into subtasks. +- **SolverAgent** (using `ReActAgent`): Solves each subtask. +- **VerifierAgent**: Verifies the solutions to each subtask. +- **ReplanningAgent**: Replans or decomposes subtasks if verification fails. +- **SynthesizerAgent**: Synthesizes the results of all subtasks into a final answer. + +By orchestrating these agents, the system can handle complex tasks that require iterative processing and dynamic adjustment based on intermediate results. + +## Tested Models + +These models are tested in this example. For other models, some modifications may be needed. + +- **Anthropic Claude:** `claude-3-5-sonnet-20240620`, `claude-3-5-sonnet-20241022`, `claude-3-5-haiku-20241022` (accessed via the `litellm` package configuration). +- **OpenAI:** `GPT4-o`, `GPT4-o-mini`. +- **DashScope:** `qwen-max`, `qwen-max-1201`. + +## Prerequisites + +To run this example, you need: + +- **Agentscope** package installed: + + ```bash + pip install agentscope + ``` + +- **Environment Variables**: Set up the following environment variables with your API keys. This can be done in a `.env` file or directly in your environment. + + - `OPENAI_API_KEY` (if using OpenAI models) + - `DASHSCOPE_API_KEY` (if using DashScope models) + - `ANTHROPIC_API_KEY` (required for using Claude models via `litellm`) + +- **Code Execution Environment**: Modify the code execution restrictions in Agentscope to allow the necessary operations for your tasks. Specifically, comment out the following `os` functions and `sys` modules in the `os_funcs_to_disable` and `sys_modules_to_disable` lists located in: + + ```plaintext + src/agentscope/service/execute_code/exec_python.py + ``` + + **Comment out these `os` functions in `os_funcs_to_disable`:** + + - `putenv` + - `remove` + - `unlink` + - `getcwd` + - `chdir` + + **Comment out these modules in `sys_modules_to_disable`:** + + - `joblib` + + This step enables the executed code by the agents to perform required operations that are otherwise restricted by default. Ensure you understand the security implications of modifying these restrictions. + +- Comment out the following in `src/agentscope/utils/common.py`: + ```python + @contextlib.contextmanager + def create_tempdir() -> Generator: + """ + A context manager that creates a temporary directory and changes the + current working directory to it. + The implementation of this contextmanager are borrowed from + https://github.com/openai/human-eval/blob/master/human_eval/execution.py + """ + with tempfile.TemporaryDirectory() as dirname: + with _chdir(dirname): + yield dirname + ``` + + and add + ```python + @contextlib.contextmanager + def create_tempdir() -> Generator: + """ + A context manager that uses the curreny directory. + """ + yield + ``` + to use the current directory for code execution. + +- **Optional Packages** (if needed for specific tools or extended functionalities): + + - `litellm` for interacting with the Claude model. + + ```bash + pip install litellm + ``` + + - Additional Python libraries as required by your code (e.g., `csv`, `dotenv`). + +Ensure that you have the necessary API access and that your environment is correctly configured to use the specified models. + +## Examples + +This section demonstrates the pipeline's effectiveness on two different complex tasks. For your own task, replace `"Your task description here."` with your task in `input_task` in `di_multiagent.py` script. + +### Task 1: Mathematical Problem Solving + +**Problem**: Solve this math problem: The greatest common divisor of positive integers m and n is 6. The least common multiple of m and n is 126. What is the least possible value of m + n? + +**Solution Output**: +``` +Based on the results of the subtasks, we can synthesize the solution to the overall task as follows: + +1. Functions for calculating GCD and LCM were defined and saved. +2. Possible pairs of m and n that satisfy the conditions (GCD = 6 and LCM = 126) were found to be (6, 126) and (18, 42). +3. The least possible value of m + n was calculated. + +The answer to the overall task is: + +The least possible value of m + n is 60, where m = 18 and n = 42. + +This solution satisfies all the given conditions: +- The greatest common divisor of m and n is 6. +- The least common multiple of m and n is 126. +- The sum of m and n (18 + 42 = 60) is the least possible value among the valid pairs. +``` + +### Task 2: Titanic Survival Prediction + +**Problem**: Predict passenger survival outcomes using the Titanic dataset. Perform data analysis, preprocessing, feature engineering, and modeling. Report accuracy on the evaluation data. + +**Solution Output**: +``` +The Titanic passenger survival prediction task has been successfully completed. Here's a summary of the process and results: + +1. Data Analysis: + - The training dataset contained 712 entries with 12 columns. + - The target variable 'Survived' had a 37.5% overall survival rate. + - Key factors influencing survival included Sex (females had a higher survival rate) and Passenger Class. + +2. Data Preprocessing and Feature Engineering: + - Missing values were handled through imputation or dropping columns. + - New features were created, including 'Title' and 'FamilySize'. + - Categorical variables were encoded, and unnecessary columns were dropped. + - The final preprocessed dataset had 712 samples and 10 features. + +3. Modeling: + - Three models were trained and evaluated: Logistic Regression, Random Forest, and Gradient Boosting. + - Gradient Boosting performed the best in cross-validation with an accuracy of 0.8160. + +4. Final Evaluation: + - The best model (Gradient Boosting) was used to make predictions on the evaluation dataset. + - The final accuracy on the evaluation data (179 samples) was 0.8212 (82.12%). \ No newline at end of file diff --git a/examples/data_interpreter_mulit-agent/di_agents.py b/examples/data_interpreter_mulit-agent/di_agents.py new file mode 100644 index 000000000..1a0dfe984 --- /dev/null +++ b/examples/data_interpreter_mulit-agent/di_agents.py @@ -0,0 +1,402 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0301 +""" +This script defines all the agents used in the data interpreter pipeline. +""" +import os +import csv +from typing import Any, Dict, List, Tuple, Optional, Union, Sequence +from agentscope.agents import ReActAgent +from agentscope.agents.agent import AgentBase +from agentscope.message import Msg +from agentscope.models import ModelResponse +from agentscope.parsers.json_object_parser import MarkdownJsonObjectParser +from agentscope.service import ServiceToolkit + +from agentscope.service.service_response import ServiceResponse +from agentscope.service.service_status import ServiceExecStatus + + +def read_csv_file(file_path: str) -> ServiceResponse: + """ + Read and parse a CSV file. + + Args: + file_path (`str`): + The path to the CSV file to be read. + + Returns: + `ServiceResponse`: Where the boolean indicates success, the + Any is the parsed CSV content (typically a list of rows), and + the str contains an error message if any, including the error type. + """ + try: + with open(file_path, "r", encoding="utf-8") as file: + reader = csv.reader(file) + data: List[List[str]] = list(reader) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content=data, + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + + +def write_csv_file( + file_path: str, + data: List[List[Any]], + overwrite: bool = False, +) -> ServiceResponse: + """ + Write data to a CSV file. + + Args: + file_path (`str`): + The path to the file where the CSV data will be written. + data (`List[List[Any]]`): + The data to write to the CSV file + (each inner list represents a row). + overwrite (`bool`): + Whether to overwrite the file if it already exists. + + Returns: + `ServiceResponse`: where the boolean indicates success, and the + str contains an error message if any, including the error type. + """ + if not overwrite and os.path.exists(file_path): + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content="FileExistsError: The file already exists.", + ) + try: + with open(file_path, "w", encoding="utf-8", newline="") as file: + writer = csv.writer(file) + writer.writerows(data) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content="Success", + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + + +class PlannerAgent(AgentBase): + """ + PlannerAgent is responsible for decomposing complex tasks into manageable + subtasks. + + This agent takes an overall task and breaks it down into subtasks that can + be solved using available tools or code execution. It ensures that each + subtask is appropriately sized and prioritizes using tools over code + execution when possible. + """ + + def __init__( + self, + name: str, + sys_prompt: str, + model_config_name: str, + service_toolkit: ServiceToolkit, + ): + super().__init__( + name=name, + sys_prompt=sys_prompt, + model_config_name=model_config_name, + ) + self.service_toolkit = service_toolkit + + def __call__(self, x: Msg) -> List[Dict[str, Any]]: + return self.plan(x) + + def plan(self, x: Msg) -> List[Dict[str, Any]]: + """ + Decompose the task provided in the message into subtasks. + + Args: + x (Msg): Message containing the task to be decomposed. + + Returns: + List[Dict[str, Any]]: List of subtasks as dictionaries. + """ + messages = x.content + subtasks = self._decompose_task(messages) + return subtasks + + def _decompose_task( + self, + task: str, + max_tasks: int = 5, + ) -> List[Dict[str, Any]]: + # Implement task decomposition + message = [ + { + "role": "user", + "content": f""" + Task: {task} + - Given the task above, break it down into subtasks with dependencies if it is sufficently complex to be solved in one go. + - Every subtask should be solvable through either executing code or using tools. The information of all the tools available are here: + {self.service_toolkit.tools_instruction} + - The subtask should not be too simple. If a task can be solve with a single block of code in one go, it should not be broken down further. Example: a subtask cannot be simply installing or importing libraries. + - Prioritze using other tools over `execute_python_code` and take the tools available into consideration when decomposing the task. Provide a JSON structure with the following format for the decomposition: + ```json + [ + {{ + "task_id": str = "unique identifier for a task in plan, can be an ordinal", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + "task_type": "type of this task, should be one of Available Task Types", + "task_type": "type of this task, should be one of Available Task Types", + "tool_info": "recommended tool(s)' name(s) for solving this task", + }}, + ... + ] + ``` + - The maximum number of subtasks allowed is {max_tasks}. + """, + }, + ] + + response_text: str = self.model(message).text.strip() + response = ModelResponse(text=response_text) + parser = MarkdownJsonObjectParser() + parsed_response: List[Dict[str, Any]] = parser.parse(response) + return parsed_response.parsed + + +class VerifierAgent(ReActAgent): + """ + VerifierAgent verifies if a given result successfully solves a subtask. + + This agent checks the result of a subtask execution to ensure it meets the + requirements of the current subtask. It uses reasoning and available tools + to perform the verification and reports whether the subtask is solved. + """ + + def __init__( + self, + name: str, + sys_prompt: str, + model_config_name: str, + service_toolkit: ServiceToolkit, + *, + max_iters: int = 10, + verbose: bool = True, + ): + super().__init__( + name=name, + sys_prompt=sys_prompt, + model_config_name=model_config_name, + service_toolkit=service_toolkit, + max_iters=max_iters, + verbose=verbose, + ) + + def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: + Verification_PROMPT = """- Given `overall_task` and `solved_dependent_sub_tasks` as context, verify if the information in `result` can succesfully solve `current_sub_task` with your reasoning trace. + - If you think code or tools are helpful for verification, use `execute_python_code` and/or other tools available to do verification. + - Do not simply trust the claim in `result`. VERIFY IT. + - If the information in `result` cannot solve `current_sub_task`, Do NOT attempt to fix it. Report it IMMEDIATELY. You job is just to do the verification. + - If the given result can succesfully solve `current_sub_task`, ALWAYS output 'True' at the very end of your response; otherwise, output why the given result cannot succesfully solve `current_sub_task` and followed by 'False'. + - DO NOT call `finish` before the entire verification process is completed. After the entire verification is completed, use `finish` tool IMMEDIATELY.""" + + msg = Msg( + name="Verifier", + role="system", + content=Verification_PROMPT + x.content, + ) + verdict = super().reply(msg) + + return verdict + + +class SynthesizerAgent(AgentBase): + """ + SynthesizerAgent combines the results of all subtasks to produce the final + answer. + + This agent takes the overall task and the results of each solved subtask, + synthesizes them, and generates a comprehensive answer for the overall task. + """ + + def __init__( + self, + name: str, + sys_prompt: str, + model_config_name: str, + ): + super().__init__( + name=name, + sys_prompt=sys_prompt, + model_config_name=model_config_name, + ) + + def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: + Synthesize_PROMPT = """Given `overall_task` and all solved `subtasks`, synthesize the result of each tasks and give a answer for `overall_task`.""" + + message = [ + { + "role": "user", + "content": Synthesize_PROMPT + " " + x.content, + }, + ] + final_answer_str: str = self.model(message).text.strip() + final_msg: Msg = Msg( + name=self.name, + role="assistant", + content=final_answer_str, + ) + return final_msg + + +class ReplanningAgent(AgentBase): + """ + ReplanningAgent handles replanning when a subtask cannot be solved as is. + + This agent decides whether to substitute an unsolvable subtask with a new one + or to further decompose it into smaller subtasks. It then provides the + updated plan or decomposition to continue solving the overall task. + """ + + def __init__( + self, + name: str, + sys_prompt: str, + model_config_name: str, + service_toolkit: ServiceToolkit, + ): + super().__init__( + name=name, + sys_prompt=sys_prompt, + model_config_name=model_config_name, + ) + self.service_toolkit = service_toolkit + + def __call__(self, x: Msg) -> Tuple[str, Any]: + return self.replan(x) + + def replan(self, x: Msg) -> Tuple[str, Any]: + """ + Decide to replan or decompose a subtask based on the given message. + + Args: + x (Msg): Message containing the current task and context. + + Returns: + Tuple[str, Any]: ('replan_subtask', new_tasks) or + ('decompose_subtask', subtasks), depending on the decision. + + Raises: + ValueError: If unable to determine how to revise the subtask. + """ + + task = x.content + revising_PROMPT = """Based on `overall_task` and all solved `subtasks`, and the `VERDICT`, decide if it is better to : + 1. come out with another subtask in place of `current_sub_task` if you think the reason `current_sub_task` is unsolvable is it is infeasible to solve; + 2. further break `current_sub_task` into more subtasks if you think the reason `current_sub_task` is unsolvable is it is still too complex. + If it is better to do '1', output 'replan_subtask'. If it is better to do '2', output 'decompose_subtask'.""" + message = [ + { + "role": "user", + "content": revising_PROMPT + " " + task, + }, + ] + option = self.model(message).text.strip() + print("replanning option: ", option) + if "replan_subtask" in option: + new_tasks = self._replanning(task) + return ("replan_subtask", new_tasks) + elif "decompose_subtask" in option: + subtasks = self._decompose_task(task) + return ("decompose_subtask", subtasks) + else: + raise ValueError("Not clear how to revise subtask.") + + def _replanning(self, task: str) -> List[Dict[str, Any]]: + replanning_PROMPT = f"""Based on `overall_task` and all solved `subtasks`, and the `VERDICT`: + 1. Substitute `current_sub_task` with a new `current_sub_task` in order to better achieve `overall_task`. + 2. Modify all substasks that have dependency on `current_sub_task` based on the new `current_sub_task` if needed. + 3. Follow the format below to list your revised subtasks, including the solved subtasks: + ```json + [ + {{ + "task_id": str = "unique identifier for a task in plan, can be an ordinal", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + "task_type": "type of this task, should be one of Available Task Types", + "task_type": "type of this task, should be one of Available Task Types", + "tool_info": "recommended tool(s)' name(s) for solving this task", + }}, + ... + ] + ``` + 4. Every new task/subtask should be solvable through either executing code or using tools. The information of all the tools available are here: + {self.service_toolkit.tools_instruction} """ + message = [ + { + "role": "user", + "content": replanning_PROMPT + " " + task, + }, + ] + response_text: str = self.model(message).text.strip() + response = ModelResponse(text=response_text) + parser = MarkdownJsonObjectParser() + parsed_response: List[Dict[str, Any]] = parser.parse(response) + return parsed_response.parsed + + def _decompose_task( + self, + task: str, + max_tasks: int = 5, + ) -> List[Dict[str, Any]]: + """ + Decompose a complex subtask into smaller, more manageable subtasks. + + Args: + task (str): The task to be decomposed + max_tasks (int, optional): Maximum number of subtasks allowed. Defaults to 5. + + Returns: + List[Dict[str, Any]]: List of decomposed subtasks as dictionaries + """ + message = [ + { + "role": "user", + "content": f""" + Task: {task} + - Given the task above which was determined to be too complex, break it down into smaller, more manageable subtasks. + - Every subtask should be solvable through either executing code or using tools. The information of all the tools available are here: + {self.service_toolkit.tools_instruction} + - The subtask should not be too simple. If a task can be solved with a single block of code in one go, it should not be broken down further. + - Prioritize using other tools over `execute_python_code` and take the tools available into consideration when decomposing the task. + - Provide a JSON structure with the following format for the decomposition: + ```json + [ + {{ + "task_id": str = "unique identifier for a task in plan, can be an ordinal", + "dependent_task_ids": list[str] = "ids of tasks prerequisite to this task", + "instruction": "what you should do in this task, one short phrase or sentence", + "task_type": "type of this task, should be one of Available Task Types", + "task_type": "type of this task, should be one of Available Task Types", + "tool_info": "recommended tool(s)' name(s) for solving this task", + }}, + ... + ] + ``` + - The maximum number of subtasks allowed is {max_tasks}. + """, + }, + ] + + response_text: str = self.model(message).text.strip() + response = ModelResponse(text=response_text) + parser = MarkdownJsonObjectParser() + parsed_response: List[Dict[str, Any]] = parser.parse(response) + return parsed_response.parsed diff --git a/examples/data_interpreter_mulit-agent/di_multiagent.py b/examples/data_interpreter_mulit-agent/di_multiagent.py new file mode 100644 index 000000000..3e20ffd47 --- /dev/null +++ b/examples/data_interpreter_mulit-agent/di_multiagent.py @@ -0,0 +1,433 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0301 +""" +This script demonstrates task solving with a data interpreter pipeline +formed by multiple agents, each specialized in one aspect of problem solving. +""" +import csv +import os +import copy +from typing import Any, List, Dict, Optional +from dotenv import load_dotenv, find_dotenv +from di_agents import ( + PlannerAgent, + VerifierAgent, + SynthesizerAgent, + ReplanningAgent, +) +import agentscope +from agentscope.agents import ReActAgent +from agentscope.message import Msg +from agentscope.service import ( + ServiceToolkit, + execute_python_code, + list_directory_content, + get_current_directory, + execute_shell_command, +) + +from agentscope.service.service_response import ServiceResponse +from agentscope.service.service_status import ServiceExecStatus + +# Global variables for agents with type annotations +planner_agent: PlannerAgent +solver_agent: ReActAgent +verifier_agent: VerifierAgent +synthesizer_agent: SynthesizerAgent +replanner_agent: ReplanningAgent + +# Global variables for failure tracking +total_failure_count = 0 +max_total_failures = 3 # Adjust as needed + + +def read_csv_file(file_path: str) -> ServiceResponse: + """ + Read and parse a CSV file. + + Args: + file_path (`str`): + The path to the CSV file to be read. + + Returns: + `ServiceResponse`: Where the boolean indicates success, the + Any is the parsed CSV content (typically a list of rows), + and the str contains an error message if any, including + the error type. + """ + try: + with open(file_path, "r", encoding="utf-8") as file: + reader = csv.reader(file) + data: List[List[str]] = list(reader) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content=data, + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + + +def write_csv_file( + file_path: str, + data: List[List[Any]], + overwrite: bool = False, +) -> ServiceResponse: + """ + Write data to a CSV file. + + Args: + file_path (`str`): + The path to the file where the CSV data will be written. + data (`List[List[Any]]`): + The data to write to the CSV file (each inner list represents a row). + overwrite (`bool`): + Whether to overwrite the file if it already exists. + + Returns: + `ServiceResponse`: where the boolean indicates success, and the + str contains an error message if any, including the error type. + """ + if not overwrite and os.path.exists(file_path): + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content="FileExistsError: The file already exists.", + ) + try: + with open(file_path, "w", encoding="utf-8", newline="") as file: + writer = csv.writer(file) + writer.writerows(data) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content="Success", + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + + +_ = load_dotenv(find_dotenv()) # read local .env file + +openai_api_key = os.getenv("OPENAI_API_KEY") +dashscope_api_key = os.getenv("DASHSCOPE_API_KEY") +os.environ["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY") + + +STRUCTURAL_PROMPT = """ + # overall_task: {overall_task} + + # solved_sub_tasks: {solved_sub_tasks} + + # current_sub_task: {current_sub_task} + + # Instruction + - Conditioning on `overall_task` and `solved_sub_tasks`, solve `current_sub_task` with the appropriate tools provided. Note that you should only use `overall_task` and `solved_sub_tasks` as context as opposed to solving them. DO NOT attempt to solve `overall_task` or `solved_sub_tasks`. + - When using tools, ALWAYS prioritize using the tool mentioned in `tool_info` over other tool or code for solving `current_sub_task`. + - While some concise thoughts are helpful, code is required, unless other tools are used. If certain python libraries are not installed, use `execute_shell_command` to install them. + - At each step, if some data is fetched/generated, it is a good practice to save it. + + # Output Instruction + - Always output one and only one code block in your response. The code block must be self-contained, i.e., does not rely on previously generated code to be executed successfully, because the execution environments do not persist between calls of `execute_python_code`. Always use print statement on the final solution. E.g., if `res` is the final output, use `print(res)` at the end of your code. Output the code itself if the task at hand is to generate that code. After that, use `execute_python_code` to execute your code. Based on the result from code execution or tool using, determine if `current_sub_task` is solved. + - After `current_sub_task` is solved, return explicitly the result(s) for `current_sub_task` that is/are needed in the subsequent subtasks. If certain code are needed for the subsequent tasks, OUTPUT THE COMPLETE CODE. If the code is long, save it in txt or json format, and output the path for next round's use. If the result(s) contain(s) a lot of data, save the result(s) locally, output the path before proceed. + - DO NOT USE `finish` tool before executing the code if code execution is required. If the result involves a lot of data, save the data as csv, txt, json file(s) etc. for the ease of processing in the following subtasks. + """ + + +def process_subtasks( + subtasks: List[Dict[str, Any]], + task: str, + solved_dependent_sub_tasks: str, +) -> str: + """ + Process and solve subtasks recursively while handling failures and replanning when necessary. + This function implements a robust task-solving pipeline that includes verification, + failure tracking, and dynamic replanning capabilities. + + Args: + subtasks (List[Dict[str, Any]]): List of subtasks to be processed, where each subtask + is a dictionary containing task instructions and metadata. + task (str): The overall task description that provides context for subtask processing. + solved_dependent_sub_tasks (str): String containing the accumulated results and context + from previously solved subtasks. + + Returns: + str: The final synthesized answer after processing all subtasks, incorporating + the results from successful subtask executions and any necessary replanning. + """ + global total_failure_count, max_total_failures, replanner_agent + subtask_index: int = 0 + aggregated_result: str = "" + while subtask_index < len(subtasks): + print("current subtask:", subtasks[subtask_index]) + if subtask_index > 0: + solved_dependent_sub_tasks += str(subtasks[subtask_index - 1]) + prompt: str = STRUCTURAL_PROMPT.format( + overall_task=task, + solved_sub_tasks=solved_dependent_sub_tasks, + current_sub_task=subtasks[subtask_index], + ) + msg: Msg = Msg(name="Planner", role="system", content=prompt) + + verdict: str = "non" + failure_count: int = 0 + max_failure: int = 3 # Adjust as needed + result: Optional[Msg] = None + while "True" not in verdict[-5:]: + if verdict != "non": + msg = Msg( + name="Planner", + role="system", + content=prompt + " VERDICT: " + verdict, + ) + failure_count += 1 + total_failure_count += 1 + + if failure_count > max_failure: + # Check if total failures exceed max_total_failures + if total_failure_count > max_total_failures: + print("Exceeded maximum total failures. Aborting.") + return ( + "Failed to solve the task due to excessive failures." + ) + + # Call the replanner agent + result_content: str = str(result.content) if result else "" + msg_replan: Msg = Msg( + name="replanner", + role="system", + content=( + "overall_task: " + + task + + "\nsolved_dependent_sub_tasks: " + + solved_dependent_sub_tasks + + "\ncurrent_sub_task: " + + subtasks[subtask_index]["instruction"] + + "\nresult: " + + result_content + + "\nVERDICT: " + + verdict + + "\nall_subtasks: " + + str(subtasks) + ), + ) + decision: str + output: Any + decision, output = replanner_agent(msg_replan) + if decision == "decompose_subtask": + # Decompose current subtask into sub-subtasks + print("Decomposing current subtask into sub-subtasks...") + # Recursively process the new subtasks + final_answer = process_subtasks( + output, + task, + solved_dependent_sub_tasks, + ) + # After processing sub-subtasks, set the result of current subtask + subtasks[subtask_index]["result"] = final_answer + aggregated_result += final_answer + subtask_index += 1 + break # Break the while loop + if decision == "replan_subtask": + # Update subtasks with the new plan + print("Replanning current and subsequent subtasks...") + # Replace current and subsequent subtasks + # subtasks = subtasks[:subtask_index] + output + subtasks = copy.deepcopy(output) + # Reset failure_count + failure_count = 0 + # Continue with the updated subtasks + break # Break and restart the while loop with new subtasks + raise ValueError( + "Unknown decision from replanning_agent.", + ) + + # Proceed with solving the subtask + result = solver_agent(msg) + msg_verifier: Msg = Msg( + name="Verifier", + role="system", + content=( + "overall_task: " + + task + + "\nsolved_dependent_sub_tasks: " + + solved_dependent_sub_tasks + + "\ncurrent_sub_task: " + + subtasks[subtask_index]["instruction"] + + "\nresult: " + + str(result.content) + ), + ) + verdict = verifier_agent(msg_verifier).content + + # Store the result if verification passed + if "True" in verdict[-5:]: + subtasks[subtask_index]["result"] = str(result.content) + aggregated_result += str(result.content) + subtask_index += 1 # Move to the next subtask + # Reset failure_count after a successful subtask + failure_count = 0 + + # Once all subtasks are processed, synthesize the final answer + msg_synthesizer: Msg = Msg( + name="synthesizer", + role="system", + content="overall_task: " + task + "\nsubtasks: " + str(subtasks), + ) + final_answer = synthesizer_agent(msg_synthesizer).content + return final_answer + + +def problem_solving(task: str) -> str: + """ + Solve the given task by planning, processing subtasks, and synthesizing the final answer. + + Args: + task (str): The task description to be solved. + + Returns: + str: The final solution to the task. + """ + global total_failure_count, max_total_failures + total_failure_count = 0 + max_total_failures = 10 # Adjust as needed + + task_msg: Msg = Msg(name="Planner", role="system", content=task) + subtasks: List[Dict[str, Any]] = planner_agent(task_msg) + solved_dependent_sub_tasks: str = "" + + final_answer = process_subtasks(subtasks, task, solved_dependent_sub_tasks) + + return final_answer + + +def init_agents() -> None: + """ + Initialize all agents with the required configurations. + """ + global planner_agent, solver_agent, verifier_agent, synthesizer_agent, replanner_agent + + agentscope.init( + model_configs=[ + { + "config_name": "gpt_config", + "model_type": "openai_chat", + # "model_name": "chatgpt-4o-latest", + "model_name": "gpt-4o-mini", + # "model_name": "gpt-4o", + # "model_name": "o1-mini", + "api_key": openai_api_key, + "generate_args": { + "temperature": 0.0, + }, + }, + { + "config_name": "dashscope", + "model_type": "dashscope_chat", + "model_name": "qwen-max-1201", + "api_key": dashscope_api_key, + "generate_args": { + "temperature": 0.0, + }, + }, + { + "config_name": "lite_llm_claude", + "model_type": "litellm_chat", + "model_name": "claude-3-5-haiku-20241022", + # "model_name": "claude-3-5-sonnet-20241022", + "generate_args": { + # "max_tokens": 4096, + "temperature": 0.0, + }, + }, + { + "model_type": "post_api_chat", + "config_name": "my_post_api", + "api_url": "https://xxx", + "headers": {}, + }, + { + "config_name": "dashscope_chat", + "model_type": "dashscope_chat", + "model_name": "qwen-max", + "api_key": "", + "generate_args": { + "temperature": 0.7, + }, + }, + ], + project="Multi-Agent Conversation", + save_api_invoke=True, + use_monitor=True, # Enable token usage monitoring + ) + + # Create a ServiceToolkit instance + service_toolkit = ServiceToolkit() + # Add your tools to the service_toolkit here if needed + service_toolkit.add( + execute_python_code, + ) + service_toolkit.add( + list_directory_content, + ) + service_toolkit.add( + get_current_directory, + ) + service_toolkit.add( + execute_shell_command, + ) + + # Initialize the agents + planner_agent = PlannerAgent( + name="planner", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) + + solver_agent = ReActAgent( + name="solver", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) + + verifier_agent = VerifierAgent( + name="verifier", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) + + synthesizer_agent = SynthesizerAgent( + name="synthesizer", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + ) + + replanner_agent = ReplanningAgent( + name="replanner", + sys_prompt="You're a helpful assistant.", + model_config_name="dashscope_chat", + service_toolkit=service_toolkit, + ) + + +def main() -> None: + """Initialize agents and run an example task through the problem-solving pipeline.""" + # Initialize agents + init_agents() + + # Example task (you can replace this with any task) + input_task = "Your task description here." + final_solution = problem_solving(input_task) + print("final solution: ", final_solution) + + +if __name__ == "__main__": + main() From 83edd23adf2b83674c9307d37446c8899c03c836 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:51:53 +0800 Subject: [PATCH 2/9] Update di_multiagent.py --- examples/data_interpreter_mulit-agent/di_multiagent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/data_interpreter_mulit-agent/di_multiagent.py b/examples/data_interpreter_mulit-agent/di_multiagent.py index 3e20ffd47..ec85f8767 100644 --- a/examples/data_interpreter_mulit-agent/di_multiagent.py +++ b/examples/data_interpreter_mulit-agent/di_multiagent.py @@ -355,7 +355,7 @@ def init_agents() -> None: "config_name": "dashscope_chat", "model_type": "dashscope_chat", "model_name": "qwen-max", - "api_key": "", + "api_key": dashscope_api_key, "generate_args": { "temperature": 0.7, }, From 1c5f1bab3a1a7eb8fa52a78ab905c5d60288325e Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:55:22 +0800 Subject: [PATCH 3/9] Update di_multiagent.py --- examples/data_interpreter_mulit-agent/di_multiagent.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/examples/data_interpreter_mulit-agent/di_multiagent.py b/examples/data_interpreter_mulit-agent/di_multiagent.py index ec85f8767..be3a9c7ba 100644 --- a/examples/data_interpreter_mulit-agent/di_multiagent.py +++ b/examples/data_interpreter_mulit-agent/di_multiagent.py @@ -326,15 +326,6 @@ def init_agents() -> None: "temperature": 0.0, }, }, - { - "config_name": "dashscope", - "model_type": "dashscope_chat", - "model_name": "qwen-max-1201", - "api_key": dashscope_api_key, - "generate_args": { - "temperature": 0.0, - }, - }, { "config_name": "lite_llm_claude", "model_type": "litellm_chat", From 0941af39fbc52018310c5c422370d4b449f978a6 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 20 Nov 2024 12:09:18 +0800 Subject: [PATCH 4/9] Update react_agent.py Slight modification to the format_instruction to improve the success rate of `RegexTaggedContentParser` parsing the model API output. --- src/agentscope/agents/react_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentscope/agents/react_agent.py b/src/agentscope/agents/react_agent.py index a806c40a9..a65d575f0 100644 --- a/src/agentscope/agents/react_agent.py +++ b/src/agentscope/agents/react_agent.py @@ -109,7 +109,7 @@ def __init__( # Initialize a parser object to formulate the response from the model self.parser = RegexTaggedContentParser( - format_instruction="""Respond with specific tags as outlined below: + format_instruction="""Respond with specific tags as outlined below in json format: {what you thought} {the function name you want to call} <{argument name}>{argument value} From 41624dd9e0baa81a80118fe1c723e37b943791fe Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 11 Dec 2024 09:41:20 +0800 Subject: [PATCH 5/9] Update react_agent.py --- src/agentscope/agents/react_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentscope/agents/react_agent.py b/src/agentscope/agents/react_agent.py index a65d575f0..afcd0ad05 100644 --- a/src/agentscope/agents/react_agent.py +++ b/src/agentscope/agents/react_agent.py @@ -123,7 +123,7 @@ def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: """The reply method of the agent.""" self.memory.add(x) - for _ in range(10): + for _ in range(self.max_iters): # Step 1: Reasoning: decide what function to call function_call = self._reasoning() From b178779dfd69a291f55bd7139706e6917028e684 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:24:41 +0800 Subject: [PATCH 6/9] Update react_agent.py --- src/agentscope/agents/react_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentscope/agents/react_agent.py b/src/agentscope/agents/react_agent.py index afcd0ad05..87025afdd 100644 --- a/src/agentscope/agents/react_agent.py +++ b/src/agentscope/agents/react_agent.py @@ -109,7 +109,7 @@ def __init__( # Initialize a parser object to formulate the response from the model self.parser = RegexTaggedContentParser( - format_instruction="""Respond with specific tags as outlined below in json format: + format_instruction="""Respond with specific tags as outlined below: {what you thought} {the function name you want to call} <{argument name}>{argument value} From d22313e22d3888c2afb65b5c5a91b0541df3a3c4 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:25:34 +0800 Subject: [PATCH 7/9] Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format within this example --- .../data_interpreter_mulit-agent/di_agents.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/examples/data_interpreter_mulit-agent/di_agents.py b/examples/data_interpreter_mulit-agent/di_agents.py index 1a0dfe984..96f30b63f 100644 --- a/examples/data_interpreter_mulit-agent/di_agents.py +++ b/examples/data_interpreter_mulit-agent/di_agents.py @@ -10,9 +10,9 @@ from agentscope.agents.agent import AgentBase from agentscope.message import Msg from agentscope.models import ModelResponse -from agentscope.parsers.json_object_parser import MarkdownJsonObjectParser +from agentscope.parsers import MarkdownJsonObjectParser +from agentscope.parsers import RegexTaggedContentParser from agentscope.service import ServiceToolkit - from agentscope.service.service_response import ServiceResponse from agentscope.service.service_status import ServiceExecStatus @@ -198,6 +198,18 @@ def __init__( max_iters=max_iters, verbose=verbose, ) + + # Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format. + self.parser = RegexTaggedContentParser( + format_instruction="""Respond with specific tags as outlined below in json format: + {what you thought} + {the function name you want to call} + <{argument name}>{argument value} + <{argument name}>{argument value} + ...""", # noqa + try_parse_json=True, + required_keys=["thought", "function"], + ) def reply(self, x: Optional[Union[Msg, Sequence[Msg]]] = None) -> Msg: Verification_PROMPT = """- Given `overall_task` and `solved_dependent_sub_tasks` as context, verify if the information in `result` can succesfully solve `current_sub_task` with your reasoning trace. From e95b6bbc42c435cedfd017bfbac09e3db1513489 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:27:52 +0800 Subject: [PATCH 8/9] # Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format within this example. --- .../di_multiagent.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/examples/data_interpreter_mulit-agent/di_multiagent.py b/examples/data_interpreter_mulit-agent/di_multiagent.py index be3a9c7ba..784b667dd 100644 --- a/examples/data_interpreter_mulit-agent/di_multiagent.py +++ b/examples/data_interpreter_mulit-agent/di_multiagent.py @@ -28,6 +28,7 @@ from agentscope.service.service_response import ServiceResponse from agentscope.service.service_status import ServiceExecStatus +from agentscope.parsers import RegexTaggedContentParser # Global variables for agents with type annotations planner_agent: PlannerAgent @@ -345,10 +346,10 @@ def init_agents() -> None: { "config_name": "dashscope_chat", "model_type": "dashscope_chat", - "model_name": "qwen-max", + "model_name": "qwen-plus", "api_key": dashscope_api_key, "generate_args": { - "temperature": 0.7, + "temperature": 0.0, }, }, ], @@ -388,6 +389,18 @@ def init_agents() -> None: service_toolkit=service_toolkit, ) + # Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format. + solver_agent.parser = RegexTaggedContentParser( + format_instruction="""Respond with specific tags as outlined below in json format: + {what you thought} + {the function name you want to call} + <{argument name}>{argument value} + <{argument name}>{argument value} + ...""", # noqa + try_parse_json=True, + required_keys=["thought", "function"], + ) + verifier_agent = VerifierAgent( name="verifier", sys_prompt="You're a helpful assistant.", From f78ce6b09fbbd61b9b12e290cc85a5be8322d490 Mon Sep 17 00:00:00 2001 From: zyzhang1130 <36942574+zyzhang1130@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:31:21 +0800 Subject: [PATCH 9/9] pre-commit run --- examples/data_interpreter_mulit-agent/di_agents.py | 2 +- examples/data_interpreter_mulit-agent/di_multiagent.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/data_interpreter_mulit-agent/di_agents.py b/examples/data_interpreter_mulit-agent/di_agents.py index 96f30b63f..7b9678e5a 100644 --- a/examples/data_interpreter_mulit-agent/di_agents.py +++ b/examples/data_interpreter_mulit-agent/di_agents.py @@ -198,7 +198,7 @@ def __init__( max_iters=max_iters, verbose=verbose, ) - + # Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format. self.parser = RegexTaggedContentParser( format_instruction="""Respond with specific tags as outlined below in json format: diff --git a/examples/data_interpreter_mulit-agent/di_multiagent.py b/examples/data_interpreter_mulit-agent/di_multiagent.py index 784b667dd..cb6f20a0c 100644 --- a/examples/data_interpreter_mulit-agent/di_multiagent.py +++ b/examples/data_interpreter_mulit-agent/di_multiagent.py @@ -391,15 +391,15 @@ def init_agents() -> None: # Overwrite the parser attribute with the custom format_instruction to reinforce the output adhere to json format. solver_agent.parser = RegexTaggedContentParser( - format_instruction="""Respond with specific tags as outlined below in json format: + format_instruction="""Respond with specific tags as outlined below in json format: {what you thought} {the function name you want to call} <{argument name}>{argument value} <{argument name}>{argument value} ...""", # noqa - try_parse_json=True, - required_keys=["thought", "function"], - ) + try_parse_json=True, + required_keys=["thought", "function"], + ) verifier_agent = VerifierAgent( name="verifier",