Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace subprocess calls with subprocess_check_output #131

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions merico/github/commit/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))

from common_util import assert_exit # noqa: E402
from git_api import get_issue_info
from git_api import get_issue_info, subprocess_check_output, subprocess_run

diff_too_large_message_en = (
"Commit failed. The modified content is too long "
Expand Down Expand Up @@ -137,7 +137,7 @@ def get_modified_files():
tuple: 包含两个list的元组,第一个list包含当前修改过的文件,第二个list包含已经staged的文件
"""
""" 获取当前修改文件列表以及已经staged的文件列表"""
output = subprocess.check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
output = subprocess_check_output(["git", "status", "-s", "-u"], text=True, encoding="utf-8")
lines = output.split("\n")
modified_files = []
staged_files = []
Expand Down Expand Up @@ -216,10 +216,10 @@ def rebuild_stage_list(user_files):

"""
# Unstage all files
subprocess.check_output(["git", "reset"])
subprocess_check_output(["git", "reset"])
# Stage all user_files
for file in user_files:
os.system(f'git add "{file}"')
subprocess_run(["git", "add", file])


def get_diff():
Expand All @@ -233,13 +233,13 @@ def get_diff():
bytes: 返回bytes类型,是git diff --cached命令的输出结果

"""
return subprocess.check_output(["git", "diff", "--cached"])
return subprocess_check_output(["git", "diff", "--cached"])


def get_current_branch():
try:
# 使用git命令获取当前分支名称
result = subprocess.check_output(
result = subprocess_check_output(
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
Expand Down Expand Up @@ -313,7 +313,7 @@ def display_commit_message_and_commit(commit_message):
new_commit_message = text_editor.new_text
if not new_commit_message:
return None
return subprocess.check_output(["git", "commit", "-m", new_commit_message])
return subprocess_check_output(["git", "commit", "-m", new_commit_message])


def extract_issue_id(branch_name):
Expand Down
102 changes: 90 additions & 12 deletions merico/github/git_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,84 @@ def read_github_token():
return server_access_token


current_repo_dir = None


def get_current_repo():
"""
获取当前文件所在的仓库信息
"""
global current_repo_dir

if not current_repo_dir:
selected_data = IDEService().get_selected_range().dict()
current_file = selected_data.get("abspath", None)
if not current_file:
return None
current_dir = os.path.dirname(current_file)
try:
# 获取仓库根目录
current_repo_dir = (
subprocess.check_output(
["git", "rev-parse", "--show-toplevel"],
stderr=subprocess.DEVNULL,
cwd=current_dir,
)
.decode("utf-8")
.strip()
)
except subprocess.CalledProcessError:
# 如果发生错误,可能不在git仓库中
return None
return current_repo_dir


def subprocess_check_output(*popenargs, timeout=None, **kwargs):
# 将 current_dir 添加到 kwargs 中的 cwd 参数
current_repo = get_current_repo()
if current_repo:
kwargs["cwd"] = kwargs.get("cwd", current_repo)

# 调用 subprocess.check_output
return subprocess.check_output(*popenargs, timeout=timeout, **kwargs)


def subprocess_run(
*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs
):
current_repo = get_current_repo()
if current_repo:
kwargs["cwd"] = kwargs.get("cwd", current_repo)

# 调用 subprocess.run
return subprocess.run(
*popenargs,
input=input,
capture_output=capture_output,
timeout=timeout,
check=check,
**kwargs,
)


def subprocess_call(*popenargs, timeout=None, **kwargs):
current_repo = get_current_repo()
if current_repo:
kwargs["cwd"] = kwargs.get("cwd", current_repo)

# 调用 subprocess.call
return subprocess.call(*popenargs, timeout=timeout, **kwargs)


def subprocess_check_call(*popenargs, timeout=None, **kwargs):
current_repo = get_current_repo()
if current_repo:
kwargs["cwd"] = kwargs.get("cwd", current_repo)

# 调用 subprocess.check_call
return subprocess.check_call(*popenargs, timeout=timeout, **kwargs)


GITHUB_ACCESS_TOKEN = read_github_token()
GITHUB_API_URL = "https://api.github.com"

Expand Down Expand Up @@ -124,7 +202,7 @@ def check_git_installed():
bool: True if Git is installed, False otherwise.
"""
try:
subprocess.run(
subprocess_run(
["git", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
return True
Expand All @@ -134,7 +212,7 @@ def check_git_installed():


def create_and_checkout_branch(branch_name):
subprocess.run(["git", "checkout", "-b", branch_name], check=True)
subprocess_run(["git", "checkout", "-b", branch_name], check=True)


def is_issue_url(task):
Expand Down Expand Up @@ -179,7 +257,7 @@ def get_github_repo(issue_repo=False):
return config_data["issue_repo"]

# 使用git命令获取当前仓库的URL
result = subprocess.check_output(
result = subprocess_check_output(
["git", "remote", "get-url", "origin"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str并提取出仓库信息
Expand All @@ -205,7 +283,7 @@ def get_github_repo(issue_repo=False):
def get_current_branch():
try:
# 使用git命令获取当前分支名称
result = subprocess.check_output(
result = subprocess_check_output(
["git", "branch", "--show-current"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
Expand All @@ -225,7 +303,7 @@ def get_parent_branch():
return None
try:
# 使用git命令获取当前分支的父分支引用
result = subprocess.check_output(
result = subprocess_check_output(
["git", "rev-parse", "--abbrev-ref", f"{current_branch}@{1}"], stderr=subprocess.STDOUT
).strip()
# 将结果从bytes转换为str
Expand All @@ -235,7 +313,7 @@ def get_parent_branch():
# 如果父分支引用和当前分支相同,说明当前分支可能是基于一个没有父分支的提交创建的
return None
# 使用git命令获取父分支的名称
result = subprocess.check_output(
result = subprocess_check_output(
["git", "name-rev", "--name-only", "--exclude=tags/*", parent_branch_ref],
stderr=subprocess.STDOUT,
).strip()
Expand Down Expand Up @@ -282,7 +360,7 @@ def get_issue_id(issue_url):
# 获取当前分支自从与base_branch分叉以来的历史提交信息
def get_commit_messages(base_branch):
# 找到当前分支与base_branch的分叉点
merge_base = subprocess.run(
merge_base = subprocess_run(
["git", "merge-base", "HEAD", base_branch],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand All @@ -297,7 +375,7 @@ def get_commit_messages(base_branch):
merge_base_commit = merge_base.stdout.strip()

# 获取从分叉点到当前分支的所有提交信息
result = subprocess.run(
result = subprocess_run(
["git", "log", f"{merge_base_commit}..HEAD"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand Down Expand Up @@ -328,7 +406,7 @@ def create_pull_request(title, body, head, base, repo_name):
def run_command_with_retries(command, retries=3, delay=5):
for attempt in range(retries):
try:
subprocess.check_call(command)
subprocess_check_call(command)
return True
except subprocess.CalledProcessError as e:
print(f"Command failed: {e}")
Expand All @@ -343,7 +421,7 @@ def run_command_with_retries(command, retries=3, delay=5):
def check_unpushed_commits():
try:
# 获取当前分支的本地提交和远程提交的差异
result = subprocess.check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
result = subprocess_check_output(["git", "cherry", "-v"]).decode("utf-8").strip()
# 如果结果不为空,说明存在未push的提交
return bool(result)
except subprocess.CalledProcessError as e:
Expand All @@ -357,7 +435,7 @@ def auto_push():
return True
try:
branch = (
subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
subprocess_check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
.strip()
.decode("utf-8")
)
Expand All @@ -366,7 +444,7 @@ def auto_push():
return False

# 检查当前分支是否有对应的远程分支
remote_branch_exists = subprocess.call(["git", "ls-remote", "--exit-code", "origin", branch])
remote_branch_exists = subprocess_call(["git", "ls-remote", "--exit-code", "origin", branch])

push_command = ["git", "push", "origin", branch]
if remote_branch_exists == 0:
Expand Down
24 changes: 24 additions & 0 deletions merico/gitlab/code_task_summary/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
### code_task_summary

根据当前分支或指定的Issue,生成代码任务摘要。

#### 用途
- 自动生成简洁的代码任务描述
- 帮助开发者快速理解任务要点
- 用于更新项目配置或文档

#### 使用方法
执行命令: `/github.code_task_summary [issue_url]`

- 如不提供issue_url,将基于当前分支名称提取Issue信息
- 如提供issue_url,将直接使用该Issue的内容

#### 操作流程
1. 获取Issue信息
2. 生成代码任务摘要
3. 允许用户编辑摘要
4. 更新项目配置文件

#### 注意事项
- 确保Git仓库配置正确
- 需要有效的GitHub Token以访问API
124 changes: 124 additions & 0 deletions merico/gitlab/code_task_summary/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import json
import os
import sys

from devchat.llm import chat_json

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))

from common_util import assert_exit, ui_edit # noqa: E402
from git_api import ( # noqa: E402
check_git_installed,
get_current_branch,
get_gitlab_issue_repo,
get_issue_info,
is_issue_url,
read_issue_by_url,
)


def extract_issue_id(branch_name):
if "#" in branch_name:
return branch_name.split("#")[-1]
return None


# Function to generate a random branch name
PROMPT = (
"You are a coding engineer, required to summarize the ISSUE description into a coding task description of no more than 50 words. \n" # noqa: E501
"The ISSUE description is as follows: {issue_body}, please summarize the corresponding coding task description.\n" # noqa: E501
'The coding task description should be output in JSON format, in the form of: {{"summary": "code task summary"}}\n' # noqa: E501
)


@chat_json(prompt=PROMPT)
def generate_code_task_summary(issue_body):
pass


@ui_edit(ui_type="editor", description="Edit code task summary:")
def edit_code_task_summary(task_summary):
pass


def get_issue_or_task(task):
if is_issue_url(task):
issue = read_issue_by_url(task.strip())
assert_exit(not issue, "Failed to read issue.", exit_code=-1)

return json.dumps(
{"id": issue["iid"], "title": issue["title"], "description": issue["description"]}
)
else:
return task


def get_issue_json(issue_id, task):
issue = {"id": "no issue id", "title": "", "description": task}
if issue_id:
issue = get_issue_info(issue_id)
assert_exit(not issue, f"Failed to retrieve issue with ID: {issue_id}", exit_code=-1)
issue = {
"id": issue_id,
"web_url": issue["web_url"],
"title": issue["title"],
"description": issue["description"],
}
return issue


# Main function
def main():
print("Start update code task summary ...", end="\n\n", flush=True)

is_git_installed = check_git_installed()
assert_exit(not is_git_installed, "Git is not installed.", exit_code=-1)

task = sys.argv[1]

repo_name = get_gitlab_issue_repo()
branch_name = get_current_branch()
issue_id = extract_issue_id(branch_name)

# print basic info, repo_name, branch_name, issue_id
print("repo name:", repo_name, end="\n\n")
print("branch name:", branch_name, end="\n\n")
print("issue id:", issue_id, end="\n\n")

issue = get_issue_json(issue_id, task)
assert_exit(
not issue["description"], f"Failed to retrieve issue with ID: {issue_id}", exit_code=-1
)

# Generate 5 branch names
print("Generating code task summary ...", end="\n\n", flush=True)
code_task_summary = generate_code_task_summary(issue_body=issue["description"])
assert_exit(not code_task_summary, "Failed to generate code task summary.", exit_code=-1)
assert_exit(
not code_task_summary.get("summary", None),
"Failed to generate code task summary, missing summary field in result.",
exit_code=-1,
)
code_task_summary = code_task_summary["summary"]

# Select branch name
code_task_summary = edit_code_task_summary(code_task_summary)
assert_exit(not code_task_summary, "Failed to edit code task summary.", exit_code=-1)
code_task_summary = code_task_summary[0]

# create and checkout branch
print("Updating code task summary to config:")
config_file = os.path.join(".chat", "complete.config")
if os.path.exists(config_file):
with open(config_file, "r") as f:
config = json.load(f)
config["taskDescription"] = code_task_summary
else:
config = {"taskDescription": code_task_summary}
with open(config_file, "w") as f:
json.dump(config, f, indent=4)
print("Code task summary has updated")


if __name__ == "__main__":
main()
Loading
Loading