diff --git a/examples/swe_agent/main.ipynb b/examples/swe_agent/main.ipynb index fe6a1f261..966cad85c 100644 --- a/examples/swe_agent/main.ipynb +++ b/examples/swe_agent/main.ipynb @@ -6,10 +6,11 @@ "source": [ "# Example with SWE-agent\n", "\n", - "SWE-agent is an agent designed for solving github issues.\n", - "More details can be found in https://swe-agent.com/.\n", + "SWE-agent(SoftWare Engineering Agent) is an agent designed for solving real world software engineering problems, such as fixing github issues.\n", + "More details can be found in their [homepage](https://swe-agent.com/) and related [github repo](https://swe-agent.com/).\n", "\n", "In the example here, we partially implement the SWE-agent, and provide a simple example of how to use the implemented SWE-agent to fix a bug in a python file.\n", + "You should note that currently how to enable agents with stronger programming capabilities remains an open challenge, and the performance of the paritially implemented SWE-agent is not guaranteed.\n", "\n", "## Prerequisites\n", "\n", @@ -39,16 +40,39 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-05-15 10:44:41.957 | WARNING | agentscope.service.service_toolkit:get:626 - The service factory will be deprecated in the future. Try to use the `ServiceToolkit` class instead.\n", + "2024-05-15 10:44:41.959 | WARNING | agentscope.service.service_toolkit:get:626 - The service factory will be deprecated in the future. Try to use the `ServiceToolkit` class instead.\n", + "2024-05-15 10:44:41.959 | WARNING | agentscope.service.service_toolkit:get:626 - The service factory will be deprecated in the future. Try to use the `ServiceToolkit` class instead.\n", + "2024-05-15 10:44:41.959 | WARNING | agentscope.service.service_toolkit:get:626 - The service factory will be deprecated in the future. Try to use the `ServiceToolkit` class instead.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2024-05-15 10:44:41.977 | INFO | agentscope.models:read_model_configs:176 - Load configs for model wrapper: gpt-3.5-turbo, gpt-4, gpt-3.5-turbo-old, gpt-4-old\n", + "2024-05-15 10:44:41.979 | INFO | agentscope.utils.monitor:_create_monitor_table:396 - Init [monitor_metrics] as the monitor table\n", + "2024-05-15 10:44:41.979 | INFO | agentscope.utils.monitor:_create_monitor_table:397 - Init [monitor_metrics_quota_exceeded] as the monitor trigger\n", + "2024-05-15 10:44:41.979 | INFO | agentscope.utils.monitor:__init__:366 - SqliteMonitor initialization completed at [./runs/run_20240515-104441_s1ome5/agentscope.db]\n", + "2024-05-15 10:44:41.989 | INFO | agentscope.models.model:__init__:201 - Initialize model by configuration [gpt-4]\n", + "2024-05-15 10:44:41.989 | INFO | agentscope.models.model:__init__:201 - Initialize model by configuration [gpt-4]\n" + ] + } + ], "source": [ "from swe_agent import SWEAgent\n", "\n", "import agentscope\n", "from agentscope.models import load_model_by_config_name\n", "agentscope.init(\n", - " model_configs=\"xxx\", # fill you model config file path here\n", + " model_configs=\"/Users/wenhao/Disk/Codes/mydev/agentscope/examples/software_engineering_agent/configs/model_configs copy.json\", # fill you model config file path here\n", ")\n", "loaded_model = load_model_by_config_name('gpt-4')\n", "\n", @@ -87,7 +111,7 @@ } ], "source": [ - "from agentscope.service import write_file\n", + "from swe_agent_service_func import write_file\n", "\n", "# here the gcd code is written in a wrong way\n", "wrong_gcd_text = \"\"\"\n", @@ -127,9 +151,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "user: The file in gcd.py is not returning the correct answer, can you fix the file for me?.\n", + "user: The file in gcd.py is not returning the correct answer, can you fix the file for me?\n", "assistant: {\n", - " \"thought\": \"First, I need to view the file gcd.py to understand the bug the user is referring to. I'll read the file gcd.py using the `read_file` command.\",\n", + " \"thought\": \"To debug the gcd.py file, first we need to read the contents of the file to understand the existing program logic.\",\n", " \"action\": {\n", " \"name\": \"read_file\",\n", " \"arguments\": {\n", @@ -155,45 +179,21 @@ "print(\"LCM of 12 and 18 is:\", lcm(12, 18))\n", "\n", "\n", - "assistant: {\n", - " \"thought\": \"I can see that the gcd algorithm implemented in the gcd function within gcd.py file is incorrect. In the while loop of the gcd function, a and b should repeatedly subtract one from the other until a is zero but it's subtracting a from itself which would result in always zero. I'll correct this by modifying the line 5: 'a, b = b, a' to 'a, b = b % a, a'. \",\n", - " \"action\": {\n", - " \"name\": \"write_file\",\n", - " \"arguments\": {\n", - " \"start_line\": 3,\n", - " \"end_line\": 6,\n", - " \"content\": \" if a == 0:\\n return b\\n while b != 0:\\n a, b = b % a, a\",\n", - " \"file_path\": \"gcd.py\"\n", - " }\n", - " }\n", - "}\n", - "assistant: \n", - "====Observation====\n", - "WRITE OPERATION:\n", - "You have written to \"gcd.py\" on these lines: 3:6.\n", - "def gcd(a, b):\n", - " if a == 0:\n", - " if a == 0:\n", - " return b\n", - " while b != 0:\n", - " a, b = b % a, a\n", + "assistant: ```json\n", + "Upon reviewing the contents of gcd.py, I can see the gcd function is incorrectly implemented. While the function starts correctly by evaluating `if a == 0: return b`, the loop structure `while a != 0: a, b = b, a` does not accurately compute the greatest common divisor (gcd). \n", "\n", - "def lcm(a, b):\n", - " return (a * b) // gcd(a, b)\n", - "\n", - "# testing on GCD and LCM functions\n", - "print(\"GCD of 12 and 18 is:\", gcd(12, 18))\n", - "print(\"LCM of 12 and 18 is:\", lcm(12, 18))\n", + "Instead, this loop intends to swap a and b while 'a' is not zero, but does not perform any operation that would actually calculate the gcd. In fact, without proper adjustments, this results in an infinite loop when 'a' is not zero.\n", "\n", + "The correct implementation should be using Euclid's algorithm for calculating gcd which goes: while b is not zero, swap a, b = b, remainder of (a divided by b), when b becomes zero, 'a' is the gcd.\n", "\n", + "I will replace the current gcd function in the file with this correct implementation.```\n", + "system: {'action': {'name': 'error'}, 'error_msg': 'Your response is not a JSON object, and cannot be parsed by `json.loads` in parse function:\\n## Your Response:\\n[YOUR RESPONSE BEGIN]\\n```json\\nUpon reviewing the contents of gcd.py, I can see the gcd function is incorrectly implemented. While the function starts correctly by evaluating `if a == 0: return b`, the loop structure `while a != 0: a, b = b, a` does not accurately compute the greatest common divisor (gcd). \\n\\nInstead, this loop intends to swap a and b while \\'a\\' is not zero, but does not perform any operation that would actually calculate the gcd. In fact, without proper adjustments, this results in an infinite loop when \\'a\\' is not zero.\\n\\nThe correct implementation should be using Euclid\\'s algorithm for calculating gcd which goes: while b is not zero, swap a, b = b, remainder of (a divided by b), when b becomes zero, \\'a\\' is the gcd.\\n\\nI will replace the current gcd function in the file with this correct implementation.```\\n[YOUR RESPONSE END]\\n\\n## Error Information:\\nThe content between ```json and ``` MUST be a JSON object.When parsing \"```json\\nUpon reviewing the contents of gcd.py, I can see the gcd function is incorrectly implemented. While the function starts correctly by evaluating `if a == 0: return b`, the loop structure `while a != 0: a, b = b, a` does not accurately compute the greatest common divisor (gcd). \\n\\nInstead, this loop intends to swap a and b while \\'a\\' is not zero, but does not perform any operation that would actually calculate the gcd. In fact, without proper adjustments, this results in an infinite loop when \\'a\\' is not zero.\\n\\nThe correct implementation should be using Euclid\\'s algorithm for calculating gcd which goes: while b is not zero, swap a, b = b, remainder of (a divided by b), when b becomes zero, \\'a\\' is the gcd.\\n\\nI will replace the current gcd function in the file with this correct implementation.```\", an error occurred: Expecting value: line 2 column 1 (char 1)\\n\\nAnalyze the reason, and re-correct your response in the correct format.'}\n", "assistant: {\n", - " \"thought\": \"The gcd function implemented is incorrect and it's causing the lcm function to fail as well. In the gcd function, while loop is incorrect. We should calculate gcd based on the fact that GCD of two numbers a and b is same as the GCD of b and a mod b. 'a mod b' can be calculated using the '%' operator. When b becomes 0, a will hold the largest common divisor (GCD). Let's fix this.\",\n", + " \"thought\": \"To fix the gcd function, I need to use Euclid's algorithm. The algorithm is: while b is not zero, swap a, b = b, remainder of (a divided by b), when b becomes zero, 'a' is the gcd. I will replace the current faulty gcd function in the file with this correct implementation.\",\n", " \"action\": {\n", " \"name\": \"write_file\",\n", " \"arguments\": {\n", - " \"start_line\": 2,\n", - " \"end_line\": 6,\n", - " \"content\": \" if b == 0:\\n return a\\n while b:\\n a, b = b, a % b\\n return a\",\n", + " \"content\": \"def gcd(a, b):\\n while b != 0:\\n a, b = b, a % b\\n return a\\n\\ndef lcm(a, b):\\n return (a * b) // gcd(a, b)\\n\\n# testing on GCD and LCM functions\\nprint(\\\"GCD of 12 and 18 is:\\\", gcd(12, 18))\\nprint(\\\"LCM of 12 and 18 is:\\\", lcm(12, 18))\",\n", " \"file_path\": \"gcd.py\"\n", " }\n", " }\n", @@ -201,11 +201,8 @@ "assistant: \n", "====Observation====\n", "WRITE OPERATION:\n", - "You have written to \"gcd.py\" on these lines: 2:6.\n", - "def gcd(a, b):\n", - " if b == 0:\n", - " return a\n", - " while b:\n", + "You have written to \"gcd.py\" on these lines: 0:-1.def gcd(a, b):\n", + " while b != 0:\n", " a, b = b, a % b\n", " return a\n", "\n", @@ -216,9 +213,8 @@ "print(\"GCD of 12 and 18 is:\", gcd(12, 18))\n", "print(\"LCM of 12 and 18 is:\", lcm(12, 18))\n", "\n", - "\n", "assistant: {\n", - " \"thought\": \"It seems the gcd function in gcd.py has been fixed and any changes required have already been made in my previous actions. I should run the python file to determine if it produces expected results. To do that, I can use the 'execute_shell_command' by running the 'python3' command with 'gcd.py'.\",\n", + " \"thought\": \"Now I have fixed the gcd function, I now need to test if it's working as expected. I can do this by executing the Python file.\",\n", " \"action\": {\n", " \"name\": \"execute_shell_command\",\n", " \"arguments\": {\n", @@ -231,7 +227,7 @@ "GCD of 12 and 18 is: 6\n", "LCM of 12 and 18 is: 36\n", "assistant: {\n", - " \"thought\": \"The gcd.py file is fixed correctly and no further actions are needed.\",\n", + " \"thought\": \"The code has been corrected and tested. It is now returning the correct gcd and lcm. No further actions are required.\",\n", " \"action\": {\n", " \"name\": \"exit\",\n", " \"arguments\": {}\n", @@ -266,7 +262,7 @@ "```text\n", "\n", "assistant: {\n", - " \"thought\": \"It seems the gcd function in gcd.py has been fixed and any changes required have already been made in my previous actions. I should run the python file to determine if it produces expected results. To do that, I can use the 'execute_shell_command' by running the 'python3' command with 'gcd.py'.\",\n", + " \"thought\": \"Now I have fixed the gcd function, I now need to test if it's working as expected. I can do this by executing the Python file.\",\n", " \"action\": {\n", " \"name\": \"execute_shell_command\",\n", " \"arguments\": {\n", @@ -278,10 +274,8 @@ "====Observation====\n", "GCD of 12 and 18 is: 6\n", "LCM of 12 and 18 is: 36\n", - "\n", - "\n", "assistant: {\n", - " \"thought\": \"The gcd.py file is fixed correctly and no further actions are needed.\",\n", + " \"thought\": \"The code has been corrected and tested. It is now returning the correct gcd and lcm. No further actions are required.\",\n", " \"action\": {\n", " \"name\": \"exit\",\n", " \"arguments\": {}\n", @@ -303,7 +297,7 @@ "Above we shown a example of how to use the SWE-agent to fix code errors.\n", "Although the design of the SWE-agent is primarily aimed at addressing GitHub issues, with modifications, it can also be utilized for more general programming tasks.\n", "\n", - "Currently, how to program using an agent remains a challenging open question, with the efficacy of agent programming potentially influenced by factors such as prompt construction, model capabilities, and the complexity of the task at hand. Here we just provide an interesting toy example. \n", + "Currently, how to enable agent with general programming ablities remains a challenging open question, with the efficacy of agent programming potentially influenced by factors such as prompt construction, model capabilities, and the complexity of the task at hand. Here we just provide an interesting toy example. \n", "\n", "We encourage users to experiment by altering the prompts within this example or by assigning different tasks to the agent, among other methods of exploration. Please feel free to experiment and explore on your own. The AgentScope team will continue to provide updates, enhancing the capabilities of the Programming Agents in the future!" ] diff --git a/examples/swe_agent/swe_agent.py b/examples/swe_agent/swe_agent.py index 350bbb047..3b55431d5 100644 --- a/examples/swe_agent/swe_agent.py +++ b/examples/swe_agent/swe_agent.py @@ -9,15 +9,19 @@ from agentscope.agents import AgentBase from agentscope.message import Msg -from agentscope.models import ResponseParser, ResponseParsingError +from agentscope.exception import ResponseParsingError +from agentscope.parsers import MarkdownJsonDictParser from typing import List, Callable import json from agentscope.service import ( ServiceFactory, execute_shell_command, +) + +from swe_agent_service_func import ( + exec_py_linting, write_file, read_file, - exec_py_linting, ) from swe_agent_prompts import ( @@ -108,6 +112,7 @@ def __init__( self.main_goal = "" self.commands_prompt = "" + self.parser = MarkdownJsonDictParser() self.get_commands_prompt() def get_current_file_content(self) -> None: @@ -157,12 +162,12 @@ def step(self) -> Msg: in_prompt = self.model.format(message_list) res = self.model( in_prompt, - parse_func=ResponseParser.to_dict, + parse_func=self.parser.parse, max_retries=1, - ).json + ) except ResponseParsingError as e: - response_msg = Msg(self.name, e.response.text, "assistant") + response_msg = Msg(self.name, e.raw_response, "assistant") self.speak(response_msg) # Re-correct by model itself @@ -171,9 +176,9 @@ def step(self) -> Msg: content={ "action": {"name": "error"}, "error_msg": ERROR_INFO_PROMPT.format( - parse_func=ResponseParser.to_dict, - error_info=e.error_info, - response=e.response.text, + parse_func=self.parser.parse, + error_info=e.message, + response=e.raw_response, ), }, role="system", @@ -183,16 +188,16 @@ def step(self) -> Msg: self.running_memory.append(error_msg) return error_msg - msg_res = Msg(self.name, res, role="assistant") + msg_res = Msg(self.name, res.parsed, role="assistant") self.speak( - Msg(self.name, json.dumps(res, indent=4), role="assistant"), + Msg(self.name, json.dumps(res.parsed, indent=4), role="assistant"), ) # parse and execute action - action = res.get("action") + action = res.parsed.get("action") - obs = self.prase_command(res["action"]) + obs = self.prase_command(res.parsed["action"]) self.speak( Msg(self.name, "\n====Observation====\n" + obs, role="assistant"), ) diff --git a/examples/swe_agent/swe_agent_prompts.py b/examples/swe_agent/swe_agent_prompts.py index ef7a742c0..b743671cf 100644 --- a/examples/swe_agent/swe_agent_prompts.py +++ b/examples/swe_agent/swe_agent_prompts.py @@ -96,7 +96,7 @@ def get_step_prompt( 5. After modifying python files, you can run `exec_py_linting` to check for errors. If there are errors, fix them and repeat the previous step. - NOTE THAT THIS ENVIRONMENT DOES NOT SUPPORT INTERACTIVE SESSION COMMANDS, such as "vim" or "python", or "python3". So DONOT execute them by running `execute_shell_command` with `python` command or `python3` command. + NOTE THAT THIS ENVIRONMENT DOES NOT SUPPORT INTERACTIVE SESSION COMMANDS, such as "vim" or "python", or "python3". So DONOT execute them by running `execute_shell_command` with `python` command or `python3` command if the code need additional inputs. If you want to check whether a python file is valid, you can use `exec_py_linting` to check for errors. {RESPONSE_FORMAT_PROMPT} diff --git a/examples/swe_agent/swe_agent_service_func.py b/examples/swe_agent/swe_agent_service_func.py new file mode 100644 index 000000000..76bdaecce --- /dev/null +++ b/examples/swe_agent/swe_agent_service_func.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0301 +""" +Tools for swe-agent, such as checking files with linting and formatting, +writing and reading files by lines, etc. +""" +import subprocess +import os + +from agentscope.service.service_response import ServiceResponse +from agentscope.service.service_status import ServiceExecStatus + + +def exec_py_linting(file_path: str) -> ServiceResponse: + """ + Executes flake8 linting on the given .py file with specified checks and + returns the linting result. + + Args: + file_path (`str`): The path to the Python file to lint. + + Returns: + ServiceResponse: Contains either the output from the flake8 command as + a string if successful, or an error message including the error type. + """ + command = f"flake8 --isolated --select=F821,F822,F831,\ + E111,E112,E113,E999,E902 {file_path}" + + try: + result = subprocess.run( + command, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content=result.stdout.strip() + if result.stdout + else "No lint errors found.", + ) + except subprocess.CalledProcessError as e: + error_message = ( + e.stderr.strip() + if e.stderr + else "An error occurred while linting the file." + ) + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + except Exception as e: + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=str(e), + ) + + +def write_file( + file_path: str, + content: str, + start_line: int = 0, + end_line: int = -1, +) -> ServiceResponse: + """ + Write content to a file by replacing the current lines between and with . Default start_line = 0 and end_line = -1. Calling this with no args will replace the whole file, so besure to use this with caution when writing to a file that already exists. + + Args: + file_path (`str`): The path to the file to write to. + content (`str`): The content to write to the file. + start_line (`Optional[int]`, defaults to `0`): The start line of the file to be replace with . + end_line (`Optional[int]`, defaults to `-1`): The end line of the file to be replace with . end_line = -1 means the end of the file, otherwise it should be a positive integer indicating the line number. + """ # noqa + try: + mode = "w" if not os.path.exists(file_path) else "r+" + insert = content.split("\n") + with open(file_path, mode, encoding="utf-8") as file: + if mode != "w": + all_lines = file.readlines() + new_file = [""] if start_line == 0 else all_lines[:start_line] + new_file += [i + "\n" for i in insert] + last_line = end_line + 1 + new_file += [""] if end_line == -1 else all_lines[last_line:] + else: + new_file = insert + + file.seek(0) + file.writelines(new_file) + file.truncate() + obs = f'WRITE OPERATION:\nYou have written to "{file_path}" \ + on these lines: {start_line}:{end_line}.' + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content=obs + "".join(new_file), + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) + + +def read_file( + file_path: str, + start_line: int = 0, + end_line: int = -1, +) -> ServiceResponse: + """ + Shows a given file's contents starting from up to . Default: start_line = 0, end_line = -1. By default the whole file will be read. + + Args: + file_path (`str`): The path to the file to read. + start_line (`Optional[int]`, defaults to `0`): The start line of the file to be read. + end_line (`Optional[int]`, defaults to `-1`): The end line of the file to be read. + """ # noqa + start_line = max(start_line, 0) + try: + with open(file_path, "r", encoding="utf-8") as file: + if end_line == -1: + if start_line == 0: + code_view = file.read() + else: + all_lines = file.readlines() + code_slice = all_lines[start_line:] + code_view = "".join(code_slice) + else: + all_lines = file.readlines() + num_lines = len(all_lines) + begin = max(0, min(start_line, num_lines - 2)) + end_line = ( + -1 if end_line > num_lines else max(begin + 1, end_line) + ) + code_slice = all_lines[begin:end_line] + code_view = "".join(code_slice) + return ServiceResponse( + status=ServiceExecStatus.SUCCESS, + content=f"{code_view}", + ) + except Exception as e: + error_message = f"{e.__class__.__name__}: {e}" + return ServiceResponse( + status=ServiceExecStatus.ERROR, + content=error_message, + ) diff --git a/src/agentscope/service/__init__.py b/src/agentscope/service/__init__.py index d7dea65b4..dce26c195 100644 --- a/src/agentscope/service/__init__.py +++ b/src/agentscope/service/__init__.py @@ -13,10 +13,7 @@ move_directory, list_directory_content, get_current_directory, - write_file, - read_file, ) -from .file.check import exec_py_linting from .file.text import read_text_file, write_text_file from .file.json import read_json_file, write_json_file from .sql_query.mysql import query_mysql @@ -73,9 +70,6 @@ def get_help() -> None: "load_web", "parse_html_to_text", "download_from_url", - "write_file", - "read_file", - "exec_py_linting", # to be deprecated "ServiceFactory", ] diff --git a/src/agentscope/service/file/check.py b/src/agentscope/service/file/check.py deleted file mode 100644 index 96c675e05..000000000 --- a/src/agentscope/service/file/check.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Tools for checking files, such as linting and formatting. -""" -import subprocess - -from agentscope.service.service_response import ServiceResponse -from agentscope.service.service_status import ServiceExecStatus - - -def exec_py_linting(file_path: str) -> ServiceResponse: - """ - Executes flake8 linting on the given .py file with specified checks and - returns the linting result. - - Args: - file_path (`str`): The path to the Python file to lint. - - Returns: - ServiceResponse: Contains either the output from the flake8 command as - a string if successful, or an error message including the error type. - """ - command = f"flake8 --isolated --select=F821,F822,F831,\ - E111,E112,E113,E999,E902 {file_path}" - - try: - result = subprocess.run( - command, - shell=True, - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - return ServiceResponse( - status=ServiceExecStatus.SUCCESS, - content=result.stdout.strip() - if result.stdout - else "No lint errors found.", - ) - except subprocess.CalledProcessError as e: - error_message = ( - e.stderr.strip() - if e.stderr - else "An error occurred while linting the file." - ) - return ServiceResponse( - status=ServiceExecStatus.ERROR, - content=error_message, - ) - except Exception as e: - return ServiceResponse( - status=ServiceExecStatus.ERROR, - content=str(e), - ) diff --git a/src/agentscope/service/file/common.py b/src/agentscope/service/file/common.py index dc6439fb7..11199a666 100644 --- a/src/agentscope/service/file/common.py +++ b/src/agentscope/service/file/common.py @@ -272,92 +272,3 @@ def get_current_directory() -> ServiceResponse: status=ServiceExecStatus.ERROR, content=error_message, ) - - -def write_file( - file_path: str, - content: str, - start_line: int = 0, - end_line: int = -1, -) -> ServiceResponse: - """ - Write content to a file by replacing the current lines between and with . Default start_line = 0 and end_line = -1. Calling this with no args will replace the whole file, so besure to use this with caution when writing to a file that already exists. - - Args: - file_path (`str`): The path to the file to write to. - content (`str`): The content to write to the file. - start_line (`Optional[int]`, defaults to `0`): The start line of the file to be replace with . - end_line (`Optional[int]`, defaults to `-1`): The end line of the file to be replace with . end_line = -1 means the end of the file, otherwise it should be a positive integer indicating the line number. - """ # noqa - try: - mode = "w" if not os.path.exists(file_path) else "r+" - insert = content.split("\n") - with open(file_path, mode, encoding="utf-8") as file: - if mode != "w": - all_lines = file.readlines() - new_file = [""] if start_line == 0 else all_lines[:start_line] - new_file += [i + "\n" for i in insert] - last_line = end_line + 1 - new_file += [""] if end_line == -1 else all_lines[last_line:] - else: - new_file = insert - - file.seek(0) - file.writelines(new_file) - file.truncate() - obs = f'WRITE OPERATION:\nYou have written to "{file_path}" \ - on these lines: {start_line}:{end_line}.' - return ServiceResponse( - status=ServiceExecStatus.SUCCESS, - content=obs + "".join(new_file), - ) - except Exception as e: - error_message = f"{e.__class__.__name__}: {e}" - return ServiceResponse( - status=ServiceExecStatus.ERROR, - content=error_message, - ) - - -def read_file( - file_path: str, - start_line: int = 0, - end_line: int = -1, -) -> ServiceResponse: - """ - Shows a given file's contents starting from up to . Default: start_line = 0, end_line = -1. By default the whole file will be read. - - Args: - file_path (`str`): The path to the file to read. - start_line (`Optional[int]`, defaults to `0`): The start line of the file to be read. - end_line (`Optional[int]`, defaults to `-1`): The end line of the file to be read. - """ # noqa - start_line = max(start_line, 0) - try: - with open(file_path, "r", encoding="utf-8") as file: - if end_line == -1: - if start_line == 0: - code_view = file.read() - else: - all_lines = file.readlines() - code_slice = all_lines[start_line:] - code_view = "".join(code_slice) - else: - all_lines = file.readlines() - num_lines = len(all_lines) - begin = max(0, min(start_line, num_lines - 2)) - end_line = ( - -1 if end_line > num_lines else max(begin + 1, end_line) - ) - code_slice = all_lines[begin:end_line] - code_view = "".join(code_slice) - return ServiceResponse( - status=ServiceExecStatus.SUCCESS, - content=f"{code_view}", - ) - except Exception as e: - error_message = f"{e.__class__.__name__}: {e}" - return ServiceResponse( - status=ServiceExecStatus.ERROR, - content=error_message, - )