diff --git a/merico/github/code_task_summary/command.py b/merico/github/code_task_summary/command.py
index 4813183..3d793bc 100644
--- a/merico/github/code_task_summary/command.py
+++ b/merico/github/code_task_summary/command.py
@@ -1,25 +1,36 @@
import json
-import sys
import os
+import sys
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
-from common_util import ui_edit, assert_exit # noqa: E402
-from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url
+from common_util import assert_exit, ui_edit # noqa: E402
+from git_api import (
+ check_git_installed,
+ get_current_branch,
+ get_github_repo,
+ get_issue_info,
+ is_issue_url,
+ read_issue_by_url,
+)
+
def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None
+
# Function to generate a random branch name
PROMPT = (
"You are a coding engineer, required to summarize the ISSUE description into a coding task description of no more than 50 words. \n"
"The ISSUE description is as follows: {issue_body}, please summarize the corresponding coding task description.\n"
- "The coding task description should be output in JSON format, in the form of: {{\"summary\": \"code task summary\"}}\n"
+ 'The coding task description should be output in JSON format, in the form of: {{"summary": "code task summary"}}\n'
)
+
+
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_code_task_summary(issue_body):
pass
@@ -34,45 +45,56 @@ def get_issue_or_task(task):
if is_issue_url(task):
issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1)
-
+
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else:
return task
+
def get_issue_json(issue_id, task):
issue = {"id": "no issue id", "title": "", "body": task}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ issue = {
+ "id": issue_id,
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
return issue
+
# Main function
def main():
print("Start update code task summary ...", end="\n\n", flush=True)
-
+
is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
-
+
task = sys.argv[1]
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
-
+
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
-
+
issue = get_issue_json(issue_id, task)
assert_exit(not issue["body"], "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
# Generate 5 branch names
print("Generating code task summary ...", end="\n\n", flush=True)
- code_task_summary = generate_code_task_summary(issue_body = issue["body"])
+ code_task_summary = generate_code_task_summary(issue_body=issue["body"])
assert_exit(not code_task_summary, "Failed to generate code task summary.", exit_code=-1)
- assert_exit(not code_task_summary.get("summary", None), "Failed to generate code task summary, missing summary field in result.", exit_code=-1)
+ assert_exit(
+ not code_task_summary.get("summary", None),
+ "Failed to generate code task summary, missing summary field in result.",
+ exit_code=-1,
+ )
code_task_summary = code_task_summary["summary"]
# Select branch name
@@ -81,7 +103,7 @@ def main():
code_task_summary = code_task_summary[0]
# create and checkout branch
- print(f"Updating code task summary to config:")
+ print("Updating code task summary to config:")
config_file = os.path.join(".chat", "complete.config")
if os.path.exists(config_file):
with open(config_file, "r") as f:
@@ -93,6 +115,6 @@ def main():
json.dump(config, f, indent=4)
print("Code task summary has updated")
+
if __name__ == "__main__":
main()
-
diff --git a/merico/github/commit/commit.py b/merico/github/commit/commit.py
index bcba0fb..10ecc2d 100644
--- a/merico/github/commit/commit.py
+++ b/merico/github/commit/commit.py
@@ -4,16 +4,16 @@
import subprocess
import sys
+from devchat.llm import chat_completion_stream
+
from lib.chatmark import Checkbox, Form, TextEditor
from lib.ide_service import IDEService
-from devchat.llm import chat_completion_stream
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from common_util import assert_exit # noqa: E402
from git_api import get_issue_info
-
diff_too_large_message_en = (
"Commit failed. The modified content is too long "
"and exceeds the model's length limit. "
@@ -264,9 +264,11 @@ def generate_commit_message_base_diff(user_input, diff, issue):
"""
global language
language_prompt = "You must response commit message in chinese。\n" if language == "zh" else ""
- prompt = PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}").replace(
- "{__USER_INPUT__}", f"{user_input + language_prompt}"
- ).replace("{__ISSUE__}", f"{issue}")
+ prompt = (
+ PROMPT_COMMIT_MESSAGE_BY_DIFF_USER_INPUT.replace("{__DIFF__}", f"{diff}")
+ .replace("{__USER_INPUT__}", f"{user_input + language_prompt}")
+ .replace("{__ISSUE__}", f"{issue}")
+ )
model_token_limit_error = (
diff_too_large_message_en if language == "en" else diff_too_large_message_zh
@@ -322,7 +324,12 @@ def get_issue_json(issue_id):
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ issue = {
+ "id": issue_id,
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
return issue
@@ -392,8 +399,6 @@ def main():
if branch_name:
user_input += "\ncurrent repo branch name is:" + branch_name
commit_message = generate_commit_message_base_diff(user_input, diff, issue)
-
-
# TODO
# remove Closes #IssueNumber in commit message
diff --git a/merico/github/common_util.py b/merico/github/common_util.py
index 50d0b10..6e3f2fc 100644
--- a/merico/github/common_util.py
+++ b/merico/github/common_util.py
@@ -1,10 +1,7 @@
-
import functools
import sys
-import os
-
-from lib.chatmark import TextEditor, Form, Radio, Checkbox
+from lib.chatmark import Checkbox, Form, Radio, TextEditor
def create_ui_objs(ui_decls, args):
@@ -24,7 +21,7 @@ def edit_form(uis, args):
ui_objs, editors = create_ui_objs(uis, args)
form = Form(editors)
form.render()
-
+
values = []
for obj in ui_objs:
if isinstance(obj, TextEditor):
@@ -35,7 +32,7 @@ def edit_form(uis, args):
# TODO
pass
return values
-
+
def editor(description):
def decorator_edit(func):
@@ -43,39 +40,39 @@ def decorator_edit(func):
def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1]
return edit_form(uis, args)
-
+
if hasattr(func, "uis"):
wrapper.uis = func.uis
else:
wrapper.uis = []
wrapper.uis.append((TextEditor, description))
return wrapper
+
return decorator_edit
+
def ui_edit(ui_type, description):
def decorator_edit(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
uis = wrapper.uis[::-1]
return edit_form(uis, args)
-
+
if hasattr(func, "uis"):
wrapper.uis = func.uis
else:
wrapper.uis = []
- ui_type_class = {
- "editor": TextEditor,
- "radio": Radio,
- "checkbox": Checkbox
- }[ui_type]
+ ui_type_class = {"editor": TextEditor, "radio": Radio, "checkbox": Checkbox}[ui_type]
wrapper.uis.append((ui_type_class, description))
return wrapper
+
return decorator_edit
-def assert_exit(condition, message, exit_code = -1):
+
+def assert_exit(condition, message, exit_code=-1):
if condition:
if exit_code == 0:
print(message, end="\n\n", flush=True)
else:
print(message, end="\n\n", file=sys.stderr, flush=True)
- sys.exit(exit_code)
\ No newline at end of file
+ sys.exit(exit_code)
diff --git a/merico/github/config/command.py b/merico/github/config/command.py
index 08c8c94..6150cc5 100644
--- a/merico/github/config/command.py
+++ b/merico/github/config/command.py
@@ -1,6 +1,6 @@
-import sys
import json
import os
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
@@ -16,20 +16,22 @@ def read_issue_url():
return config_data["issue_repo"]
return ""
+
def save_issue_url(issue_url):
config_path = os.path.join(os.getcwd(), ".chat", ".workflow_config.json")
# make dirs
os.makedirs(os.path.dirname(config_path), exist_ok=True)
-
+
config_data = {}
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
config_data["issue_repo"] = issue_url
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
+
def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
@@ -39,6 +41,7 @@ def read_github_token():
return config_data["github_token"]
return ""
+
def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@@ -46,15 +49,17 @@ def save_github_token(github_token):
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
-@editor("Please specify the issue's repository, "
- "If the issue is within this repository, no need to specify. "
- "Otherwise, format as: username/repository-name")
+@editor(
+ "Please specify the issue's repository, "
+ "If the issue is within this repository, no need to specify. "
+ "Otherwise, format as: username/repository-name"
+)
@editor("Input your github TOKEN to access github api:")
def edit_issue(issue_url, github_token):
pass
diff --git a/merico/github/git_api.py b/merico/github/git_api.py
index d2db9a3..39800f4 100644
--- a/merico/github/git_api.py
+++ b/merico/github/git_api.py
@@ -1,10 +1,10 @@
-
-import subprocess
-import requests
-import time
import json
import os
+import subprocess
import sys
+import time
+
+import requests
from lib.chatmark import TextEditor
@@ -18,10 +18,7 @@ def read_github_token():
return config_data["github_token"]
# ask user to input github token
- server_access_token_editor = TextEditor(
- "",
- f"Please input your GITHUB access TOKEN to access:"
- )
+ server_access_token_editor = TextEditor("", "Please input your GITHUB access TOKEN to access:")
server_access_token_editor.render()
server_access_token = server_access_token_editor.new_text
@@ -54,10 +51,11 @@ def create_issue(title, body):
print(f"Failed to create issue: {response.content}", file=sys.stderr, end="\n\n")
return None
+
def update_issue_body(issue_url, issue_body):
"""
Update the body text of a GitHub issue.
-
+
:param issue_url: The API URL of the issue to update.
:param issue_body: The new body text for the issue.
"""
@@ -69,11 +67,10 @@ def update_issue_body(issue_url, issue_body):
"body": issue_body,
}
-
issue_api_url = f"https://api.github.com/repos/{get_github_repo(True)}/issues"
api_url = f"{issue_api_url}/{issue_url.split('/')[-1]}"
response = requests.patch(api_url, headers=headers, data=json.dumps(data))
-
+
if response.status_code == 200:
print("Issue updated successfully!")
return response.json()
@@ -81,6 +78,7 @@ def update_issue_body(issue_url, issue_body):
print(f"Failed to update issue: {response.status_code}")
return None
+
# parse sub tasks in issue body
def parse_sub_tasks(body):
sub_tasks = []
@@ -90,31 +88,31 @@ def parse_sub_tasks(body):
sub_tasks.append(line[2:])
return sub_tasks
+
def update_sub_tasks(body, tasks):
# remove all existing tasks
lines = body.split("\n")
updated_body = "\n".join(line for line in lines if not line.startswith("- ["))
-
+
# add new tasks
updated_body += "\n" + "\n".join(f"- {task}" for task in tasks)
-
+
return updated_body
+
def update_task_issue_url(body, task, issue_url):
- # task is like:
- #[ ] task name
- #[x] task name
+ # task is like:
+ # [ ] task name
+ # [x] task name
# replace task name with issue url, like:
- #[ ] [task name](url)
- #[x] [task name](url)
- if task.find('] ') == -1:
+ # [ ] [task name](url)
+ # [x] [task name](url)
+ if task.find("] ") == -1:
return None
- task = task[task.find('] ')+2 : ]
+ task = task[task.find("] ") + 2 :]
return body.replace(task, f"[{task}]({issue_url})")
-
-
def check_git_installed():
"""
Check if Git is installed on the local machine.
@@ -133,6 +131,7 @@ def check_git_installed():
print("Git is not installed on your system.")
return False
+
def create_and_checkout_branch(branch_name):
subprocess.run(["git", "checkout", "-b", branch_name], check=True)
@@ -169,7 +168,13 @@ def get_github_repo(issue_repo=False):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
if "issue_repo" in config_data:
- print("current issue repo:", config_data["issue_repo"], end="\n\n", file=sys.stderr, flush=True)
+ print(
+ "current issue repo:",
+ config_data["issue_repo"],
+ end="\n\n",
+ file=sys.stderr,
+ flush=True,
+ )
return config_data["issue_repo"]
# 使用git命令获取当前仓库的URL
@@ -194,6 +199,7 @@ def get_github_repo(issue_repo=False):
print("==> File not found...")
return None
+
# 获取当前分支名称
def get_current_branch():
try:
@@ -229,7 +235,8 @@ def get_parent_branch():
return None
# 使用git命令获取父分支的名称
result = subprocess.check_output(
- ["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref], stderr=subprocess.STDOUT
+ ["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
+ stderr=subprocess.STDOUT,
).strip()
parent_branch_name = result.decode("utf-8")
return parent_branch_name
@@ -260,12 +267,14 @@ def get_issue_info(issue_id):
else:
return None
+
def get_issue_info_by_url(issue_url):
# get issue id from issue_url
def get_issue_id(issue_url):
# Extract the issue id from the issue_url
issue_id = issue_url.split("/")[-1]
return issue_id
+
return get_issue_info(get_issue_id(issue_url))
@@ -315,7 +324,6 @@ def create_pull_request(title, body, head, base, repo_name):
return None
-
def run_command_with_retries(command, retries=3, delay=5):
for attempt in range(retries):
try:
@@ -341,12 +349,17 @@ def check_unpushed_commits():
print(f"Error checking for unpushed commits: {e}")
return True
+
def auto_push():
# 获取当前分支名
if not check_unpushed_commits():
return True
try:
- branch = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip().decode("utf-8")
+ branch = (
+ subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
+ .strip()
+ .decode("utf-8")
+ )
except subprocess.CalledProcessError as e:
print(f"Error getting current branch: {e}")
return False
@@ -366,21 +379,25 @@ def auto_push():
def get_recently_pr(repo):
url = f"{GITHUB_API_URL}/repos/{repo}/pulls?state=open&sort=updated"
- headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Accept": "application/vnd.github.v3+json"}
+ headers = {
+ "Authorization": f"token {GITHUB_ACCESS_TOKEN}",
+ "Accept": "application/vnd.github.v3+json",
+ }
response = requests.get(url, headers=headers)
print("=>:", url)
-
+
branch_name = get_current_branch()
if response.status_code == 200:
prs = response.json()
for pr in prs:
- if pr['head']['ref'] == branch_name:
+ if pr["head"]["ref"] == branch_name:
return pr
return None
else:
return None
-
+
+
def update_pr(pr_number, title, body, repo_name):
url = f"{GITHUB_API_URL}/repos/{repo_name}/pulls/{pr_number}"
headers = {"Authorization": f"token {GITHUB_ACCESS_TOKEN}", "Content-Type": "application/json"}
@@ -393,7 +410,3 @@ def update_pr(pr_number, title, body, repo_name):
else:
print("Failed to update PR.")
return None
-
-
-
-
diff --git a/merico/github/list_issue_tasks/command.py b/merico/github/list_issue_tasks/command.py
index 44c8e10..9b52972 100644
--- a/merico/github/list_issue_tasks/command.py
+++ b/merico/github/list_issue_tasks/command.py
@@ -1,25 +1,27 @@
-import sys
import os
+import sys
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
+from common_util import assert_exit, editor # noqa: E402
from git_api import create_issue # noqa: E402
-from common_util import editor, assert_exit # noqa: E402
-
# Function to generate issue title and body using LLM
-PROMPT = ("Based on the following description, "
- "suggest a title and a detailed body for a GitHub issue:\n\n"
- "Description: {description}\n\n"
- "Output format: {{\"title\": \"
\", \"body\": \"\"}} ")
+PROMPT = (
+ "Based on the following description, "
+ "suggest a title and a detailed body for a GitHub issue:\n\n"
+ "Description: {description}\n\n"
+ 'Output format: {{"title": "", "body": ""}} '
+)
+
+
@chat_json(prompt=PROMPT)
def generate_issue_content(description):
pass
-
@editor("Edit issue title:")
@editor("Edit issue body:")
def edit_issue(title, body):
@@ -29,14 +31,14 @@ def edit_issue(title, body):
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
-
+
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1]
-
+
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
-
+
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@@ -48,4 +50,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/merico/github/new_branch/command.py b/merico/github/new_branch/command.py
index 8c6c9d7..ea9f084 100644
--- a/merico/github/new_branch/command.py
+++ b/merico/github/new_branch/command.py
@@ -1,14 +1,18 @@
import json
-import sys
import os
+import sys
from devchat.llm import chat_json
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
-from common_util import ui_edit, assert_exit # noqa: E402
-from git_api import check_git_installed, create_and_checkout_branch, is_issue_url, read_issue_by_url # noqa: E402
-
+from common_util import assert_exit, ui_edit # noqa: E402
+from git_api import ( # noqa: E402
+ check_git_installed,
+ create_and_checkout_branch,
+ is_issue_url,
+ read_issue_by_url,
+)
# Function to generate a random branch name
PROMPT = (
@@ -20,6 +24,8 @@
"the final result is output in JSON format, "
'as follows: {{"names":["name1", "name2", .. "name5"]}}\n'
)
+
+
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_branch_name(task):
pass
@@ -29,6 +35,7 @@ def generate_branch_name(task):
def select_branch_name_ui(branch_names):
pass
+
def select_branch_name(branch_names):
[branch_selection] = select_branch_name_ui(branch_names)
assert_exit(branch_selection is None, "No branch selected.", exit_code=0)
@@ -39,7 +46,7 @@ def get_issue_or_task(task):
if is_issue_url(task):
issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1)
-
+
return json.dumps({"id": issue["number"], "title": issue["title"], "body": issue["body"]})
else:
return task
@@ -48,19 +55,23 @@ def get_issue_or_task(task):
# Main function
def main():
print("Start create branch ...", end="\n\n", flush=True)
-
+
is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)
-
+
task = sys.argv[1]
- assert_exit(not task, "You need input something about the new branch, or input a issue url.", exit_code=-1)
-
+ assert_exit(
+ not task,
+ "You need input something about the new branch, or input a issue url.",
+ exit_code=-1,
+ )
+
# read issue by url
task = get_issue_or_task(task)
# Generate 5 branch names
print("Generating branch names ...", end="\n\n", flush=True)
- branch_names = generate_branch_name(task = task)
+ branch_names = generate_branch_name(task=task)
assert_exit(not branch_names, "Failed to generate branch names.", exit_code=-1)
branch_names = branch_names["names"]
@@ -75,4 +86,3 @@ def main():
if __name__ == "__main__":
main()
-
diff --git a/merico/github/new_issue/command.py b/merico/github/new_issue/command.py
index f92c5b9..e1b95a7 100644
--- a/merico/github/new_issue/command.py
+++ b/merico/github/new_issue/command.py
@@ -1,18 +1,21 @@
-import sys
import os
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
+from common_util import assert_exit, editor # noqa: E402
from devchat.llm import chat_json # noqa: E402
from git_api import create_issue # noqa: E402
-from common_util import editor, assert_exit # noqa: E402
-
# Function to generate issue title and body using LLM
-PROMPT = ("Based on the following description, "
- "suggest a title and a detailed body for a GitHub issue:\n\n"
- "Description: {description}\n\n"
- "Output as valid JSON format: {{\"title\": \"\", \"body\": \" use \\n as new line flag.\"}} ")
+PROMPT = (
+ "Based on the following description, "
+ "suggest a title and a detailed body for a GitHub issue:\n\n"
+ "Description: {description}\n\n"
+ 'Output as valid JSON format: {{"title": "", "body": " use \\n as new line flag."}} '
+)
+
+
@chat_json(prompt=PROMPT)
def generate_issue_content(description):
pass
@@ -27,14 +30,14 @@ def edit_issue(title, body):
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
-
+
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
description = sys.argv[1]
-
+
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(description=description)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
-
+
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@@ -46,4 +49,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/merico/github/new_issue/from_task/command.py b/merico/github/new_issue/from_task/command.py
index 2a20268..9edee06 100644
--- a/merico/github/new_issue/from_task/command.py
+++ b/merico/github/new_issue/from_task/command.py
@@ -1,12 +1,18 @@
-import sys
import os
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".."))
+# noqa: E402
+from common_util import assert_exit, editor, ui_edit # noqa: E402
from devchat.llm import chat_json # noqa: E402
-from git_api import create_issue, parse_sub_tasks, update_task_issue_url, update_issue_body, get_issue_info_by_url # noqa: E402
-from common_util import editor, assert_exit, ui_edit # noqa: E402
-
+from git_api import (
+ create_issue,
+ get_issue_info_by_url,
+ parse_sub_tasks,
+ update_issue_body,
+ update_task_issue_url,
+)
# Function to generate issue title and body using LLM
PROMPT = (
@@ -14,7 +20,10 @@
"{issue_content}\n\n"
"Based on the following issue task: {task}"
"suggest a title and a detailed body for a GitHub issue:\n\n"
- "Output format: {{\"title\": \"\", \"body\": \"\"}} ")
+ 'Output format: {{"title": "", "body": ""}} '
+)
+
+
@chat_json(prompt=PROMPT)
def generate_issue_content(issue_content, task):
pass
@@ -30,16 +39,22 @@ def edit_issue(title, body):
def select_task(tasks):
pass
+
def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ return {
+ "id": issue["number"],
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
# Main function
def main():
print("start new_issue ...", end="\n\n", flush=True)
-
+
assert_exit(len(sys.argv) < 2, "Missing argument.", exit_code=-1)
issue_url = sys.argv[1]
@@ -48,17 +63,16 @@ def main():
tasks = parse_sub_tasks(old_issue["body"])
assert_exit(not tasks, "No tasks in issue body.")
-
# select task from tasks
[task] = select_task(tasks)
assert_exit(task is None, "No task selected.")
task = tasks[task]
print("task:", task, end="\n\n", flush=True)
-
+
print("Generating issue content ...", end="\n\n", flush=True)
issue_json_ob = generate_issue_content(issue_content=old_issue, task=task)
assert_exit(not issue_json_ob, "Failed to generate issue content.", exit_code=-1)
-
+
issue_title, issue_body = edit_issue(issue_json_ob["title"], issue_json_ob["body"])
assert_exit(not issue_title, "Issue creation cancelled.", exit_code=0)
print("New Issue:", issue_title, "body:", issue_body, end="\n\n", flush=True)
@@ -78,4 +92,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/merico/github/new_pr/command.py b/merico/github/new_pr/command.py
index 1e22c64..bb9c5a7 100644
--- a/merico/github/new_pr/command.py
+++ b/merico/github/new_pr/command.py
@@ -1,18 +1,20 @@
-import subprocess
-import sys
-import requests
import json
import os
-import time
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
-from lib.chatmark import TextEditor, Form # noqa: E402
-from devchat.llm import chat_json # noqa: E402
-from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, is_issue_url, read_issue_by_url
from common_util import assert_exit, ui_edit # noqa: E402
-
+from devchat.llm import chat_json # noqa: E402
+from git_api import (
+ auto_push,
+ create_pull_request,
+ get_commit_messages,
+ get_current_branch,
+ get_github_repo,
+ get_issue_info,
+)
BASH_BRANCH = "main"
@@ -23,6 +25,7 @@ def extract_issue_id(branch_name):
return branch_name.split("#")[-1]
return None
+
# 使用LLM模型生成PR内容
PROMPT = (
"Create a pull request title and body based on "
@@ -34,12 +37,17 @@ def extract_issue_id(branch_name):
"The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}'
)
+
+
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_message, user_input):
pass
+
def generate_pr_content(issue, commit_messages, user_input):
- response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages, user_input = user_input)
+ response = generate_pr_content_llm(
+ issue=json.dumps(issue), commit_messages=commit_messages, user_input=user_input
+ )
assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body")
@@ -55,40 +63,45 @@ def get_issue_json(issue_id):
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ issue = {
+ "id": issue_id,
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
return issue
-
+
# 主函数
def main():
print("start new_pr ...", end="\n\n", flush=True)
-
+
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
-
+
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
-
+
issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH)
-
+
print("generating pr title and body ...", end="\n\n", flush=True)
user_input = sys.argv[1]
pr_title, pr_body = generate_pr_content(issue, commit_messages, user_input)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
-
+
pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
-
+
pr = create_pull_request(pr_title, pr_body, branch_name, BASH_BRANCH, repo_name)
assert_exit(not pr, "Failed to create PR.", exit_code=-1)
-
+
print(f"PR created successfully: {pr['html_url']}")
diff --git a/merico/github/update_issue_tasks/command.py b/merico/github/update_issue_tasks/command.py
index 7e835d9..b1d6dbf 100644
--- a/merico/github/update_issue_tasks/command.py
+++ b/merico/github/update_issue_tasks/command.py
@@ -1,12 +1,16 @@
-import sys
import os
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
+from common_util import assert_exit, editor # noqa: E402
from devchat.llm import chat_json # noqa: E402
-from git_api import get_issue_info_by_url, parse_sub_tasks, update_sub_tasks, update_issue_body # noqa: E402
-from common_util import editor, assert_exit # noqa: E402
-
+from git_api import ( # noqa: E402
+ get_issue_info_by_url,
+ parse_sub_tasks,
+ update_issue_body,
+ update_sub_tasks,
+)
TASKS_PROMPT = (
"Following is my git issue content.\n"
@@ -18,10 +22,13 @@
"Please output all tasks in JSON format as:"
'{{"tasks": ["[ ] task1", "[ ] task2"]}}'
)
+
+
@chat_json(prompt=TASKS_PROMPT)
def generate_issue_tasks(issue_data, user_input):
pass
+
def to_task_str(tasks):
task_str = ""
for task in tasks:
@@ -34,24 +41,32 @@ def to_task_str(tasks):
def edit_issue_tasks(old_tasks, new_tasks):
pass
+
@editor("Input ISSUE url:")
def input_issue_url(url):
pass
+
@editor("How to update tasks:")
def update_tasks_input(user_input):
pass
+
def get_issue_json(issue_url):
issue = get_issue_info_by_url(issue_url)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- return {"id": issue["number"], "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ return {
+ "id": issue["number"],
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
# Main function
def main():
print("start issue tasks update ...", end="\n\n", flush=True)
-
+
[issue_url] = input_issue_url("")
assert_exit(not issue_url, "No issue url.")
print("issue url:", issue_url, end="\n\n", flush=True)
@@ -64,7 +79,7 @@ def main():
[user_input] = update_tasks_input("")
assert_exit(not user_input, "No user input")
- new_tasks = generate_issue_tasks(issue_data=issue, user_input=user_input)
+ new_tasks = generate_issue_tasks(issue_data=issue, user_input=user_input)
assert_exit(not new_tasks, "No new tasks.")
print("new_tasks:", new_tasks, end="\n\n", flush=True)
assert_exit(not new_tasks.get("tasks", []), "No new tasks.")
@@ -83,4 +98,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/merico/github/update_pr/command.py b/merico/github/update_pr/command.py
index da9f022..7cf2b15 100644
--- a/merico/github/update_pr/command.py
+++ b/merico/github/update_pr/command.py
@@ -1,24 +1,27 @@
-import subprocess
-import sys
-import requests
import json
import os
-import time
+import sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
-from lib.chatmark import TextEditor, Form # noqa: E402
-from devchat.llm import chat_json # noqa: E402
-from git_api import auto_push, check_git_installed, create_pull_request, get_commit_messages, get_current_branch, get_github_repo, get_issue_info, get_recently_pr, is_issue_url, read_issue_by_url, update_pr
from common_util import assert_exit, ui_edit # noqa: E402
-
-from lib.chatmark import TextEditor, Form # noqa: E402
-from devchat.llm import chat_completion_no_stream_return_json # noqa: E402
-
+from devchat.llm import (
+ chat_json, # noqa: E402
+)
+from git_api import (
+ auto_push,
+ get_commit_messages,
+ get_current_branch,
+ get_github_repo,
+ get_issue_info,
+ get_recently_pr,
+ update_pr,
+)
BASH_BRANCH = "main"
+
# 从分支名称中提取issue id
def extract_issue_id(branch_name):
if "#" in branch_name:
@@ -36,12 +39,15 @@ def extract_issue_id(branch_name):
"The response result should format as JSON object as following:\n"
'{{"title": "pr title", "body": "pr body"}}'
)
+
+
@chat_json(prompt=PROMPT, model="gpt-4-1106-preview")
def generate_pr_content_llm(issue, commit_messages):
pass
+
def generate_pr_content(issue, commit_messages):
- response = generate_pr_content_llm(issue = json.dumps(issue), commit_messages = commit_messages)
+ response = generate_pr_content_llm(issue=json.dumps(issue), commit_messages=commit_messages)
assert_exit(not response, "Failed to generate PR content.", exit_code=-1)
return response.get("title"), response.get("body")
@@ -57,42 +63,47 @@ def get_issue_json(issue_id):
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, "Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
- issue = {"id": issue_id, "html_url": issue["html_url"], "title": issue["title"], "body": issue["body"]}
+ issue = {
+ "id": issue_id,
+ "html_url": issue["html_url"],
+ "title": issue["title"],
+ "body": issue["body"],
+ }
return issue
# 主函数
def main():
print("start update_pr ...", end="\n\n", flush=True)
-
+
repo_name = get_github_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)
-
+
# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")
-
+
issue = get_issue_json(issue_id)
commit_messages = get_commit_messages(BASH_BRANCH)
-
+
recent_pr = get_recently_pr(repo_name)
assert_exit(not recent_pr, "Failed to get recent PR.", exit_code=-1)
-
+
print("generating pr title and body ...", end="\n\n", flush=True)
pr_title, pr_body = generate_pr_content(issue, commit_messages)
assert_exit(not pr_title, "Failed to generate PR content.", exit_code=-1)
-
+
pr_title, pr_body = edit_pr(pr_title, pr_body)
assert_exit(not pr_title, "PR creation cancelled.", exit_code=0)
is_push_success = auto_push()
assert_exit(not is_push_success, "Failed to push changes.", exit_code=-1)
- pr = update_pr(recent_pr['number'], pr_title, pr_body, repo_name)
+ pr = update_pr(recent_pr["number"], pr_title, pr_body, repo_name)
assert_exit(not pr, "Failed to update PR.", exit_code=-1)
-
+
print(f"PR updated successfully: {pr['html_url']}")
diff --git a/merico/pr/command.py b/merico/pr/command.py
index 1ddfcac..5ae28ee 100644
--- a/merico/pr/command.py
+++ b/merico/pr/command.py
@@ -2,21 +2,20 @@
/pr.describe https://github.com/devchat-ai/devchat-vscode/pull/25
"""
+import logging
import os
import sys
-import logging
-import json
-import argparse
-import asyncio
+
+# add the current directory to the path
+from os.path import abspath, dirname
from lib.ide_service import IDEService
-# add the current directory to the path
-from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__))))
# add new model configs to algo.MAX_TOKENS
import pr_agent.algo as algo
+
algo.MAX_TOKENS["gpt-4-turbo-preview"] = 128000
algo.MAX_TOKENS["claude-3-opus"] = 100000
algo.MAX_TOKENS["claude-3-sonnet"] = 100000
@@ -44,49 +43,68 @@
algo.MAX_TOKENS["bert-base-uncased"] = 512
if os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106") not in algo.MAX_TOKENS:
current_model = os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")
- IDEService().ide_logging("info", f"{current_model}'s max tokens is not config, we use it as default 16000")
+ IDEService().ide_logging(
+ "info", f"{current_model}'s max tokens is not config, we use it as default 16000"
+ )
algo.MAX_TOKENS[os.environ.get("LLM_MODEL", "gpt-3.5-turbo-1106")] = 16000
+
# add new git provider
def get_git_provider():
from pr_agent.config_loader import get_settings
+
_git_provider_old_ = get_settings().config.git_provider
get_settings().config.git_provider = "devchat"
provider = _get_git_provider_old()
get_settings().config.git_provider = _git_provider_old_
return provider
+
import pr_agent.git_providers as git_providers
from providers.devchat_provider import DevChatProvider
-git_providers._GIT_PROVIDERS['devchat'] = DevChatProvider
+
+git_providers._GIT_PROVIDERS["devchat"] = DevChatProvider
_get_git_provider_old = git_providers.get_git_provider
git_providers.get_git_provider = get_git_provider
-from pr_agent.config_loader import get_settings
from pr_agent.cli import run
+from pr_agent.config_loader import get_settings
# mock logging method, to redirect log to IDE
-from pr_agent.log import setup_logger, inv_analytics_filter
+from pr_agent.log import inv_analytics_filter, setup_logger
+
from lib.ide_service import IDEService
+
+
class CustomOutput:
def __init__(self):
pass
+
def write(self, message):
IDEService().ide_logging("info", message.strip())
+
def flush(self):
pass
+
def close(self):
pass
+
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger = setup_logger(log_level)
logger.remove(None)
-logger.add(CustomOutput(), level=logging.INFO, format="{message}", colorize=False, filter=inv_analytics_filter)
+logger.add(
+ CustomOutput(),
+ level=logging.INFO,
+ format="{message}",
+ colorize=False,
+ filter=inv_analytics_filter,
+)
-from config_util import read_server_access_token_with_input, get_repo_type
+from config_util import get_repo_type, read_server_access_token_with_input
from custom_suggestions_config import get_custom_suggestions_system_prompt
# set openai key and api base
@@ -107,7 +125,12 @@ def close(self):
elif repo_type == "gitlab":
get_settings().set("GITLAB.PERSONAL_ACCESS_TOKEN", access_token)
else:
- print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True)
+ print(
+ "Unsupported git hosting service, input pr url is:",
+ sys.argv[1],
+ file=sys.stderr,
+ flush=True,
+ )
sys.exit(1)
@@ -143,7 +166,7 @@ def close(self):
get_settings().pr_code_suggestions_prompt.system += language_prompt
get_settings().pr_review_prompt.system += language_prompt
get_settings().pr_description_prompt.system += language_prompt
-#get_settings().pr_reviewer.inline_code_comments = True
+# get_settings().pr_reviewer.inline_code_comments = True
# config for find similar issues
get_settings().set("PR_SIMILAR_ISSUE.VECTORDB", "lancedb")
@@ -152,17 +175,17 @@ def close(self):
# set git provider type, devchat provider will create actual repo provider based on this type
pr_provider_type = get_repo_type(sys.argv[1])
if not pr_provider_type:
- print("Unsupported git hosting service, input pr url is:", sys.argv[1], file=sys.stderr, flush=True)
+ print(
+ "Unsupported git hosting service, input pr url is:",
+ sys.argv[1],
+ file=sys.stderr,
+ flush=True,
+ )
sys.exit(1)
get_settings().set("CONFIG.GIT_PROVIDER", pr_provider_type)
-os.environ['CONFIG.GIT_PROVIDER_TYPE'] = pr_provider_type
+os.environ["CONFIG.GIT_PROVIDER_TYPE"] = pr_provider_type
# os.environ['ENABLE_PUBLISH_LABELS'] = "1"
-if __name__ == '__main__':
- sys.argv = [
- sys.executable,
- '--pr_url',
- sys.argv[1].strip(),
- sys.argv[2].strip()
- ]
+if __name__ == "__main__":
+ sys.argv = [sys.executable, "--pr_url", sys.argv[1].strip(), sys.argv[2].strip()]
run()
diff --git a/merico/pr/config_util.py b/merico/pr/config_util.py
index 8d3fab7..f45228f 100644
--- a/merico/pr/config_util.py
+++ b/merico/pr/config_util.py
@@ -1,6 +1,5 @@
-import os
import json
-import sys
+import os
from lib.chatmark import TextEditor
@@ -26,6 +25,7 @@ def get_repo_type(url):
else:
return ""
+
def read_github_token():
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
@@ -35,6 +35,7 @@ def read_github_token():
return config_data["github_token"]
return ""
+
def read_server_access_token(repo_type):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
if os.path.exists(config_path):
@@ -44,6 +45,7 @@ def read_server_access_token(repo_type):
return config_data[repo_type]["access_token"]
return ""
+
def save_github_token(github_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@@ -51,11 +53,12 @@ def save_github_token(github_token):
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
config_data["github_token"] = github_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
+
def save_server_access_token(repo_type, access_token):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@@ -63,27 +66,26 @@ def save_server_access_token(repo_type, access_token):
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
if repo_type not in config_data:
config_data[repo_type] = {}
config_data[repo_type]["access_token"] = access_token
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
+
def read_github_token_with_input():
github_token = read_github_token()
if not github_token:
# Input your github TOKEN to access github api:
- github_token_editor = TextEditor(
- "",
- "Please input your github TOKEN to access:"
- )
+ github_token_editor = TextEditor("", "Please input your github TOKEN to access:")
github_token = github_token_editor.new_text
if not github_token:
return github_token
save_github_token(github_token)
return github_token
+
def read_server_access_token_with_input(pr_url):
repo_type = get_repo_type(pr_url)
if not repo_type:
@@ -93,8 +95,7 @@ def read_server_access_token_with_input(pr_url):
if not server_access_token:
# Input your server access TOKEN to access server api:
server_access_token_editor = TextEditor(
- "",
- f"Please input your {repo_type} access TOKEN to access:"
+ "", f"Please input your {repo_type} access TOKEN to access:"
)
server_access_token_editor.render()
diff --git a/merico/pr/custom_suggestions/config/command.py b/merico/pr/custom_suggestions/config/command.py
index 83ba70c..abf58f8 100644
--- a/merico/pr/custom_suggestions/config/command.py
+++ b/merico/pr/custom_suggestions/config/command.py
@@ -1,5 +1,5 @@
-import os
import json
+import os
from lib.chatmark import TextEditor
@@ -13,6 +13,7 @@ def read_custom_suggestions():
return config_data["custom_suggestions"]
return ""
+
def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@@ -20,7 +21,7 @@ def save_custom_suggestions(custom_suggestions):
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
@@ -30,19 +31,19 @@ def config_custom_suggestions_with():
custom_suggestions = read_custom_suggestions()
if not custom_suggestions:
custom_suggestions = "- make sure the code is efficient\n"
-
+
# Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor(
- custom_suggestions,
- "Please input your custom suggestions:"
+ custom_suggestions, "Please input your custom suggestions:"
)
custom_suggestions_editor.render()
custom_suggestions = custom_suggestions_editor.new_text
if not custom_suggestions:
- return
-
+ return
+
save_custom_suggestions(custom_suggestions)
+
if __name__ == "__main__":
config_custom_suggestions_with()
diff --git a/merico/pr/custom_suggestions_config.py b/merico/pr/custom_suggestions_config.py
index 8949d2d..f3e3b43 100644
--- a/merico/pr/custom_suggestions_config.py
+++ b/merico/pr/custom_suggestions_config.py
@@ -1,5 +1,5 @@
-import os
import json
+import os
import sys
from lib.chatmark import TextEditor
@@ -14,6 +14,7 @@ def read_custom_suggestions():
return config_data["custom_suggestions"]
return ""
+
def save_custom_suggestions(custom_suggestions):
config_path = os.path.join(os.path.expanduser("~/.chat"), ".workflow_config.json")
@@ -21,7 +22,7 @@ def save_custom_suggestions(custom_suggestions):
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config_data = json.load(f)
-
+
config_data["custom_suggestions"] = custom_suggestions
with open(config_path, "w+", encoding="utf-8") as f:
json.dump(config_data, f, indent=4)
@@ -32,8 +33,7 @@ def read_custom_suggestions_with_input():
if not custom_suggestions:
# Input your github TOKEN to access github api:
custom_suggestions_editor = TextEditor(
- "- make sure the code is efficient\n",
- "Please input your custom suggestions:"
+ "- make sure the code is efficient\n", "Please input your custom suggestions:"
)
custom_suggestions_editor.render()
@@ -50,7 +50,8 @@ def get_custom_suggestions_system_prompt():
print("Command has been canceled.", flush=True)
sys.exit(0)
- system_prompt = """You are PR-Reviewer, a language model that specializes in suggesting ways to improve for a Pull Request (PR) code.
+ system_prompt = (
+ """You are PR-Reviewer, a language model that specializes in suggesting ways to improve for a Pull Request (PR) code.
Your task is to provide meaningful and actionable code suggestions, to improve the new code presented in a PR diff.
@@ -96,7 +97,9 @@ def get_custom_suggestions_system_prompt():
Instructions from the user, that should be taken into account with high priority:
-""" + custom_suggestions + """
+"""
+ + custom_suggestions
+ + """
{%- if extra_instructions %}
@@ -151,4 +154,5 @@ class PRCodeSuggestions(BaseModel):
Each YAML output MUST be after a newline, indented, with block scalar indicator ('|').
"""
+ )
return system_prompt
diff --git a/merico/pr/providers/devchat_provider.py b/merico/pr/providers/devchat_provider.py
index ff326f4..d642961 100644
--- a/merico/pr/providers/devchat_provider.py
+++ b/merico/pr/providers/devchat_provider.py
@@ -1,28 +1,31 @@
+import json
import os
import sys
-import json
from typing import Optional, Tuple
+import pr_agent.git_providers as git_providers
from pr_agent.git_providers.git_provider import GitProvider, IncrementalPR
from pr_agent.git_providers.github_provider import GithubProvider
-import pr_agent.git_providers as git_providers
-from lib.chatmark import Form, TextEditor, Button
+from lib.chatmark import Button, Form, TextEditor
+
class DevChatProvider(GitProvider):
def __init__(self, pr_url: Optional[str] = None, incremental=IncrementalPR(False)):
# 根据某个状态,创建正确的GitProvider
- provider_type = os.environ.get('CONFIG.GIT_PROVIDER_TYPE')
- self.provider: GitProvider = git_providers._GIT_PROVIDERS[provider_type](pr_url, incremental)
+ provider_type = os.environ.get("CONFIG.GIT_PROVIDER_TYPE")
+ self.provider: GitProvider = git_providers._GIT_PROVIDERS[provider_type](
+ pr_url, incremental
+ )
@property
def pr(self):
return self.provider.pr
-
+
@property
def diff_files(self):
return self.provider.diff_files
-
+
@property
def github_client(self):
return self.provider.github_client
@@ -32,13 +35,10 @@ def is_supported(self, capability: str) -> bool:
def get_diff_files(self):
return self.provider.get_diff_files()
-
+
def need_edit(self):
button = Button(
- [
- "Commit",
- "Edit"
- ],
+ ["Commit", "Edit"],
)
button.render()
return 1 == button.clicked
@@ -54,9 +54,9 @@ def publish_description(self, pr_title: str, pr_body: str):
# Edit pr title and body
title_editor = TextEditor(pr_title)
body_editor = TextEditor(pr_body)
- form = Form(['Edit pr title:', title_editor, 'Edit pr body:', body_editor])
+ form = Form(["Edit pr title:", title_editor, "Edit pr body:", body_editor])
form.render()
-
+
pr_title = title_editor.new_text
pr_body = body_editor.new_text
if not pr_title or not pr_body:
@@ -68,8 +68,7 @@ def publish_description(self, pr_title: str, pr_body: str):
def publish_code_suggestions(self, code_suggestions: list) -> bool:
code_suggestions_json_str = json.dumps(code_suggestions, indent=4)
code_suggestions_editor = TextEditor(
- code_suggestions_json_str,
- "Edit code suggestions in JSON format:"
+ code_suggestions_json_str, "Edit code suggestions in JSON format:"
)
code_suggestions_editor.render()
@@ -77,7 +76,7 @@ def publish_code_suggestions(self, code_suggestions: list) -> bool:
if not code_suggestions_json_new:
print("Code suggestions are empty, please fill in the code suggestions.")
sys.exit(0)
-
+
code_suggestions = json.loads(code_suggestions_json_new)
return self.provider.publish_code_suggestions(code_suggestions)
@@ -86,7 +85,7 @@ def get_languages(self):
def get_pr_branch(self):
return self.provider.get_pr_branch()
-
+
def get_files(self):
return self.provider.get_files()
@@ -103,10 +102,7 @@ def edit_comment(self, comment, body: str):
print(f"\n\n{body}", end="\n\n", flush=True)
if self.need_edit():
- comment_editor = TextEditor(
- body,
- "Edit Comment:"
- )
+ comment_editor = TextEditor(body, "Edit Comment:")
comment_editor.render()
body = comment_editor.new_text
@@ -138,7 +134,9 @@ def get_repo_settings(self):
def get_pr_id(self):
return self.provider.get_pr_id()
- def get_line_link(self, relevant_file: str, relevant_line_start: int, relevant_line_end: int = None) -> str:
+ def get_line_link(
+ self, relevant_file: str, relevant_line_start: int, relevant_line_end: int = None
+ ) -> str:
return self.provider.get_line_link(relevant_file, relevant_line_start, relevant_line_end)
#### comments operations ####
@@ -148,18 +146,18 @@ def publish_comment(self, pr_comment: str, is_temporary: bool = False):
if pr_comment.find("## Generating PR code suggestions") != -1:
return None
- if (not is_temporary and \
- pr_comment.find("## Generating PR code suggestions") == -1 and \
- pr_comment.find("**[PR Description]") == -1):
+ if (
+ not is_temporary
+ and pr_comment.find("## Generating PR code suggestions") == -1
+ and pr_comment.find("**[PR Description]") == -1
+ ):
print(f"\n\n{pr_comment}", end="\n\n", flush=True)
if self.need_edit():
- pr_comment_editor = TextEditor(
- pr_comment
- )
- form = Form(['Edit pr comment:', pr_comment_editor])
+ pr_comment_editor = TextEditor(pr_comment)
+ form = Form(["Edit pr comment:", pr_comment_editor])
form.render()
-
+
pr_comment = pr_comment_editor.new_text
if not pr_comment:
print("Comment is empty, please fill in the comment.")
@@ -167,19 +165,20 @@ def publish_comment(self, pr_comment: str, is_temporary: bool = False):
return self.provider.publish_comment(pr_comment, is_temporary=is_temporary)
- def publish_persistent_comment(self, pr_comment: str,
- initial_header: str,
- update_header: bool = True,
- name='review',
- final_update_message=True):
+ def publish_persistent_comment(
+ self,
+ pr_comment: str,
+ initial_header: str,
+ update_header: bool = True,
+ name="review",
+ final_update_message=True,
+ ):
print(f"\n\n{initial_header}", end="\n\n", flush=True)
print(pr_comment, end="\n\n", flush=True)
if self.need_edit():
- pr_comment_editor = TextEditor(
- pr_comment
- )
- form = Form(['Edit pr comment:', pr_comment_editor])
+ pr_comment_editor = TextEditor(pr_comment)
+ form = Form(["Edit pr comment:", pr_comment_editor])
form.render()
pr_comment = pr_comment_editor.new_text
@@ -187,14 +186,23 @@ def publish_persistent_comment(self, pr_comment: str,
if not pr_comment:
print("Comment is empty, please fill in the comment.")
sys.exit(0)
- return self.provider.publish_persistent_comment(pr_comment, initial_header, update_header, name, final_update_message)
+ return self.provider.publish_persistent_comment(
+ pr_comment, initial_header, update_header, name, final_update_message
+ )
def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str):
return self.provider.publish_inline_comment(body, relevant_file, relevant_line_in_file)
- def create_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str,
- absolute_position: int = None):
- return self.provider.create_inline_comment(body, relevant_file, relevant_line_in_file, absolute_position)
+ def create_inline_comment(
+ self,
+ body: str,
+ relevant_file: str,
+ relevant_line_in_file: str,
+ absolute_position: int = None,
+ ):
+ return self.provider.create_inline_comment(
+ body, relevant_file, relevant_line_in_file, absolute_position
+ )
def publish_inline_comments(self, comments: list[dict]):
return self.provider.publish_inline_comments(comments)
@@ -213,7 +221,7 @@ def get_comment_url(self, comment) -> str:
#### labels operations ####
def publish_labels(self, labels):
- if not os.environ.get('ENABLE_PUBLISH_LABELS', None):
+ if not os.environ.get("ENABLE_PUBLISH_LABELS", None):
return None
return self.provider.publish_labels(labels)
@@ -247,7 +255,7 @@ def calc_pr_statistics(self, pull_request_data: dict):
def get_num_of_files(self):
return self.provider.get_num_of_files()
-
+
@staticmethod
def _parse_issue_url(issue_url: str) -> Tuple[str, int]:
return GithubProvider._parse_issue_url(issue_url)