From f38c65f66e6326d4ca008af9946211bac0b1069b Mon Sep 17 00:00:00 2001 From: Mohammad Twin Date: Thu, 14 Dec 2023 17:07:11 +0400 Subject: [PATCH 1/2] feat: all codes linted with black command --- dags/github.py | 450 +++++++++++++---------- dags/github_api_helpers/__init__.py | 16 +- dags/github_api_helpers/comments.py | 62 ++-- dags/github_api_helpers/commits.py | 17 +- dags/github_api_helpers/issues.py | 18 +- dags/github_api_helpers/labels.py | 9 +- dags/github_api_helpers/orgs.py | 14 +- dags/github_api_helpers/pull_requests.py | 82 +++-- dags/github_api_helpers/repos.py | 20 +- dags/github_api_helpers/smart_proxy.py | 7 +- dags/github_old_version.py | 83 +++-- dags/neo4j_storage/__init__.py | 14 +- dags/neo4j_storage/comments.py | 60 +-- dags/neo4j_storage/commits.py | 42 ++- dags/neo4j_storage/issues.py | 35 +- dags/neo4j_storage/labels.py | 13 +- dags/neo4j_storage/neo4j_enums.py | 4 +- dags/neo4j_storage/orgs.py | 41 ++- dags/neo4j_storage/pull_requests.py | 67 ++-- dags/neo4j_storage/repos.py | 28 +- dags/neo4j_storage/utils.py | 7 +- 21 files changed, 644 insertions(+), 445 deletions(-) diff --git a/dags/github.py b/dags/github.py index aa8bdb18..1667c186 100644 --- a/dags/github.py +++ b/dags/github.py @@ -23,42 +23,51 @@ from airflow.decorators import task from github_api_helpers import ( - get_all_pull_requests, get_all_issues, - get_all_org_repos, get_all_org_repos, get_all_repo_issues_and_prs_comments, - get_all_commits, fetch_org_details, get_all_reviews_of_pull_request, get_all_repo_review_comments, get_all_repo_contributors, get_all_pull_request_files, + get_all_pull_requests, get_all_org_members, get_all_repo_labels, + get_all_org_repos, + get_all_org_repos, + fetch_org_details, fetch_commit_files, + get_all_commits, + get_all_issues, ) from neo4j_storage import ( - save_orgs_to_neo4j, save_repo_to_neo4j, save_commit_files_changes_to_neo4j, save_repo_contributors_to_neo4j, save_pr_files_changes_to_neo4j, save_review_comment_to_neo4j, get_orgs_profile_from_neo4j, - save_pull_request_to_neo4j, + save_pull_request_to_neo4j, save_org_member_to_neo4j, save_comment_to_neo4j, save_commit_to_neo4j, save_review_to_neo4j, save_issue_to_neo4j, save_label_to_neo4j, + save_orgs_to_neo4j, + save_repo_to_neo4j, ) -with DAG(dag_id="github_functionality", start_date=datetime(2022, 12, 1, 14), schedule_interval=timedelta(hours=6), catchup=False,) as dag: +with DAG( + dag_id="github_functionality", + start_date=datetime(2022, 12, 1, 14), + schedule_interval=timedelta(hours=6), + catchup=False, +) as dag: @task def get_all_organization(): orgs = get_orgs_profile_from_neo4j() return orgs - #! for testing + #! for testing # toghether_crew_org = { # "id": 1, # "name": "TogetherCrew", @@ -75,59 +84,63 @@ def get_all_organization(): # } # orgs = [rndao_org, toghether_crew_org] - #region organization ETL + # region organization ETL @task def extract_github_organization(organization): - organization_name = organization['name'] - org_info = fetch_org_details(org_name= organization_name) + organization_name = organization["name"] + org_info = fetch_org_details(org_name=organization_name) + + return {"organization_basic": organization, "organization_info": org_info} - return { "organization_basic": organization, "organization_info": org_info} - @task def transform_github_organization(organization): return organization - + @task def load_github_organization(organization): - organization_info = organization['organization_info'] + organization_info = organization["organization_info"] save_orgs_to_neo4j(organization_info) return organization - #endregion - - #region organization members ETL + # endregion + + # region organization members ETL @task def extract_github_organization_members(organization): - organization_name = organization['organization_basic']['name'] - members = get_all_org_members(org= organization_name) + organization_name = organization["organization_basic"]["name"] + members = get_all_org_members(org=organization_name) + + return {"organization_members": members, **organization} - return { "organization_members": members, **organization } @task def transform_github_organization_members(data): print("data: ", data) return data + @task def load_github_organization_members(data): - members = data['organization_members'] - org_id = data['organization_info']['id'] + members = data["organization_members"] + org_id = data["organization_info"]["id"] for member in members: - save_org_member_to_neo4j(org_id= org_id, member= member) - + save_org_member_to_neo4j(org_id=org_id, member=member) + return data - #endregion + # endregion - #region github repos ETL + # region github repos ETL @task def extract_github_repos(organizations): all_repos = [] for organization in organizations: - repos = get_all_org_repos(org_name=organization['organization_basic']['name']) - repos = list(map(lambda repo: { "repo": repo, **organization }, repos)) + repos = get_all_org_repos( + org_name=organization["organization_basic"]["name"] + ) + repos = list(map(lambda repo: {"repo": repo, **organization}, repos)) print("len-repos: ", len(repos)) - + all_repos.extend(repos) return all_repos @@ -136,206 +149,218 @@ def extract_github_repos(organizations): def transform_github_repos(repo): print("TRANSFORM REPO: ", repo) return repo - + @task def load_github_repos(repo): - repo = repo['repo'] + repo = repo["repo"] print("LOAD REPO: ", repo) save_repo_to_neo4j(repo) return repo - #endregion - #region pull requests ETL + # endregion + + # region pull requests ETL @task def extract_pull_requests(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] - prs = get_all_pull_requests(owner= owner, repo= repo_name) + prs = get_all_pull_requests(owner=owner, repo=repo_name) for pr in prs: print("pr: ", pr, end="\n\n") - - new_data = { "prs": prs, **data } + + new_data = {"prs": prs, **data} return new_data - + @task def transform_pull_requests(data): print("prs IN TRANSFORM: ", data) return data - + @task def load_pull_requests(data): print("prs IN REQUESTS: ", data) - prs = data['prs'] - repository_id = data['repo']['id'] + prs = data["prs"] + repository_id = data["repo"]["id"] for pr in prs: print("PR(pull-request): ", pr) - save_pull_request_to_neo4j(pr= pr, repository_id= repository_id) + save_pull_request_to_neo4j(pr=pr, repository_id=repository_id) return data - #endregion - #region pull request files changes ETL + # endregion + + # region pull request files changes ETL @task def extract_pull_request_files_changes(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - prs = data['prs'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + prs = data["prs"] pr_files_changes = {} for pr in prs: - files_changes = get_all_pull_request_files(owner= owner, repo= repo_name, pull_number= pr.get('number', None)) - pr_files_changes[pr['id']] = files_changes + files_changes = get_all_pull_request_files( + owner=owner, repo=repo_name, pull_number=pr.get("number", None) + ) + pr_files_changes[pr["id"]] = files_changes + + return {"pr_files_changes": pr_files_changes, **data} - return { "pr_files_changes": pr_files_changes, **data } - @task def transform_pull_request_files_changes(data): return data - + @task def load_pull_request_files_changes(data): - pr_files_changes = data['pr_files_changes'] - repository_id = data['repo']['id'] + pr_files_changes = data["pr_files_changes"] + repository_id = data["repo"]["id"] for pr_id, files_changes in pr_files_changes.items(): - save_pr_files_changes_to_neo4j(pr_id= pr_id, repository_id= repository_id, file_changes= files_changes) + save_pr_files_changes_to_neo4j( + pr_id=pr_id, repository_id=repository_id, file_changes=files_changes + ) return data - - #endregion - #region pr review ETL + # endregion + + # region pr review ETL @task def extract_pr_review(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - prs = data['prs'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + prs = data["prs"] pr_reviews = {} for pr in prs: - reviews = get_all_reviews_of_pull_request(owner= owner, repo= repo_name, pull_number= pr.get('number', None)) - pr_reviews[pr['id']] = reviews + reviews = get_all_reviews_of_pull_request( + owner=owner, repo=repo_name, pull_number=pr.get("number", None) + ) + pr_reviews[pr["id"]] = reviews + + return {"pr_reviews": pr_reviews, **data} - return { "pr_reviews": pr_reviews, **data } @task def transform_pr_review(data): return data - + @task def load_pr_review(data): - pr_reviews = data['pr_reviews'] + pr_reviews = data["pr_reviews"] for pr_id, reviews in pr_reviews.items(): for review in reviews: - save_review_to_neo4j(pr_id= pr_id, review= review) + save_review_to_neo4j(pr_id=pr_id, review=review) return data - #endregion + # endregion - #region pr review comment ETL + # region pr review comment ETL @task def extract_pr_review_comments(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] - review_comments = get_all_repo_review_comments(owner= owner, repo= repo_name) + review_comments = get_all_repo_review_comments(owner=owner, repo=repo_name) for review_comment in review_comments: print("review_comment: ", review_comment, end="\n\n") - return { "review_comments": review_comments, **data } + return {"review_comments": review_comments, **data} @task def transform_pr_review_comments(data): return data - + @task def load_pr_review_comments(data): - review_comments = data['review_comments'] - repository_id = data['repo']['id'] + review_comments = data["review_comments"] + repository_id = data["repo"]["id"] for review_comment in review_comments: - save_review_comment_to_neo4j(review_comment= review_comment, repository_id= repository_id) + save_review_comment_to_neo4j( + review_comment=review_comment, repository_id=repository_id + ) return data - #endregion + # endregion - #region pr & issue comments ETL + # region pr & issue comments ETL @task def extract_pr_issue_comments(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] - comments = get_all_repo_issues_and_prs_comments(owner= owner, repo= repo_name) + comments = get_all_repo_issues_and_prs_comments(owner=owner, repo=repo_name) for comment in comments: print("comment: ", comment, end="\n\n") - return { "comments": comments, **data } - + return {"comments": comments, **data} + @task def transform_pr_issue_comments(data): return data - + @task def load_pr_issue_comments(data): - comments = data['comments'] - repository_id = data['repo']['id'] + comments = data["comments"] + repository_id = data["repo"]["id"] print("Len(comments): ", len(comments)) for comment in comments: - save_comment_to_neo4j(comment= comment, repository_id= repository_id) + save_comment_to_neo4j(comment=comment, repository_id=repository_id) return data - #endregion + # endregion - #region repo contributors ETL + # region repo contributors ETL @task def extract_repo_contributors(data): - repo = data['repo'] - repo_name = repo['name'] - owner = repo['owner']['login'] - contributors = get_all_repo_contributors(owner= owner, repo= repo_name) - - return { "contributors": contributors, **data } - - @task + repo = data["repo"] + repo_name = repo["name"] + owner = repo["owner"]["login"] + contributors = get_all_repo_contributors(owner=owner, repo=repo_name) + + return {"contributors": contributors, **data} + + @task def transform_repo_contributors(data): print("contributors IN TRANSFORM: ", data) return data - + @task def load_repo_contributors(data): - - contributors = data['contributors'] - repository_id = data['repo']['id'] + contributors = data["contributors"] + repository_id = data["repo"]["id"] for contributor in contributors: - save_repo_contributors_to_neo4j(contributor= contributor, repository_id= repository_id) + save_repo_contributors_to_neo4j( + contributor=contributor, repository_id=repository_id + ) return data - #endregion + # endregion - #region issues ETL + # region issues ETL @task def extract_issues(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - issues = get_all_issues(owner= owner, repo= repo_name) + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + issues = get_all_issues(owner=owner, repo=repo_name) print("issues IN TASK: ", issues) - return { "issues": issues, **data } + return {"issues": issues, **data} @task def transform_issues(data): @@ -343,51 +368,50 @@ def transform_issues(data): @task def load_issues(data): - - issues = data['issues'] - repository_id = data['repo']['id'] + issues = data["issues"] + repository_id = data["repo"]["id"] for issue in issues: - save_issue_to_neo4j(issue= issue, repository_id= repository_id) + save_issue_to_neo4j(issue=issue, repository_id=repository_id) return data - #endregion + # endregion - #region labels ETL + # region labels ETL @task def extract_labels(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - labels = get_all_repo_labels(owner= owner, repo= repo_name) + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + labels = get_all_repo_labels(owner=owner, repo=repo_name) + + return {"labels": labels, **data} - return { "labels": labels, **data } - @task def transform_labels(data): return data - + @task def load_labels(data): - labels = data['labels'] + labels = data["labels"] for label in labels: - save_label_to_neo4j(label= label) - + save_label_to_neo4j(label=label) + return data - - #endregion - #region commits ETL + # endregion + + # region commits ETL @task def extract_commits(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - commits = get_all_commits(owner= owner, repo= repo_name) + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + commits = get_all_commits(owner=owner, repo=repo_name) - return { "commits": commits, **data } + return {"commits": commits, **data} @task def transform_commits(data): @@ -395,109 +419,131 @@ def transform_commits(data): @task def load_commits(data): - commits = data['commits'] - repository_id = data['repo']['id'] + commits = data["commits"] + repository_id = data["repo"]["id"] for commit in commits: - save_commit_to_neo4j(commit= commit, repository_id= repository_id) + save_commit_to_neo4j(commit=commit, repository_id=repository_id) return data - #endregion + # endregion - #region commits files changes ETL + # region commits files changes ETL @task def extract_commits_files_changes(data): - repo = data['repo'] - owner = repo['owner']['login'] - repo_name = repo['name'] - commits = data['commits'] + repo = data["repo"] + owner = repo["owner"]["login"] + repo_name = repo["name"] + commits = data["commits"] commits_files_changes = {} for commit in commits: - sha = commit['sha'] - files_changes = fetch_commit_files(owner= owner, repo= repo_name, sha= sha) + sha = commit["sha"] + files_changes = fetch_commit_files(owner=owner, repo=repo_name, sha=sha) commits_files_changes[sha] = files_changes - return { "commits_files_changes": commits_files_changes, **data } - + return {"commits_files_changes": commits_files_changes, **data} + @task def transform_commits_files_changes(data): return data - + @task def load_commits_files_changes(data): - commits_files_changes = data['commits_files_changes'] - repository_id = data['repo']['id'] + commits_files_changes = data["commits_files_changes"] + repository_id = data["repo"]["id"] for sha, files_changes in commits_files_changes.items(): - save_commit_files_changes_to_neo4j(commit_sha= sha, repository_id= repository_id, file_changes= files_changes) + save_commit_files_changes_to_neo4j( + commit_sha=sha, repository_id=repository_id, file_changes=files_changes + ) return data - - #endregion - orgs = get_all_organization() - orgs_info = extract_github_organization.expand(organization= orgs) - transform_orgs = transform_github_organization.expand(organization= orgs_info) - load_orgs = load_github_organization.expand(organization= transform_orgs) + # endregion - orgs_members = extract_github_organization_members.expand(organization= orgs_info) - transform_orgs_members = transform_github_organization_members.expand(data= orgs_members) - load_orgs_members = load_github_organization_members.expand(data= transform_orgs_members) + orgs = get_all_organization() + orgs_info = extract_github_organization.expand(organization=orgs) + transform_orgs = transform_github_organization.expand(organization=orgs_info) + load_orgs = load_github_organization.expand(organization=transform_orgs) + + orgs_members = extract_github_organization_members.expand(organization=orgs_info) + transform_orgs_members = transform_github_organization_members.expand( + data=orgs_members + ) + load_orgs_members = load_github_organization_members.expand( + data=transform_orgs_members + ) load_orgs >> load_orgs_members - repos = extract_github_repos(organizations= orgs_info) - transform_repos = transform_github_repos.expand(repo= repos) - load_repos = load_github_repos.expand(repo= transform_repos) + repos = extract_github_repos(organizations=orgs_info) + transform_repos = transform_github_repos.expand(repo=repos) + load_repos = load_github_repos.expand(repo=transform_repos) load_orgs >> load_repos - contributors = extract_repo_contributors.expand(data= repos) - transform_contributors = transform_repo_contributors.expand(data= contributors) - load_contributors = load_repo_contributors.expand(data= transform_contributors) + contributors = extract_repo_contributors.expand(data=repos) + transform_contributors = transform_repo_contributors.expand(data=contributors) + load_contributors = load_repo_contributors.expand(data=transform_contributors) load_repos >> load_contributors - labels = extract_labels.expand(data= repos) - transform_label = transform_labels.expand(data= labels) - load_label = load_labels.expand(data= transform_label) + labels = extract_labels.expand(data=repos) + transform_label = transform_labels.expand(data=labels) + load_label = load_labels.expand(data=transform_label) - prs = extract_pull_requests.expand(data= repos) - transform_prs = transform_pull_requests.expand(data= prs) - load_prs = load_pull_requests.expand(data= transform_prs) + prs = extract_pull_requests.expand(data=repos) + transform_prs = transform_pull_requests.expand(data=prs) + load_prs = load_pull_requests.expand(data=transform_prs) load_contributors >> load_prs load_label >> load_prs - pr_files_changes = extract_pull_request_files_changes.expand(data= prs) - transform_pr_files_changes = transform_pull_request_files_changes.expand(data= pr_files_changes) - load_pr_files_changes = load_pull_request_files_changes.expand(data= transform_pr_files_changes) - - issues = extract_issues.expand(data= repos) - transform_issue = transform_issues.expand(data= issues) - load_issue = load_issues.expand(data= transform_issue) + pr_files_changes = extract_pull_request_files_changes.expand(data=prs) + transform_pr_files_changes = transform_pull_request_files_changes.expand( + data=pr_files_changes + ) + load_pr_files_changes = load_pull_request_files_changes.expand( + data=transform_pr_files_changes + ) + + issues = extract_issues.expand(data=repos) + transform_issue = transform_issues.expand(data=issues) + load_issue = load_issues.expand(data=transform_issue) load_contributors >> load_issue load_label >> load_issue - pr_reviews = extract_pr_review.expand(data= prs) - transform_pr_review = transform_pr_review.expand(data= pr_reviews) - load_pr_review = load_pr_review.expand(data= transform_pr_review) - - pr_review_comments = extract_pr_review_comments.expand(data= prs) - transform_pr_review_comments = transform_pr_review_comments.expand(data= pr_review_comments) - load_pr_review_comments = load_pr_review_comments.expand(data= transform_pr_review_comments) + pr_reviews = extract_pr_review.expand(data=prs) + transform_pr_review = transform_pr_review.expand(data=pr_reviews) + load_pr_review = load_pr_review.expand(data=transform_pr_review) + + pr_review_comments = extract_pr_review_comments.expand(data=prs) + transform_pr_review_comments = transform_pr_review_comments.expand( + data=pr_review_comments + ) + load_pr_review_comments = load_pr_review_comments.expand( + data=transform_pr_review_comments + ) load_prs >> load_pr_review_comments - pr_issue_comments = extract_pr_issue_comments.expand(data= prs) - transformed_pr_issue_comments = transform_pr_issue_comments.expand(data= pr_issue_comments) - loaded_pr_issue_comments = load_pr_issue_comments.expand(data= transformed_pr_issue_comments) + pr_issue_comments = extract_pr_issue_comments.expand(data=prs) + transformed_pr_issue_comments = transform_pr_issue_comments.expand( + data=pr_issue_comments + ) + loaded_pr_issue_comments = load_pr_issue_comments.expand( + data=transformed_pr_issue_comments + ) load_prs >> loaded_pr_issue_comments load_issue >> loaded_pr_issue_comments - commits = extract_commits.expand(data= repos) - transform_comment = transform_commits.expand(data= commits) - load_comment = load_commits.expand(data= transform_comment) - - commits_files_changes = extract_commits_files_changes.expand(data= commits) - transform_commits_files_changes = transform_commits_files_changes.expand(data= commits_files_changes) - load_commits_files_changes = load_commits_files_changes.expand(data= transform_commits_files_changes) + commits = extract_commits.expand(data=repos) + transform_comment = transform_commits.expand(data=commits) + load_comment = load_commits.expand(data=transform_comment) + + commits_files_changes = extract_commits_files_changes.expand(data=commits) + transform_commits_files_changes = transform_commits_files_changes.expand( + data=commits_files_changes + ) + load_commits_files_changes = load_commits_files_changes.expand( + data=transform_commits_files_changes + ) load_comment >> load_commits_files_changes load_pr_files_changes >> load_commits_files_changes diff --git a/dags/github_api_helpers/__init__.py b/dags/github_api_helpers/__init__.py index 67b018de..586482aa 100644 --- a/dags/github_api_helpers/__init__.py +++ b/dags/github_api_helpers/__init__.py @@ -1,10 +1,16 @@ from .repos import get_all_org_repos, get_all_repo_contributors from .commits import get_all_commits, fetch_commit_details, fetch_commit_files from .issues import get_all_issues, get_all_comments_of_issue -from .pull_requests import (get_all_pull_requests, get_all_commits_of_pull_request, - get_all_comments_of_pull_request, get_all_review_comments_of_pull_request, - get_all_reactions_of_review_comment, get_all_reactions_of_comment, - get_all_reviews_of_pull_request, get_all_pull_request_files) +from .pull_requests import ( + get_all_pull_requests, + get_all_commits_of_pull_request, + get_all_comments_of_pull_request, + get_all_review_comments_of_pull_request, + get_all_reactions_of_review_comment, + get_all_reactions_of_comment, + get_all_reviews_of_pull_request, + get_all_pull_request_files, +) from .orgs import fetch_org_details, get_all_org_members from .labels import get_all_repo_labels -from .comments import get_all_repo_review_comments, get_all_repo_issues_and_prs_comments \ No newline at end of file +from .comments import get_all_repo_review_comments, get_all_repo_issues_and_prs_comments diff --git a/dags/github_api_helpers/comments.py b/dags/github_api_helpers/comments.py index 52bc35c2..77c675a0 100644 --- a/dags/github_api_helpers/comments.py +++ b/dags/github_api_helpers/comments.py @@ -1,5 +1,6 @@ from .smart_proxy import get + def extract_pull_request_number_from_review_comment_response(response_data): """ Extracts the pull request number from a review comment response. @@ -7,12 +8,15 @@ def extract_pull_request_number_from_review_comment_response(response_data): :param response_data: The response data from a review comment request. :return: The pull request number. """ - pull_request_url = response_data['pull_request_url'] - pull_request_number = int(pull_request_url.split('/')[-1]) + pull_request_url = response_data["pull_request_url"] + pull_request_number = int(pull_request_url.split("/")[-1]) return pull_request_number -def fetch_repo_review_comments_page(owner: str, repo: str, page: int, per_page: int = 100): + +def fetch_repo_review_comments_page( + owner: str, repo: str, page: int, per_page: int = 100 +): """ Fetches the review comments for all pull requests in a GitHub repository. @@ -22,19 +26,27 @@ def fetch_repo_review_comments_page(owner: str, repo: str, page: int, per_page: :param per_page: The number of results per page (default is 100). :return: A list of review comments for the specified repository. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/comments' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/pulls/comments" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() - updated_response_data = list(map(lambda x: {**x, "pull_request_number": extract_pull_request_number_from_review_comment_response(x)}, response_data)) + updated_response_data = list( + map( + lambda x: { + **x, + "pull_request_number": extract_pull_request_number_from_review_comment_response( + x + ), + }, + response_data, + ) + ) return updated_response_data + def get_all_repo_review_comments(owner: str, repo: str): """ Retrieves all review comments for all pull requests in a GitHub repository. @@ -57,6 +69,7 @@ def get_all_repo_review_comments(owner: str, repo: str): return all_comments + def extract_type_from_comment_response(response_data): """ Extracts the type of comment from a comment response. @@ -64,19 +77,22 @@ def extract_type_from_comment_response(response_data): :param response_data: The response data from a comment request. :return: The type of comment. """ - html_url = response_data['html_url'] - type = html_url.split('/')[-2] - url = response_data['issue_url'] - number = url.split('/')[-1] + html_url = response_data["html_url"] + type = html_url.split("/")[-2] + url = response_data["issue_url"] + number = url.split("/")[-1] if type == "issues": - return { "type": "issue", "number": number } + return {"type": "issue", "number": number} elif type == "pull": - return { "type": "pull_request", "number": number } + return {"type": "pull_request", "number": number} else: - return { "number": number } + return {"number": number} + -def fetch_repo_issues_and_prs_comments_page(owner: str, repo: str, page: int, per_page: int = 100): +def fetch_repo_issues_and_prs_comments_page( + owner: str, repo: str, page: int, per_page: int = 100 +): """ Fetches the review comments for all pull requests in a GitHub repository. @@ -86,19 +102,19 @@ def fetch_repo_issues_and_prs_comments_page(owner: str, repo: str, page: int, pe :param per_page: The number of results per page (default is 100). :return: A list of review comments for the specified repository. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/issues/comments' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/issues/comments" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() - updated_response_data = list(map(lambda x: {**x, **extract_type_from_comment_response(x)}, response_data)) + updated_response_data = list( + map(lambda x: {**x, **extract_type_from_comment_response(x)}, response_data) + ) return updated_response_data + def get_all_repo_issues_and_prs_comments(owner: str, repo: str): """ Retrieves all review comments for all pull requests in a GitHub repository. diff --git a/dags/github_api_helpers/commits.py b/dags/github_api_helpers/commits.py index f67fe6c4..56d880e9 100644 --- a/dags/github_api_helpers/commits.py +++ b/dags/github_api_helpers/commits.py @@ -1,6 +1,7 @@ import requests from .smart_proxy import get + def fetch_commits(owner: str, repo: str, page: int, per_page: int = 100): """ Fetches the commits for a specific repo page by page. @@ -11,17 +12,15 @@ def fetch_commits(owner: str, repo: str, page: int, per_page: int = 100): :param per_page: The number of results per page (default is 30). :return: A list of commits for the specified repo. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/commits' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/commits" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_commits(owner: str, repo: str): """ Retrieves all commits for a specific repo. @@ -44,6 +43,7 @@ def get_all_commits(owner: str, repo: str): return all_commits + def fetch_commit_details(owner: str, repo: str, commit_sha: str): """ Fetches detailed information about a specific commit. @@ -53,10 +53,11 @@ def fetch_commit_details(owner: str, repo: str, commit_sha: str): :param commit_sha: The SHA hash of the commit. :return: Detailed information about the specified commit. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/commits/{commit_sha}' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/commits/{commit_sha}" response = get(endpoint) return response.json() + def fetch_commit_files(owner: str, repo: str, sha: str): """ Retrieves the files changed in a specific commit of a GitHub repository. @@ -67,7 +68,7 @@ def fetch_commit_files(owner: str, repo: str, sha: str): :return: A list of files changed in the specified commit. """ commit_details = fetch_commit_details(owner, repo, sha) - if 'files' in commit_details: - return commit_details['files'] + if "files" in commit_details: + return commit_details["files"] else: return [] diff --git a/dags/github_api_helpers/issues.py b/dags/github_api_helpers/issues.py index b1c800f4..19470d61 100644 --- a/dags/github_api_helpers/issues.py +++ b/dags/github_api_helpers/issues.py @@ -1,6 +1,7 @@ import requests from .smart_proxy import get + # Issues def fetch_issues(owner: str, repo: str, page: int, per_page: int = 100): """ @@ -12,7 +13,7 @@ def fetch_issues(owner: str, repo: str, page: int, per_page: int = 100): :param per_page: The number of results per page (default is 30). :return: A list of issues for the specified repo. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/issues' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/issues" params = { "per_page": per_page, @@ -25,9 +26,10 @@ def fetch_issues(owner: str, repo: str, page: int, per_page: int = 100): # Filter out pull requests issues = [issue for issue in response_data if "pull_request" not in issue] is_more_issues = len(response_data) == per_page - + return issues, is_more_issues + def get_all_issues(owner: str, repo: str): """ Retrieves all issues for a specific repo. @@ -42,7 +44,7 @@ def get_all_issues(owner: str, repo: str): while True: issues, is_more_issues = fetch_issues(owner, repo, current_page) all_issues.extend(issues) - + if not is_more_issues: break # No more issues to fetch @@ -50,8 +52,11 @@ def get_all_issues(owner: str, repo: str): return all_issues + # Issue Comments -def fetch_issue_comments(owner: str, repo: str, issue_number: int, page: int, per_page: int = 30): +def fetch_issue_comments( + owner: str, repo: str, issue_number: int, page: int, per_page: int = 30 +): """ Fetches the comments for a specific issue page by page. @@ -62,11 +67,14 @@ def fetch_issue_comments(owner: str, repo: str, issue_number: int, page: int, pe :param per_page: The number of results per page (default is 30). :return: A list of comments for the specified issue page. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/comments' + endpoint = ( + f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/comments" + ) params = {"page": page, "per_page": per_page} response = get(endpoint, params=params) return response.json() + def get_all_comments_of_issue(owner: str, repo: str, issue_number: int): """ Retrieves all comments for a specific issue. diff --git a/dags/github_api_helpers/labels.py b/dags/github_api_helpers/labels.py index 534a193f..7d0259da 100644 --- a/dags/github_api_helpers/labels.py +++ b/dags/github_api_helpers/labels.py @@ -1,5 +1,6 @@ from .smart_proxy import get + def fetch_repo_labels_page(owner: str, repo: str, page: int, per_page: int = 100): """ Fetches the labels for a specific repository in GitHub. @@ -10,17 +11,15 @@ def fetch_repo_labels_page(owner: str, repo: str, page: int, per_page: int = 100 :param per_page: The number of results per page (default is 100). :return: A list of labels for the specified repository. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/labels' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/labels" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_repo_labels(owner: str, repo: str): """ Retrieves all labels for a specific repository in GitHub. diff --git a/dags/github_api_helpers/orgs.py b/dags/github_api_helpers/orgs.py index 734e8315..83f458c0 100644 --- a/dags/github_api_helpers/orgs.py +++ b/dags/github_api_helpers/orgs.py @@ -1,6 +1,7 @@ import requests from .smart_proxy import get + def fetch_org_details(org_name: str): """ Fetches the details of a specific organization in GitHub. @@ -8,13 +9,14 @@ def fetch_org_details(org_name: str): :param org_name: The name of the organization. :return: A dict containing the details of the specified organization. """ - endpoint = f'https://api.github.com/orgs/{org_name}' + endpoint = f"https://api.github.com/orgs/{org_name}" response = get(endpoint) response_data = response.json() return response_data + def fetch_org_members_page(org: str, page: int, per_page: int = 100): """ Fetches a page of members for a specific organization in GitHub. @@ -24,17 +26,15 @@ def fetch_org_members_page(org: str, page: int, per_page: int = 100): :param per_page: The number of results per page (default is 100). :return: A list of members for the specified organization. """ - endpoint = f'https://api.github.com/orgs/{org}/members?role=all' + endpoint = f"https://api.github.com/orgs/{org}/members?role=all" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_org_members(org: str): """ Retrieves all members of a specific organization in GitHub. @@ -54,4 +54,4 @@ def get_all_org_members(org: str): all_members.extend(members) current_page += 1 - return all_members \ No newline at end of file + return all_members diff --git a/dags/github_api_helpers/pull_requests.py b/dags/github_api_helpers/pull_requests.py index dcdcfb2c..94820416 100644 --- a/dags/github_api_helpers/pull_requests.py +++ b/dags/github_api_helpers/pull_requests.py @@ -11,7 +11,7 @@ def fetch_pull_requests(owner: str, repo: str, page: int, per_page: int = 100): :param per_page: The number of results per page (default is 100). :return: A list of pull requests for the specified repo. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/pulls" params = { "per_page": per_page, @@ -20,9 +20,10 @@ def fetch_pull_requests(owner: str, repo: str, page: int, per_page: int = 100): } response = get(endpoint, params=params) response_data = response.json() - + return response_data + def get_all_pull_requests(owner: str, repo: str): """ Retrieves all pull requests for a specific repo in a GitHub repository. @@ -47,7 +48,9 @@ def get_all_pull_requests(owner: str, repo: str): return all_pull_requests -def fetch_pull_requests_commits(owner: str, repo: str, pull_number: int, page: int, per_page: int = 100): +def fetch_pull_requests_commits( + owner: str, repo: str, pull_number: int, page: int, per_page: int = 100 +): """ Fetches the commits for a specific pull request in a GitHub repository. @@ -58,17 +61,17 @@ def fetch_pull_requests_commits(owner: str, repo: str, pull_number: int, page: i :param per_page: The number of results per page (default is 100). :return: A list of commits for the specified pull request. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/commits' + endpoint = ( + f"https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/commits" + ) - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() - + return response_data + def get_all_commits_of_pull_request(owner: str, repo: str, pull_number: int): """ Retrieves all commits for a specific pull request in a GitHub repository. @@ -93,7 +96,9 @@ def get_all_commits_of_pull_request(owner: str, repo: str, pull_number: int): return all_commits -def fetch_pull_request_comments(owner: str, repo: str, issue_number: int, page: int, per_page: int = 30): +def fetch_pull_request_comments( + owner: str, repo: str, issue_number: int, page: int, per_page: int = 30 +): """ Fetches the comments for a specific issue page by page. @@ -104,11 +109,14 @@ def fetch_pull_request_comments(owner: str, repo: str, issue_number: int, page: :param per_page: The number of results per page (default is 30). :return: A list of comments for the specified issue page. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/comments' + endpoint = ( + f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/comments" + ) params = {"page": page, "per_page": per_page} response = get(endpoint, params=params) return response.json() + def get_all_comments_of_pull_request(owner: str, repo: str, issue_number: int): """ Retrieves all comments for a specific issue. @@ -129,7 +137,9 @@ def get_all_comments_of_pull_request(owner: str, repo: str, issue_number: int): return all_comments -def fetch_pull_request_review_comments(owner: str, repo: str, pull_number: int, page: int, per_page: int = 100): +def fetch_pull_request_review_comments( + owner: str, repo: str, pull_number: int, page: int, per_page: int = 100 +): """ Fetches the review comments for a specific pull request page by page. @@ -140,11 +150,14 @@ def fetch_pull_request_review_comments(owner: str, repo: str, pull_number: int, :param per_page: The number of results per page (default is 30). :return: A list of review comments for the specified pull request page. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/comments' + endpoint = ( + f"https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/comments" + ) params = {"page": page, "per_page": per_page} response = get(endpoint, params=params) return response.json() + def get_all_review_comments_of_pull_request(owner: str, repo: str, pull_number: int): """ Retrieves all review comments for a specific pull request. @@ -157,7 +170,9 @@ def get_all_review_comments_of_pull_request(owner: str, repo: str, pull_number: all_comments = [] current_page = 1 while True: - comments = fetch_pull_request_review_comments(owner, repo, pull_number, current_page) + comments = fetch_pull_request_review_comments( + owner, repo, pull_number, current_page + ) if not comments: # Break the loop if no more comments are found break all_comments.extend(comments) @@ -165,7 +180,9 @@ def get_all_review_comments_of_pull_request(owner: str, repo: str, pull_number: return all_comments -def fetch_review_comment_reactions(owner: str, repo: str, comment_id: int, page: int, per_page: int = 100): +def fetch_review_comment_reactions( + owner: str, repo: str, comment_id: int, page: int, per_page: int = 100 +): """ Fetches the reactions for a specific pull request comment. @@ -176,11 +193,12 @@ def fetch_review_comment_reactions(owner: str, repo: str, comment_id: int, page: :param per_page: The number of results per page (default is 100). :return: A list of reactions for the specified pull request comment. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/comments/{comment_id}/reactions' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/pulls/comments/{comment_id}/reactions" params = {"page": page, "per_page": per_page} response = get(endpoint, params=params) return response.json() + def get_all_reactions_of_review_comment(owner: str, repo: str, comment_id: int): """ Retrieves all reactions for a specific pull request comment. @@ -201,7 +219,9 @@ def get_all_reactions_of_review_comment(owner: str, repo: str, comment_id: int): return all_reactions -def fetch_comment_reactions(owner: str, repo: str, comment_id: int, page: int, per_page: int = 100): +def fetch_comment_reactions( + owner: str, repo: str, comment_id: int, page: int, per_page: int = 100 +): """ Fetches the reactions for a specific issue comment. @@ -212,12 +232,15 @@ def fetch_comment_reactions(owner: str, repo: str, comment_id: int, page: int, p :param per_page: The number of results per page (default is 100). :return: A list of reactions for the specified issue comment. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/issues/comments/{comment_id}/reactions' - headers = {"Accept": "application/vnd.github.squirrel-girl-preview+json"} # Custom media type is required + endpoint = f"https://api.github.com/repos/{owner}/{repo}/issues/comments/{comment_id}/reactions" + headers = { + "Accept": "application/vnd.github.squirrel-girl-preview+json" + } # Custom media type is required params = {"page": page, "per_page": per_page} response = get(endpoint, headers=headers, params=params) return response.json() + def get_all_reactions_of_comment(owner: str, repo: str, comment_id: int): """ Retrieves all reactions for a specific issue comment. @@ -238,7 +261,9 @@ def get_all_reactions_of_comment(owner: str, repo: str, comment_id: int): return all_reactions -def fetch_pull_request_reviews(owner: str, repo: str, pull_number: int, page: int, per_page: int = 100): +def fetch_pull_request_reviews( + owner: str, repo: str, pull_number: int, page: int, per_page: int = 100 +): """ Fetches the reviews for a specific pull request page by page. @@ -249,11 +274,14 @@ def fetch_pull_request_reviews(owner: str, repo: str, pull_number: int, page: in :param per_page: The number of results per page (default is 100). :return: A list of reviews for the specified pull request page. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/reviews' + endpoint = ( + f"https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/reviews" + ) params = {"page": page, "per_page": per_page} response = get(endpoint, params=params) return response.json() + def get_all_reviews_of_pull_request(owner: str, repo: str, pull_number: int): """ Retrieves all reviews for a specific pull request. @@ -274,7 +302,9 @@ def get_all_reviews_of_pull_request(owner: str, repo: str, pull_number: int): return all_reviews -def fetch_pull_request_files_page(owner: str, repo: str, pull_number: int, page: int, per_page: int = 100): +def fetch_pull_request_files_page( + owner: str, repo: str, pull_number: int, page: int, per_page: int = 100 +): """ Fetches the files of a specific pull request in a GitHub repository. @@ -285,17 +315,15 @@ def fetch_pull_request_files_page(owner: str, repo: str, pull_number: int, page: :param per_page: The number of results per page (default is 30). :return: A list of files for the specified pull request. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/files' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pull_number}/files" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_pull_request_files(owner: str, repo: str, pull_number: int): """ Retrieves all files for a specified pull request in a GitHub repository. diff --git a/dags/github_api_helpers/repos.py b/dags/github_api_helpers/repos.py index 56012d0d..0d1318f6 100644 --- a/dags/github_api_helpers/repos.py +++ b/dags/github_api_helpers/repos.py @@ -1,5 +1,6 @@ from .smart_proxy import get + def fetch_org_repos_page(org_name: str, page: int, per_page: int = 100): """ Fetches the repos for a specific organization in GitHub. @@ -9,17 +10,15 @@ def fetch_org_repos_page(org_name: str, page: int, per_page: int = 100): :param per_page: The number of results per page (default is 100). :return: A list of repos for the specified organization. """ - endpoint = f'https://api.github.com/orgs/{org_name}/repos' + endpoint = f"https://api.github.com/orgs/{org_name}/repos" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_org_repos(org_name: str): """ Retrieves all repos for a specific organization in GitHub. @@ -41,6 +40,7 @@ def get_all_org_repos(org_name: str): return all_repos + def fetch_repo_contributors_page(owner: str, repo: str, page: int, per_page: int = 100): """ Fetches the contributors for a specific repository in GitHub. @@ -51,17 +51,15 @@ def fetch_repo_contributors_page(owner: str, repo: str, page: int, per_page: int :param per_page: The number of results per page (default is 100). :return: A list of contributors for the specified repository. """ - endpoint = f'https://api.github.com/repos/{owner}/{repo}/contributors' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/contributors" - params = { - "per_page": per_page, - "page": page - } + params = {"per_page": per_page, "page": page} response = get(endpoint, params=params) response_data = response.json() return response_data + def get_all_repo_contributors(owner: str, repo: str): """ Retrieves all contributors for a specific repository in GitHub. @@ -82,4 +80,4 @@ def get_all_repo_contributors(owner: str, repo: str): all_contributors.extend(contributors) current_page += 1 - return all_contributors \ No newline at end of file + return all_contributors diff --git a/dags/github_api_helpers/smart_proxy.py b/dags/github_api_helpers/smart_proxy.py index 16a788aa..1e98d54f 100644 --- a/dags/github_api_helpers/smart_proxy.py +++ b/dags/github_api_helpers/smart_proxy.py @@ -1,7 +1,8 @@ import requests import random -def get(url: str, params = None): + +def get(url: str, params=None): """ Sends a GET request With Smart Proxy. @@ -17,7 +18,7 @@ def get(url: str, params = None): proxy_url = f"http://spusfxy185:TwinTwinTwin@eu.dc.smartproxy.com:{random_port}" print("proxy_url: ", proxy_url) proxies = { - 'http': proxy_url, - 'https': proxy_url, + "http": proxy_url, + "https": proxy_url, } return requests.get(url=url, params=params, proxies=proxies) diff --git a/dags/github_old_version.py b/dags/github_old_version.py index bb698784..1770656f 100644 --- a/dags/github_old_version.py +++ b/dags/github_old_version.py @@ -5,32 +5,33 @@ default_args = { - 'owner': 'MohammadTwin', - 'start_date': datetime(2023, 11, 8), - 'retries': 1, - 'retry_delay': timedelta(minutes=1), - + "owner": "MohammadTwin", + "start_date": datetime(2023, 11, 8), + "retries": 1, + "retry_delay": timedelta(minutes=1), } dag = DAG( - 'github_old_version', + "github_old_version", default_args=default_args, - description='GitHub Data Extraction DAG', + description="GitHub Data Extraction DAG", schedule_interval=None, catchup=False, ) + def get_github_repos(ti): - endpoint = 'https://api.github.com/orgs/TogetherCrew/repos' + endpoint = "https://api.github.com/orgs/TogetherCrew/repos" response = requests.get(endpoint) response_data = response.json() print("[response_data] ", response_data) - ti.xcom_push(key='github_repos', value=response_data) + ti.xcom_push(key="github_repos", value=response_data) + get_repos_task = PythonOperator( - task_id='get_github_repos', + task_id="get_github_repos", python_callable=get_github_repos, provide_context=True, dag=dag, @@ -38,91 +39,96 @@ def get_github_repos(ti): def get_pull_requests(owner: str, repo: str): - endpoint = f'https://api.github.com/repos/{owner}/{repo}/pulls' + endpoint = f"https://api.github.com/repos/{owner}/{repo}/pulls" - params = { - "per_page": 100, - "page": 1, - "state": "all" - } + params = {"per_page": 100, "page": 1, "state": "all"} response = requests.get(endpoint, params=params) response_data = response.json() return response_data + def extract_pull_requests(ti): prs_data = {} - github_repos = ti.xcom_pull(key='github_repos', task_ids='get_github_repos') + github_repos = ti.xcom_pull(key="github_repos", task_ids="get_github_repos") for repo in github_repos: - prs = get_pull_requests(owner= repo['owner']['login'], repo= repo['name']) - prs_data[repo['id']] = prs - - ti.xcom_push(key='github_prs', value=github_repos) + prs = get_pull_requests(owner=repo["owner"]["login"], repo=repo["name"]) + prs_data[repo["id"]] = prs + + ti.xcom_push(key="github_prs", value=github_repos) return prs_data -def transform_pull_requests(ti): +def transform_pull_requests(ti): return None + def load_pull_requests(ti): print("Loaded PR data into the destination:") + task_extract_pull_requests = PythonOperator( - task_id='extract_pull_requests', + task_id="extract_pull_requests", python_callable=extract_pull_requests, provide_context=True, dag=dag, ) task_transform_pull_requests = PythonOperator( - task_id='transform_pull_requests', + task_id="transform_pull_requests", python_callable=transform_pull_requests, provide_context=True, dag=dag, ) task_load_pull_requests = PythonOperator( - task_id='load_pull_requests', + task_id="load_pull_requests", python_callable=load_pull_requests, provide_context=True, dag=dag, ) -get_repos_task >> task_extract_pull_requests >> task_transform_pull_requests >> task_load_pull_requests +( + get_repos_task + >> task_extract_pull_requests + >> task_transform_pull_requests + >> task_load_pull_requests +) def extract_commits(ti): - - github_repos = ti.xcom_pull(key='github_repos', task_ids='get_github_repos') + github_repos = ti.xcom_pull(key="github_repos", task_ids="get_github_repos") for repo in github_repos: print("\n[repo] ", repo) return None -def transform_commits(ti): +def transform_commits(ti): return None + def load_commits(ti): print("Loaded Commit data into the destination:") + task_extract_commits = PythonOperator( - task_id='extract_commits', + task_id="extract_commits", python_callable=extract_commits, provide_context=True, dag=dag, ) task_transform_commits = PythonOperator( - task_id='transform_commits', + task_id="transform_commits", python_callable=transform_commits, provide_context=True, dag=dag, ) task_load_commits = PythonOperator( - task_id='load_commits', + task_id="load_commits", python_callable=load_commits, provide_context=True, dag=dag, @@ -130,38 +136,39 @@ def load_commits(ti): get_repos_task >> task_extract_commits >> task_transform_commits >> task_load_commits -def extract_issues(ti): - github_repos = ti.xcom_pull(key='github_repos', task_ids='get_github_repos') +def extract_issues(ti): + github_repos = ti.xcom_pull(key="github_repos", task_ids="get_github_repos") for repo in github_repos: print("\n[repo] ", repo) return None -def transform_issues(ti): +def transform_issues(ti): return None + def load_issues(ti): print("Loaded issues data into the destination:") task_extract_issues = PythonOperator( - task_id='extract_issues', + task_id="extract_issues", python_callable=extract_issues, provide_context=True, dag=dag, ) task_transform_issues = PythonOperator( - task_id='transform_issues', + task_id="transform_issues", python_callable=transform_issues, provide_context=True, dag=dag, ) task_load_issues = PythonOperator( - task_id='load_issues', + task_id="load_issues", python_callable=load_issues, provide_context=True, dag=dag, diff --git a/dags/neo4j_storage/__init__.py b/dags/neo4j_storage/__init__.py index 50bd526a..0a22ad3c 100644 --- a/dags/neo4j_storage/__init__.py +++ b/dags/neo4j_storage/__init__.py @@ -1,7 +1,15 @@ -from .orgs import save_orgs_to_neo4j, save_org_member_to_neo4j, get_orgs_profile_from_neo4j +from .orgs import ( + save_orgs_to_neo4j, + save_org_member_to_neo4j, + get_orgs_profile_from_neo4j, +) from .repos import save_repo_to_neo4j, save_repo_contributors_to_neo4j -from .pull_requests import save_pull_request_to_neo4j, save_review_to_neo4j, save_pr_files_changes_to_neo4j +from .pull_requests import ( + save_pull_request_to_neo4j, + save_review_to_neo4j, + save_pr_files_changes_to_neo4j, +) from .issues import save_issue_to_neo4j from .labels import save_label_to_neo4j from .commits import save_commit_to_neo4j, save_commit_files_changes_to_neo4j -from .comments import save_review_comment_to_neo4j, save_comment_to_neo4j \ No newline at end of file +from .comments import save_review_comment_to_neo4j, save_comment_to_neo4j diff --git a/dags/neo4j_storage/comments.py b/dags/neo4j_storage/comments.py index 5a6efa10..1d25d17a 100644 --- a/dags/neo4j_storage/comments.py +++ b/dags/neo4j_storage/comments.py @@ -2,15 +2,15 @@ from .neo4j_enums import Node, Relationship from .utils import flat_map + def save_review_comment_to_neo4j(review_comment: dict, repository_id: str): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - user = review_comment.pop('user', None) - pull_request_number= review_comment.pop('pull_request_number', None) + user = review_comment.pop("user", None) + pull_request_number = review_comment.pop("pull_request_number", None) cleaned_review = flat_map(review_comment) - + if pull_request_number: pull_request_query = f""" WITH rc @@ -18,8 +18,8 @@ def save_review_comment_to_neo4j(review_comment: dict, repository_id: str): MERGE (rc)-[ra:{Relationship.IS_ON.value}]->(pr) SET ra.latestSavedAt = datetime() """ - else: pull_request_query = "" - + else: + pull_request_query = "" if user: user_query = f""" @@ -30,32 +30,39 @@ def save_review_comment_to_neo4j(review_comment: dict, repository_id: str): MERGE (ghu)-[ra:{Relationship.CREATED.value}]->(rc) SET ra.latestSavedAt = datetime() """ - else: user_query = "" - + else: + user_query = "" with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (rc:{Node.ReviewComment.value} {{id: $review.id}}) SET rc += $review, rc.repository_id = $repository_id, rc.latestSavedAt = datetime() { user_query } { pull_request_query } - """, review= cleaned_review, repository_id= repository_id, user= user, pull_request_number= pull_request_number)) + """, + review=cleaned_review, + repository_id=repository_id, + user=user, + pull_request_number=pull_request_number, + ) + ) driver.close() + def save_comment_to_neo4j(comment: dict, repository_id: str): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - user = comment.pop('user', None) - type = comment.pop('type', None) - number= comment.pop('number', None) + user = comment.pop("user", None) + type = comment.pop("type", None) + number = comment.pop("number", None) cleaned_comment = flat_map(comment) - - if type == 'issue': + + if type == "issue": issue_query = f""" WITH c MATCH (i:{Node.Issue.value} {{number: $number, repository_id: $repository_id}}) @@ -63,7 +70,7 @@ def save_comment_to_neo4j(comment: dict, repository_id: str): MERGE (c)-[ra:{Relationship.IS_ON.value}]->(i) SET ra.latestSavedAt = datetime() """ - if type == 'pull_request': + if type == "pull_request": issue_query = f""" WITH c MATCH (pr:{Node.PullRequest.value} {{number: $number, repository_id: $repository_id}}) @@ -81,17 +88,24 @@ def save_comment_to_neo4j(comment: dict, repository_id: str): MERGE (ghu)-[ra:{Relationship.CREATED.value}]->(c) SET ra.latestSavedAt = datetime() """ - else: user_query = "" - + else: + user_query = "" with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (c:{Node.Comment.value} {{id: $comment.id}}) SET c += $comment, c.repository_id = $repository_id, c.latestSavedAt = datetime() { user_query } { issue_query } - """, comment= cleaned_comment, repository_id= int(repository_id), user= user, number= int(number))) + """, + comment=cleaned_comment, + repository_id=int(repository_id), + user=user, + number=int(number), + ) + ) driver.close() diff --git a/dags/neo4j_storage/commits.py b/dags/neo4j_storage/commits.py index 899601f2..df11dfe7 100644 --- a/dags/neo4j_storage/commits.py +++ b/dags/neo4j_storage/commits.py @@ -2,14 +2,14 @@ from .neo4j_enums import Node, Relationship from .utils import flat_map + def save_commit_to_neo4j(commit: dict, repository_id: str): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - committer = commit.pop('committer', None) + committer = commit.pop("committer", None) cleaned_commit = flat_map(commit) - + if committer: committer_query = f""" WITH c @@ -18,29 +18,38 @@ def save_commit_to_neo4j(commit: dict, repository_id: str): WITH c, ghu MERGE (ghu)-[cc:{Relationship.COMMITTED.value}]->(c) SET cc.latestSavedAt = datetime() - """ - else: committer_query = "" - + """ + else: + committer_query = "" with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (c:{Node.Commit.value} {{sha: $commit.sha}}) SET c += $commit, c.repository_id = $repository_id, c.latestSavedAt = datetime() { committer_query } - """, commit= cleaned_commit, repository_id= repository_id, committer= committer)) + """, + commit=cleaned_commit, + repository_id=repository_id, + committer=committer, + ) + ) driver.close() -def save_commit_files_changes_to_neo4j(commit_sha: str, repository_id: str, file_changes: list): - + +def save_commit_files_changes_to_neo4j( + commit_sha: str, repository_id: str, file_changes: list +): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MATCH (repo:{Node.Repository.value} {{id: $repository_id}}), (c:{Node.Commit.value} {{sha: $commit_sha}}) WITH repo, c UNWIND $file_changes AS file_change @@ -50,6 +59,11 @@ def save_commit_files_changes_to_neo4j(commit_sha: str, repository_id: str, file SET fc.latestSavedAt = datetime() MERGE (f)-[io:{Relationship.IS_ON.value}]->(repo) SET io.latestSavedAt = datetime() - """, commit_sha= commit_sha, repository_id= repository_id, file_changes= file_changes)) + """, + commit_sha=commit_sha, + repository_id=repository_id, + file_changes=file_changes, + ) + ) driver.close() diff --git a/dags/neo4j_storage/issues.py b/dags/neo4j_storage/issues.py index a47ad6e7..9b3fca89 100644 --- a/dags/neo4j_storage/issues.py +++ b/dags/neo4j_storage/issues.py @@ -2,27 +2,28 @@ from .neo4j_enums import Node, Relationship from .utils import remove_nested_collections + def save_issue_to_neo4j(issue: dict, repository_id: str): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - issue_creator = issue.pop('user', None) - assignee = issue.pop('assignee', None) - assignees = issue.pop('assignees', None) - labels = issue.pop('labels', None) + issue_creator = issue.pop("user", None) + assignee = issue.pop("assignee", None) + assignees = issue.pop("assignees", None) + labels = issue.pop("labels", None) cleaned_issue = remove_nested_collections(issue) - + if assignee: - assignee_query = f""" + assignee_query = f""" WITH is MERGE (ghu:{Node.GitHubUser.value} {{id: $assignee.id}}) SET ghu += $assignee, ghu.latestSavedAt = datetime() WITH is, ghu MERGE (is)-[assignghu:{Relationship.ASSIGNED.value}]->(ghu) SET assignghu.latestSavedAt = datetime() - """ - else: assignee_query = "" + """ + else: + assignee_query = "" assignees_query = f""" WITH is @@ -44,10 +45,10 @@ def save_issue_to_neo4j(issue: dict, repository_id: str): SET haslb.latestSavedAt = datetime() """ - with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (is:{Node.Issue.value} {{id: $issue.id}}) SET is += $issue, is.repository_id = $repository_id, is.latestSavedAt = datetime() WITH is @@ -60,7 +61,13 @@ def save_issue_to_neo4j(issue: dict, repository_id: str): { assignee_query } { assignees_query } { labels_query } - """, issue= cleaned_issue, repository_id= repository_id, issue_creator= issue_creator, - labels= labels, assignee= assignee, assignees= assignees) + """, + issue=cleaned_issue, + repository_id=repository_id, + issue_creator=issue_creator, + labels=labels, + assignee=assignee, + assignees=assignees, + ) ) driver.close() diff --git a/dags/neo4j_storage/labels.py b/dags/neo4j_storage/labels.py index fb3457cd..bba43709 100644 --- a/dags/neo4j_storage/labels.py +++ b/dags/neo4j_storage/labels.py @@ -1,16 +1,19 @@ from .neo4j_connection import Neo4jConnection from .neo4j_enums import Node -def save_label_to_neo4j(label: dict): +def save_label_to_neo4j(label: dict): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - + with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (lb:{Node.Label.value} {{id: $label.id}}) SET lb += $label, lb.latestSavedAt = datetime() - """, label=label) + """, + label=label, + ) ) driver.close() diff --git a/dags/neo4j_storage/neo4j_enums.py b/dags/neo4j_storage/neo4j_enums.py index 55e00935..8b83f80b 100644 --- a/dags/neo4j_storage/neo4j_enums.py +++ b/dags/neo4j_storage/neo4j_enums.py @@ -1,7 +1,8 @@ from enum import Enum + class Node(Enum): - OrganizationProfile = "OrganizationProfile" # This node is created by the API, and we receive a list of organizations detail to extract data from + OrganizationProfile = "OrganizationProfile" # This node is created by the API, and we receive a list of organizations detail to extract data from GitHubOrganization = "GitHubOrganization" GitHubUser = "GitHubUser" PullRequest = "PullRequest" @@ -13,6 +14,7 @@ class Node(Enum): ReviewComment = "ReviewComment" File = "File" + class Relationship(Enum): IS_MEMBER = "IS_MEMBER" IS_WITHIN = "IS_WITHIN" diff --git a/dags/neo4j_storage/orgs.py b/dags/neo4j_storage/orgs.py index 70066cb9..5d95e154 100644 --- a/dags/neo4j_storage/orgs.py +++ b/dags/neo4j_storage/orgs.py @@ -2,15 +2,16 @@ from .neo4j_enums import Node, Relationship from neo4j.time import DateTime as Neo4jDateTime + def get_orgs_profile_from_neo4j(): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - + def do_cypher_tx(tx, cypher): # TODO: should be refactored result = tx.run(cypher) values = [record for record in result] - + records = [] for value in values: record = {} @@ -24,36 +25,43 @@ def do_cypher_tx(tx, cypher): return records with driver.session() as session: - orgs = session.execute_read(do_cypher_tx, f""" + orgs = session.execute_read( + do_cypher_tx, + f""" MATCH (op:{Node.OrganizationProfile.value}) RETURN op - """ + """, ) driver.close() - + return orgs -def save_orgs_to_neo4j(org: dict): +def save_orgs_to_neo4j(org: dict): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - + with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (gho:{Node.GitHubOrganization.value} {{id: $org.id}}) SET gho += $org, gho.latestSavedAt = datetime() - """, org=org) + """, + org=org, + ) ) driver.close() + def save_org_member_to_neo4j(org_id: str, member: dict): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - + with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MATCH (gho:{Node.GitHubOrganization.value} {{id: $org_id}}) WITH gho MERGE (ghu:{Node.GitHubUser.value} {{id: $member.id}}) @@ -61,6 +69,9 @@ def save_org_member_to_neo4j(org_id: str, member: dict): WITH ghu, gho MERGE (ghu)-[im:{Relationship.IS_MEMBER.value}]->(gho) SET im.latestSavedAt = datetime() - """, org_id= org_id, member=member) + """, + org_id=org_id, + member=member, + ) ) - driver.close() \ No newline at end of file + driver.close() diff --git a/dags/neo4j_storage/pull_requests.py b/dags/neo4j_storage/pull_requests.py index d0a730cc..58db5200 100644 --- a/dags/neo4j_storage/pull_requests.py +++ b/dags/neo4j_storage/pull_requests.py @@ -2,18 +2,17 @@ from .neo4j_enums import Node, Relationship from .utils import remove_nested_collections + def save_pull_request_to_neo4j(pr: dict, repository_id: str): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - repo_creator = pr.pop('user', None) - assignee = pr.pop('assignee', None) - assignees = pr.pop('assignees', None) - requested_reviewers = pr.pop('requested_reviewers', None) - labels = pr.pop('labels', None) + repo_creator = pr.pop("user", None) + assignee = pr.pop("assignee", None) + assignees = pr.pop("assignees", None) + requested_reviewers = pr.pop("requested_reviewers", None) + labels = pr.pop("labels", None) cleaned_pr = remove_nested_collections(pr) - if assignee: assignee_query = f""" @@ -23,8 +22,9 @@ def save_pull_request_to_neo4j(pr: dict, repository_id: str): WITH pr, ghu MERGE (pr)-[assignghu:{Relationship.ASSIGNED.value}]->(ghu) SET assignghu.latestSavedAt = datetime() - """ - else: assignee_query = "" + """ + else: + assignee_query = "" assignees_query = f""" WITH pr @@ -57,8 +57,9 @@ def save_pull_request_to_neo4j(pr: dict, repository_id: str): """ with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (pr:{Node.PullRequest.value} {{id: $pr.id}}) SET pr += $pr, pr.repository_id = $repository_id, pr.latestSavedAt = datetime() @@ -74,21 +75,29 @@ def save_pull_request_to_neo4j(pr: dict, repository_id: str): { requested_reviewers_query } { labels_query } - """, pr= cleaned_pr, repository_id= repository_id, repo_creator= repo_creator, - assignee= assignee, assignees= assignees, labels= labels, - requested_reviewers= requested_reviewers) + """, + pr=cleaned_pr, + repository_id=repository_id, + repo_creator=repo_creator, + assignee=assignee, + assignees=assignees, + labels=labels, + requested_reviewers=requested_reviewers, + ) ) driver.close() + def save_review_to_neo4j(pr_id: dict, review: dict): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - author = review.pop('user', None) + author = review.pop("user", None) with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MATCH (pr:{Node.PullRequest.value} {{id: $pr_id}}) WITH pr MERGE (ghu:{Node.GitHubUser.value} {{id: $author.id}}) @@ -96,23 +105,30 @@ def save_review_to_neo4j(pr_id: dict, review: dict): WITH pr, ghu MERGE (ghu)-[reviewed:{Relationship.REVIEWED.value}]->(pr) SET reviewed.latestSavedAt = datetime(), reviewed.state = $review.state - """, pr_id= int(pr_id), author= author, review= review) + """, + pr_id=int(pr_id), + author=author, + review=review, + ) ) driver.close() + def save_pr_files_changes_to_neo4j(pr_id: int, repository_id: str, file_changes: list): - neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() - print(f"MATCH (repo:{Node.Repository.value} {{id: $repository_id}}), (pr:{Node.PullRequest.value} {{id: $pr_id}})") + print( + f"MATCH (repo:{Node.Repository.value} {{id: $repository_id}}), (pr:{Node.PullRequest.value} {{id: $pr_id}})" + ) print("repository_id", repository_id) print("pr_id", pr_id) with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MATCH (repo:{Node.Repository.value} {{id: $repository_id}}), (pr:{Node.PullRequest.value} {{id: $pr_id}}) WITH repo, pr UNWIND $file_changes AS file_change @@ -122,6 +138,11 @@ def save_pr_files_changes_to_neo4j(pr_id: int, repository_id: str, file_changes: SET fc.latestSavedAt = datetime() MERGE (f)-[io:{Relationship.IS_ON.value}]->(repo) SET io.latestSavedAt = datetime() - """, pr_id= int(pr_id), repository_id= int(repository_id), file_changes= file_changes)) + """, + pr_id=int(pr_id), + repository_id=int(repository_id), + file_changes=file_changes, + ) + ) driver.close() diff --git a/dags/neo4j_storage/repos.py b/dags/neo4j_storage/repos.py index 4682d992..d08420e0 100644 --- a/dags/neo4j_storage/repos.py +++ b/dags/neo4j_storage/repos.py @@ -2,17 +2,18 @@ from .neo4j_enums import Node, Relationship from .utils import flat_map -def save_repo_to_neo4j(repo: dict): - owner = repo.pop('owner', None) +def save_repo_to_neo4j(repo: dict): + owner = repo.pop("owner", None) cleaned_repo = flat_map(repo) neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (r:{Node.Repository.value} {{id: $repo.id}}) SET r += $repo, r.latestSavedAt = datetime() WITH r @@ -20,18 +21,22 @@ def save_repo_to_neo4j(repo: dict): WITH r, go MERGE (r)-[rel:{Relationship.IS_WITHIN.value}]->(go) SET rel.latestSavedAt = datetime() - """, repo=cleaned_repo, owner=owner) + """, + repo=cleaned_repo, + owner=owner, + ) ) driver.close() -def save_repo_contributors_to_neo4j(contributor: dict, repository_id: str): +def save_repo_contributors_to_neo4j(contributor: dict, repository_id: str): neo4jConnection = Neo4jConnection() driver = neo4jConnection.connect_neo4j() with driver.session() as session: - session.execute_write(lambda tx: - tx.run(f""" + session.execute_write( + lambda tx: tx.run( + f""" MERGE (ghu:{Node.GitHubUser.value} {{id: $member.id}}) SET ghu += $member, ghu.latestSavedAt = datetime() WITH ghu @@ -39,6 +44,9 @@ def save_repo_contributors_to_neo4j(contributor: dict, repository_id: str): WITH ghu, r MERGE (ghu)-[im:{Relationship.IS_MEMBER.value}]->(r) SET im.latestSavedAt = datetime() - """, member=contributor, repository_id=repository_id) + """, + member=contributor, + repository_id=repository_id, + ) ) - driver.close() \ No newline at end of file + driver.close() diff --git a/dags/neo4j_storage/utils.py b/dags/neo4j_storage/utils.py index d261400a..d340e36f 100644 --- a/dags/neo4j_storage/utils.py +++ b/dags/neo4j_storage/utils.py @@ -1,10 +1,10 @@ def remove_nested_collections(data): """ Removes any nested dictionaries or lists from the input dictionary. - + Args: data (dict): A dictionary potentially containing nested dictionaries or lists. - + Returns: dict: A dictionary with nested dictionaries and lists removed. """ @@ -15,9 +15,10 @@ def remove_nested_collections(data): for key, value in list(data.items()): if isinstance(value, (dict, list)): del data[key] - + return data + def flat_map(obj): """ Function to iterate over an object and flat map its values if the value is an object or an array. From 67f5d93d4536b34f71e1955c2d39c250d31ad43e Mon Sep 17 00:00:00 2001 From: Mohammad Twin Date: Thu, 14 Dec 2023 17:07:35 +0400 Subject: [PATCH 2/2] feat: Readme.md updated to add a direction for linting codes --- Readme.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Readme.md b/Readme.md index 2dd42034..c51ca256 100644 --- a/Readme.md +++ b/Readme.md @@ -8,3 +8,11 @@ You can quickly launch the application using `Docker Compose`: ```bash docker-compose --profile flower up ``` + +## Lint the code + +The code can be linted by using the below command + +```bash +python -m black . +```