Skip to content

Commit

Permalink
Added support for DevOps Pull Requests
Browse files Browse the repository at this point in the history
  • Loading branch information
boginw committed Oct 24, 2023
1 parent ea993ab commit 67d7375
Show file tree
Hide file tree
Showing 2 changed files with 342 additions and 12 deletions.
273 changes: 264 additions & 9 deletions weblate/vcs/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,14 +830,7 @@ def get_credentials_configuration(cls):
def get_credentials(self) -> dict[str, str]:
scheme, host, owner, slug = self.parse_repo_url()
hostname = self.format_api_host(host).lower()

configuration = self.get_credentials_configuration()
try:
credentials = configuration[hostname]
except KeyError as exc:
raise RepositoryError(
0, f"{self.name} API access for {hostname} is not configured"
) from exc
credentials = self.get_credentials_by_hostname(hostname)

# Scheme override
if "scheme" in credentials:
Expand All @@ -856,6 +849,16 @@ def get_credentials(self) -> dict[str, str]:
"scheme": scheme,
}

def get_credentials_by_hostname(self, hostname):
configuration = self.get_credentials_configuration()
try:
credentials = configuration[hostname]
except KeyError as exc:
raise RepositoryError(
0, f"{self.name} API access for {hostname} is not configured"
) from exc
return credentials

@classmethod
def is_configured(cls) -> bool:
return bool(cls.get_credentials_configuration())
Expand Down Expand Up @@ -1051,7 +1054,7 @@ def request(
response_data = response.json()
except JSONDecodeError as error:
report_error(cause="request json decoding")
response.raise_for_status()
self.raise_for_response(response)
raise RepositoryError(0, str(error)) from error

if self.should_retry(response, response_data):
Expand All @@ -1076,6 +1079,258 @@ def failed_pull_request(
-1, f"Could not create pull request {status_code}: {error}"
)

@classmethod
def raise_for_response(cls, response: requests.Response):
"""
Raises :class:`HTTPError`, if one occurred.
Some providers (DevOps for instance) respond with codes in the 2XX range
even though the response was an error. This method exists to let the
inheritors override it if they have special cases.
"""
response.raise_for_status()


class DevopsRepository(GitMergeRequestBase):
name = gettext_lazy("DevOps pull request")
identifier = "devops"
_version = None
API_TEMPLATE = "{scheme}://{host}/{owner}/_apis/git/repositories/{slug}"
ORG_API_TEMPLATE = "{scheme}://{host}/{org}/_apis/Contribution/HierarchyQuery?api-version=5.0-preview.1"
REQUIRED_CONFIG = {"username", "token", "organization"}
OPTIONAL_CONFIG = {"scheme", "workItemIds"}
push_label = gettext_lazy(
"This will push changes and create a DevOps pull request."
)

@classmethod
def raise_for_response(cls, response: requests.Response):
super().raise_for_response(response)

# DevOps returns 203 when the token is invalid
if response.status_code == 203:
raise RepositoryError(0, "Invalid token")

def fork(self, credentials: dict):
remotes = self.execute(["remote"]).splitlines()
if credentials["username"] not in remotes:
self.create_fork(credentials)
return

# If the fork was deleted, we just create another fork
try:
self.__get_forked_id(credentials, credentials["username"])
except RepositoryError:
self.create_fork(credentials)

def parse_repo_url(self, repo: str | None = None) -> tuple[str, str, str, str]:
if repo is None:
repo = self.component.repo

scheme_regex = r"^[a-z]+:\/\/.*" # matches for example ssh://* and https://*

if not re.match(scheme_regex, repo):
repo = "ssh://" + repo # assume all links without schema are ssh links

(scheme, host, owner, slug) = super().parse_repo_url(repo)

# ssh links are in the subdomain "ssh.", the API link doesn't have that, so remove it
if host.startswith("ssh."):
host = host[len("ssh.") :]

# https urls have /_git/ between owner and slug
if "/_git/" in slug:
parts = slug.split("/_git/")
owner = owner + "/" + parts[0] # we want owner to be org/project
slug = parts[1]
elif "/" in slug:
parts = slug.split("/")

owner = owner + "/" + parts[0] # we want owner to be org/project
slug = parts[1]

return scheme, host, owner, slug

def get_headers(self, credentials: dict):
encoded_token = base64.b64encode(
(":" + credentials["token"]).encode("utf8")
).decode("utf8")

headers = super().get_headers(credentials)
headers["Accept"] = "application/json; api-version=7.0"
headers["Authorization"] = "Basic " + encoded_token
return headers

def create_fork(self, credentials: dict):
# url without repository name
fork_url = "/".join(list(credentials["url"].split("/")[0:-1]))

# Get parent repo info
response_data, response, error = self.request(
"get", credentials, credentials["url"]
)

if "project" not in response_data:
raise RepositoryError(
0, self.get_fork_failed_message(error, credentials, response)
)

found_fork = self.__find_fork(credentials)

if found_fork is not None:
self.configure_fork_remote(found_fork["sshUrl"], credentials["username"])
return

request = {
"name": credentials["slug"],
"project": {"id": response_data["project"]["id"]},
"parentRepository": {
"id": response_data["id"],
"project": {"id": response_data["project"]["id"]},
},
}

# Create fork
response_data, response, error = self.request(
"post", credentials, fork_url, json=request
)

if "TF400948" in error: # A Git repository with the name already exists
fork_name = "{}-{}".format(
credentials["slug"],
random.randint(1000, 9999), # noqa: S311
)

request["name"] = fork_name
response_data, response, error = self.request(
"post", credentials, fork_url, json=request
)

if "sshUrl" not in response_data:
raise RepositoryError(
0, self.get_fork_failed_message(error, credentials, response)
)

self.configure_fork_remote(response_data["sshUrl"], credentials["username"])

def get_credentials(self) -> dict[str, str]:
super_credentials = super().get_credentials()
hostname = super_credentials.get("hostname")
credentials = self.get_credentials_by_hostname(hostname)

super_credentials["organization"] = credentials["organization"]
super_credentials["workItemIds"] = (
credentials["workItemIds"] if "workItemIds" in credentials else []
)

return super_credentials

def create_pull_request(
self,
credentials: dict,
origin_branch: str,
fork_remote: str,
fork_branch: str,
retry_fork: bool = True,
):
pr_url = "{}/pullrequests".format(credentials["url"])
title, description = self.get_merge_message()

work_item_ids = self.get_credentials().get("workItemIds")
work_item_refs = [{"id": str(ref)} for ref in work_item_ids]

request = {
"sourceRefName": "refs/heads/" + fork_branch,
"targetRefName": "refs/heads/" + origin_branch,
"title": title,
"description": description,
"workItemRefs": work_item_refs,
}

if fork_remote != "origin":
forked_id = self.__get_forked_id(credentials, fork_remote)
request["forkSource"] = {"repository": {"id": forked_id}}

response_data, response, error_message = self.request(
"post", credentials, pr_url, json=request
)

# Check for an error. If the error has a message saying A pull request already
# exists, then we ignore that, else raise an error.
if "url" not in response_data:
# Gracefully handle pull request already exists
if "TF401179" in error_message:
return

self.failed_pull_request(error_message, pr_url, response, response_data)

def __get_forked_id(self, credentials: dict, remote: str) -> str:
"""
Returns ID of the forked DevOps repository.
To send a PR to DevOps via API with a fork, one needs to send request
a request with the ID of the forked repository (unlike others, where
the name is enough).
"""
cmd = ["remote", "get-url", "--push", remote]
fork_remotes = self.execute(cmd, needs_lock=False, merge_err=False).splitlines()
(_, hostname, owner, slug) = self.parse_repo_url(fork_remotes[0])
url = self.format_url("https", hostname, owner, slug)

# Get repo info
response_data, response, error = self.request("get", credentials, url)

if "id" not in response_data:
raise RepositoryError(0, error)

return response_data["id"]

def __find_fork(self, credentials) -> dict | None:
for fork in self.__get_forks(credentials):
if "project" in fork and fork["project"]["name"] == credentials["username"]:
return fork

return None

def __get_forks(self, credentials: dict) -> list:
forks_url = "{}/forks/{}".format(credentials["url"], self.__get_org_id())
response_data, response, error = self.request("get", credentials, forks_url)

if response.status_code != 200:
raise RepositoryError(
0, self.get_fork_failed_message(error, credentials, response)
)

return response_data["value"]

def __get_org_id(self) -> str:
credentials = self.get_credentials()
org = credentials.get("owner").split("/")[0] # format is "org/proj"

url = self.ORG_API_TEMPLATE.format(
scheme=credentials["scheme"],
host=credentials["hostname"],
org=org,
)

org_property = "ms.vss-features.my-organizations-data-provider"
request = {
"contributionIds": [org_property],
"dataProviderContext": {"properties": {}},
}

response_data, response, error = self.request(
"post", credentials, url, json=request
)

try:
data_providers = response_data["dataProviders"]
return data_providers[org_property]["organizations"][0]["id"]
except (KeyError, IndexError):
raise RepositoryError(
0, self.get_fork_failed_message(error, credentials, response)
)


class DevopsRepository(GitMergeRequestBase):
name = gettext_lazy("DevOps pull request")
Expand Down
Loading

0 comments on commit 67d7375

Please sign in to comment.