diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 29afab2b0d..fc466f021c 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -91,9 +91,12 @@ def start(disable_collection, development, port): logger.info("Deleting old task schedule") os.remove("celerybeat-schedule.db") - celery_beat_process = None - celery_command = "celery -A augur.tasks.init.celery_app.celery_app beat -l debug" - celery_beat_process = subprocess.Popen(celery_command.split(" ")) + with DatabaseSession(logger) as db_session: + config = AugurConfig(logger, db_session) + log_level = config.get_value("Logging", "log_level") + celery_beat_process = None + celery_command = f"celery -A augur.tasks.init.celery_app.celery_app beat -l {log_level.lower()}" + celery_beat_process = subprocess.Popen(celery_command.split(" ")) if not disable_collection: diff --git a/augur/application/db/data_parse.py b/augur/application/db/data_parse.py index abdc6de54c..4c618860ba 100644 --- a/augur/application/db/data_parse.py +++ b/augur/application/db/data_parse.py @@ -313,7 +313,7 @@ def extract_needed_pr_message_ref_data(comment: dict, pull_request_id: int, repo def extract_needed_pr_data(pr, repo_id, tool_source, tool_version): - pr_dict = { + pr = { 'repo_id': repo_id, 'pr_url': pr['url'], # 1-22-2022 inconsistent casting; sometimes int, sometimes float in bulk_insert @@ -367,7 +367,7 @@ def extract_needed_pr_data(pr, repo_id, tool_source, tool_version): 'data_source': 'GitHub API' } - return pr_dict + return pr def extract_needed_issue_data(issue: dict, repo_id: int, tool_source: str, tool_version: str, data_source: str): @@ -513,8 +513,81 @@ def extract_needed_pr_review_data(review, pull_request_id, repo_id, platform_id, return review_row +def extract_needed_pr_data_from_gitlab_merge_request(pr, repo_id, tool_source, tool_version): - + pr_dict = { + 'repo_id': repo_id, + 'pr_url': pr['web_url'], + 'pr_src_id': pr['id'], + 'pr_src_node_id': None, + 'pr_html_url': pr['web_url'], + 'pr_diff_url': None, + 'pr_patch_url': None, + 'pr_issue_url': None, + 'pr_augur_issue_id': None, + 'pr_src_number': pr['iid'], + 'pr_src_state': pr['state'], + 'pr_src_locked': pr['discussion_locked'], + 'pr_src_title': pr['title'], + # TODO: Add contributor logic for gitlab + 'pr_augur_contributor_id': None, + 'pr_body': pr['description'], + 'pr_created_at': pr['created_at'], + 'pr_updated_at': pr['updated_at'], + 'pr_closed_at': pr['closed_at'], + 'pr_merged_at': pr['merged_at'], + 'pr_merge_commit_sha': pr['merge_commit_sha'], + 'pr_teams': None, + 'pr_milestone': pr['milestone'].get('title') if pr['milestone'] else None, + 'pr_commits_url': None, + 'pr_review_comments_url': None, + 'pr_review_comment_url': None, + 'pr_comments_url': None, + 'pr_statuses_url': None, + 'pr_meta_head_id': None, + 'pr_meta_base_id': None, + 'pr_src_issue_url': None, + 'pr_src_comments_url': None, + 'pr_src_review_comments_url': None, + 'pr_src_commits_url': None, + 'pr_src_statuses_url': None, + 'pr_src_author_association': None, + 'tool_source': tool_source, + 'tool_version': tool_version, + 'data_source': 'Gitlab API' + } + + return pr_dict +def extract_needed_issue_data_from_gitlab_issue(issue: dict, repo_id: int, tool_source: str, tool_version: str, data_source: str): + + issue_dict = { + "repo_id": repo_id, + "reporter_id": None, + "pull_request": None, + "pull_request_id": None, + "created_at": issue['created_at'], + "issue_title": issue['title'], + "issue_body": issue['description'] if 'description' in 
issue else None, + "comment_count": issue['user_notes_count'], + "updated_at": issue['updated_at'], + "closed_at": issue['closed_at'], + "repository_url": issue['_links']['project'], + "issue_url": issue['_links']['self'], + "labels_url": None, + "comments_url": issue['_links']['notes'], + "events_url": None, + "html_url": issue['_links']['self'], + "issue_state": issue['state'], + "issue_node_id": None, + "gh_issue_id": issue['id'], + "gh_issue_number": issue['iid'], + "gh_user_id": issue['author']['id'], + "tool_source": tool_source, + "tool_version": tool_version, + "data_source": data_source + } + return issue_dict + diff --git a/augur/application/db/models/augur_operations.py b/augur/application/db/models/augur_operations.py index 8da2b397fa..cf9da75af6 100644 --- a/augur/application/db/models/augur_operations.py +++ b/augur/application/db/models/augur_operations.py @@ -1232,10 +1232,22 @@ def insert(session, repo_id): repo_git = repo.repo_git collection_status_unique = ["repo_id"] + pr_issue_count = 0 + github_weight = 0 + if "github" in repo_git: + try: + pr_issue_count = get_repo_weight_by_issue(session.logger, repo_git) + #session.logger.info(f"date weight: {calculate_date_weight_from_timestamps(repo.repo_added, None)}") + github_weight = pr_issue_count - calculate_date_weight_from_timestamps(repo.repo_added, None) + except Exception as e: + pr_issue_count = None + github_weight = None + session.logger.error( + ''.join(traceback.format_exception(None, e, e.__traceback__))) + try: - pr_issue_count = get_repo_weight_by_issue(session.logger, repo_git) - #session.logger.info(f"date weight: {calculate_date_weight_from_timestamps(repo.repo_added, None)}") + pr_issue_count = 0 github_weight = pr_issue_count - calculate_date_weight_from_timestamps(repo.repo_added, None) except Exception as e: pr_issue_count = None @@ -1251,6 +1263,7 @@ def insert(session, repo_id): "secondary_weight": github_weight, "ml_weight": github_weight } + result = session.insert_data(record, CollectionStatus, collection_status_unique, on_conflict_update=False) diff --git a/augur/tasks/github/pull_requests/tasks.py b/augur/tasks/github/pull_requests/tasks.py index 3af6e39e08..478260bcd7 100644 --- a/augur/tasks/github/pull_requests/tasks.py +++ b/augur/tasks/github/pull_requests/tasks.py @@ -74,7 +74,7 @@ def retrieve_all_pr_data(repo_git: str, logger, key_auth) -> None: return all_data - + def process_pull_requests(pull_requests, task_name, repo_id, logger, augur_db): tool_source = "Pr Task" diff --git a/augur/tasks/github/util/util.py b/augur/tasks/github/util/util.py index fbb23dd6e8..0400b82e1a 100644 --- a/augur/tasks/github/util/util.py +++ b/augur/tasks/github/util/util.py @@ -54,7 +54,7 @@ def parse_json_response(logger: logging.Logger, response: httpx.Response) -> dic try: return response.json() except json.decoder.JSONDecodeError as e: - logger.warning(f"invalid return from GitHub. Response was: {response.text}. Exception: {e}") + logger.warning(f"invalid return. Response was: {response.text}. 
Exception: {e}") return json.loads(json.dumps(response.text)) def get_repo_weight_by_issue(logger,repo_git): diff --git a/augur/tasks/gitlab/gitlab_api_key_handler.py b/augur/tasks/gitlab/gitlab_api_key_handler.py index 7e6b359f5e..50efa446f8 100644 --- a/augur/tasks/gitlab/gitlab_api_key_handler.py +++ b/augur/tasks/gitlab/gitlab_api_key_handler.py @@ -20,7 +20,7 @@ class GitlabApiKeyHandler(): Attributes: session (DatabaseSession): Database connection logger (logging.Logger): Handles all logs - oauth_redis_key (str): The key where the github api keys are cached in redis + oauth_redis_key (str): The key where the gitlab api keys are cached in redis redis_key_list (RedisList): Acts like a python list, and interacts directly with the redis cache config_key (str): The api key that is stored in the users config table key: (List[str]): List of keys retrieve from database or cache @@ -46,13 +46,13 @@ def get_random_key(self): """Retrieves a random key from the list of keys Returns: - A random github api key + A random gitlab api key """ return random.choice(self.keys) def get_config_key(self) -> str: - """Retrieves the users github api key from their config table + """Retrieves the users gitlab api key from their config table Returns: Github API key from config table @@ -60,7 +60,7 @@ def get_config_key(self) -> str: return self.config.get_value("Keys", "gitlab_api_key") def get_api_keys_from_database(self) -> List[str]: - """Retieves all github api keys from database + """Retieves all gitlab api keys from database Note: It retrieves all the keys from the database except the one defined in the users config @@ -131,7 +131,7 @@ def get_api_keys(self) -> List[str]: self.redis_key_list.extend(valid_keys) if not valid_keys: - raise NoValidKeysError("No valid github api keys found in the config or worker oauth table") + raise NoValidKeysError("No valid gitlab api keys found in the config or worker oauth table") # shuffling the keys so not all processes get the same keys in the same order @@ -152,7 +152,7 @@ def is_bad_api_key(self, client: httpx.Client, oauth_key: str) -> bool: Args: client: makes the http requests - oauth_key: github api key that is being tested + oauth_key: gitlab api key that is being tested Returns: True if key is bad. False if the key is good diff --git a/augur/tasks/gitlab/gitlab_paginator.py b/augur/tasks/gitlab/gitlab_paginator.py new file mode 100644 index 0000000000..e7dd36b9e5 --- /dev/null +++ b/augur/tasks/gitlab/gitlab_paginator.py @@ -0,0 +1,536 @@ + +import collections +import httpx +import time +import json +import asyncio +import datetime +import logging + + +from typing import List, Optional, Union, Generator, Tuple +from urllib.parse import urlencode, urlparse, parse_qs, urlunparse +from enum import Enum + + +from augur.tasks.gitlab.gitlab_random_key_auth import GitlabRandomKeyAuth +from augur.tasks.github.util.util import parse_json_response + +class GitlabApiResult(Enum): + """All the different results of querying the Gitlab API.""" + + NEW_RESULT = -1 + SUCCESS = 0 + TIMEOUT = 1 + NO_MORE_ATTEMPTS = 2 + REPO_NOT_FOUND = 3 + SECONDARY_RATE_LIMIT = 4 + RATE_LIMIT_EXCEEDED = 5 + ABUSE_MECHANISM_TRIGGERED = 6 + BAD_CREDENTIALS = 7 + HTML = 8 + EMPTY_STRING = 9 + +def hit_api(key_manager, url: str, logger: logging.Logger, timeout: float = 10, method: str = 'GET', ) -> Optional[httpx.Response]: + """Ping the api and get the data back for the page. + + Returns: + A httpx response that contains the data. 
None if a timeout occurs
+    """
+    # self.logger.info(f"Hitting endpoint with {method} request: {url}...\n")
+
+    with httpx.Client() as client:
+
+        try:
+            response = client.request(
+                method=method, url=url, auth=key_manager, timeout=timeout, follow_redirects=True)
+
+        except TimeoutError:
+            logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
+            time.sleep(round(timeout))
+            return None
+        except httpx.TimeoutException:
+            logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
+            time.sleep(round(timeout))
+            return None
+        except httpx.NetworkError:
+            logger.info(f"Network Error. Sleeping {round(timeout)} seconds and trying again...\n")
+            time.sleep(round(timeout))
+            return None
+        except httpx.ProtocolError:
+            logger.info(f"Protocol Error. Sleeping {round(timeout*1.5)} seconds and trying again...\n")
+            time.sleep(round(timeout*1.5))
+            return None
+
+    return response
+
+
+def process_dict_response(logger: logging.Logger, response: httpx.Response, page_data: dict) -> Optional[str]:
+    """Process dict response from the api and return the status.
+
+    Args:
+        logger: handles logging
+        response: used to access the url of the request and the headers
+        page_data: dict response from the api
+
+    Returns:
+        A GitlabApiResult status if what happened can be determined, otherwise GitlabApiResult.NEW_RESULT is returned.
+    """
+
+    status_code = response.status_code
+    if status_code == 429:
+
+        current_epoch = int(time.time())
+        epoch_when_key_resets = int(response.headers["X-RateLimit-Reset"])
+        key_reset_time = epoch_when_key_resets - current_epoch
+
+        if key_reset_time < 0:
+            logger.error(f"Key reset time was less than 0 setting it to 0.\nThe current epoch is {current_epoch} and the epoch that the key resets at is {epoch_when_key_resets}")
+            key_reset_time = 0
+
+        logger.info(f"\n\n\nGitlab API rate limit exceeded. Sleeping until the key resets ({key_reset_time} seconds)")
+        time.sleep(key_reset_time)
+
+        return GitlabApiResult.RATE_LIMIT_EXCEEDED
+
+
+    return GitlabApiResult.NEW_RESULT
+
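For reference, here is a minimal sketch of how these two module-level helpers are meant to compose. The fetch_once wrapper and the key_auth argument are illustrative placeholders rather than part of this changeset; the real retry policy lives in GitlabPaginator.retrieve_data below.

```python
import logging
from typing import Optional

import httpx

from augur.tasks.gitlab.gitlab_paginator import GitlabApiResult, hit_api, process_dict_response


def fetch_once(key_auth, url: str, logger: logging.Logger) -> Optional[httpx.Response]:
    """Illustrative single fetch: retry once if the rate limiter forced a sleep."""
    response = hit_api(key_auth, url, logger, timeout=30)
    if response is None:
        # hit_api already slept after a timeout or network error
        return None

    data = response.json()
    if isinstance(data, dict):
        # a dict payload usually signals an error; process_dict_response sleeps through 429s
        if process_dict_response(logger, response, data) == GitlabApiResult.RATE_LIMIT_EXCEEDED:
            return hit_api(key_auth, url, logger, timeout=30)
    return response
```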
+class GitlabPaginator(collections.abc.Sequence):
+    """This class is a sequence that handles paginating through data on the Gitlab API.
+
+    Attributes:
+        url (str): The url that the data is being collected from
+        key_manager (GitlabRandomKeyAuth): Custom httpx auth class
+                that randomizes the gitlab api key a request gets.
+                This is how the requests are getting their api keys
+        logger (logging.Logger): Logger that handles printing information to files and stdout
+    """
+
+    def __init__(self, url: str, key_manager: GitlabRandomKeyAuth, logger: logging.Logger, from_datetime=None, to_datetime=None):
+        """Initialize the class GitlabPaginator.
+
+        Args:
+            url: url that the data is being collected from
+            key_manager: class that randomly selects a Gitlab API key for each request
+            logger: handles logging
+            from_datetime: collects data after this datetime (not yet implemented)
+            to_datetime: collects data before this datetime (not yet implemented)
+        """
+        remove_fields = ["per_page", "page"]
+        url = clean_url(url, remove_fields)
+
+        # we need to add query params directly to the url, instead of passing the param to the httpx.Client.request
+        # this is because gitlab will only append specified params to the links in the headers if they are a part
+        # of the url, and not the params with the request
+        params = {"per_page": 100}
+        url = add_query_params(url, params)
+
+        self.url = url
+        self.key_manager = key_manager
+        self.logger = logger
+
+        # get the logger from the key manager
+        # self.logger = key_manager.logger
+
+        self.from_datetime = from_datetime
+        self.to_datetime = to_datetime
+
+    def __getitem__(self, index: int) -> Optional[dict]:
+        """Get the value at index of the Gitlab API data returned from the url.
+
+        Args:
+            index: The index of the desired data from the Gitlab API
+
+        Returns:
+            The value at the index
+        """
+
+        # get the page the item is on
+        items_page = (index // 100) + 1
+
+        # create url to query
+        params = {"page": items_page}
+        url = add_query_params(self.url, params)
+
+        data, _, result = self.retrieve_data(url)
+
+        if result != GitlabApiResult.SUCCESS:
+            self.logger.debug("Unable to get item from the api")
+            return None
+
+        # get the position of data on the page
+        page_index = index % 100
+
+        try:
+            return data[page_index]
+        except KeyError as e:
+            raise KeyError("Data does not exist for that index") from e
+
+    def __len__(self):
+        """Get the length of the Gitlab API data.
+
+        Returns:
+            The length of the Gitlab API data at the url.
+
+        Examples:
+            This function is called when len() is called on the GitlabPaginator class, for example:
+
+            issues = GitlabPaginator(url, session.oauths, logger)
+            issue_len = len(issues)
+        """
+
+        num_pages = self.get_num_pages()
+
+        self.logger.info(f"Num pages: {num_pages}")
+
+        params = {"page": num_pages}
+        url = add_query_params(self.url, params)
+
+        # get the amount of data on last page
+        data, _, result = self.retrieve_data(url)
+
+        if result == GitlabApiResult.SUCCESS:
+            return (100 * (num_pages - 1)) + len(data)
+
+        self.logger.debug("Unable to retrieve data length from api")
+        return 0
+
+    def __iter__(self) -> Generator[Optional[dict], None, None]:
+        """Provide data from Gitlab API via a generator that yields one dict at a time.
+ + Yields: + A piece of data from the github api as the specified url + """ + data_list, response, result = self.retrieve_data(self.url) + + if result != GitlabApiResult.SUCCESS: + self.logger.debug("Failed to retrieve the data even though 10 attempts were given") + yield None + return + + # yield the first page data + for data in data_list: + yield data + + while 'next' in response.links.keys(): + next_page = response.links['next']['url'] + + # Here we don't need to pass in params with the page, or the default params because the url from the headers already has those values + data_list, response, result = self.retrieve_data(next_page) + + if result != GitlabApiResult.SUCCESS: + self.logger.debug("Failed to retrieve the data even though 10 attempts were given") + return + + for data in data_list: + yield data + + def iter_pages(self) -> Generator[Tuple[Optional[List[dict]], int], None, None]: + """Provide data from Gitlab API via a generator that yields a page of dicts at a time. + + Returns: + A page of data from the Gitlab API at the specified url + """ + # retrieves the data for the given url + data_list, response, result = self.retrieve_data(self.url) + + if result != GitlabApiResult.SUCCESS: + self.logger.debug("Failed to retrieve the data even though 10 attempts were given") + yield None, None + return + + # this retrieves the page for the given url + page_number = get_url_page_number(self.url) + + # yields the first page of data and its page number + yield data_list, page_number + + while 'next' in response.links.keys(): + + # gets the next page from the last responses header + next_page = response.links['next']['url'] + + # Here we don't need to pass in params with the page, or the default params because the url from the headers already has those values + data_list, response, result = self.retrieve_data(next_page) + + if result != GitlabApiResult.SUCCESS: + self.logger.debug(f"Failed to retrieve the data for even though 10 attempts were given. Url: {next_page}") + return + + page_number = get_url_page_number(next_page) + + # if either the data or response is None then yield None and return + if data_list is None or response is None: + return + + # yield the data from the page and its number + yield data_list, page_number + + def retrieve_data(self, url: str) -> Tuple[Optional[List[dict]], Optional[httpx.Response]]: + """Attempt to retrieve data at given url. 
+
+        Args:
+            url: The url to retrieve the data from
+
+        Returns:
+            The response object from hitting the url and the data on the page
+        """
+        timeout = 30
+        timeout_count = 0
+        num_attempts = 1
+        while num_attempts <= 10:
+
+            response = hit_api(self.key_manager, url, self.logger, timeout)
+
+            if response is None:
+                if timeout_count == 10:
+                    self.logger.error(f"Request timed out 10 times for {url}")
+                    return None, None, GitlabApiResult.TIMEOUT
+
+                timeout = timeout * 1.1
+                num_attempts += 1
+                continue
+
+            # if api returns a status of 204 No Content then return empty list
+            if response.status_code == 204:
+                return [], response, GitlabApiResult.SUCCESS
+
+
+            page_data = parse_json_response(self.logger, response)
+
+
+            # if the data is a list, then return it and the response
+            if isinstance(page_data, list) is True:
+                return page_data, response, GitlabApiResult.SUCCESS
+
+            # if the data is a dict then call process_dict_response, and handle the result
+            if isinstance(page_data, dict) is True:
+                dict_processing_result = process_dict_response(self.logger, response, page_data)
+
+                if dict_processing_result == GitlabApiResult.NEW_RESULT:
+                    print(f"Encountered new dict response from api on url: {url}. Response: {page_data}")
+                    return None, None, GitlabApiResult.NEW_RESULT
+
+                if dict_processing_result == GitlabApiResult.REPO_NOT_FOUND:
+                    return None, response, GitlabApiResult.REPO_NOT_FOUND
+
+                if dict_processing_result in (GitlabApiResult.SECONDARY_RATE_LIMIT, GitlabApiResult.ABUSE_MECHANISM_TRIGGERED):
+                    continue
+
+                if dict_processing_result == GitlabApiResult.RATE_LIMIT_EXCEEDED:
+                    num_attempts = 0
+                    continue
+
+            if isinstance(page_data, str) is True:
+                str_processing_result: Union[str, List[dict]] = self.process_str_response(page_data)
+
+                if isinstance(str_processing_result, list):
+                    return str_processing_result, response, GitlabApiResult.SUCCESS
+
+            num_attempts += 1
+
+        self.logger.error("Unable to collect data in 10 attempts")
+        return None, None, GitlabApiResult.NO_MORE_ATTEMPTS
+
+    def get_num_pages(self) -> Optional[int]:
+        """Get the number of pages of data that a url can paginate through.
+
+        Returns:
+            The number of pages a url can access
+        """
+        timeout: float = 5
+        num_attempts = 0
+        while num_attempts < 10:
+            r = hit_api(self.key_manager, self.url, self.logger, timeout=timeout, method="HEAD")
+
+            if r:
+                break
+
+            timeout = timeout * 1.2
+        else:
+            raise RuntimeError("Unable to get the number of pages of data in 10 attempts")
+
+        if 'last' not in r.links.keys():
+            return 1
+
+        # get the last url from header
+        last_page_url = r.links['last']['url']
+
+        parsed_url = urlparse(last_page_url)
+        try:
+            num_pages = int(parse_qs(parsed_url.query)['page'][0])
+        except (KeyError, ValueError):
+            return None
+
+        return num_pages
+
+    def hit_api(self, url, timeout):
+
+        return hit_api(self.key_manager, url, self.logger, timeout)
+
+
+###################################################
+
+    def process_str_response(self, page_data: str) -> Union[str, List[dict]]:
+        """Process an api response of type string.
+
+        Args:
+            page_data: the string response from the api that is being processed
+
+        Returns:
+            html_response, empty_string, and failed_to_parse_json if the data is not processable.
+                Or a list of dicts if the json was parsable
+        """
+        self.logger.info(f"Warning! page_data was string: {page_data}\n")
+
+        if "<!DOCTYPE html>" in page_data:
+            self.logger.info("HTML was returned, trying again...\n")
+            return GitlabApiResult.HTML
+
+        if not page_data:
+            self.logger.info("Empty string, trying again...\n")
+            return GitlabApiResult.EMPTY_STRING
+
+        try:
+            list_of_dict_page_data = json.loads(page_data)
+            return list_of_dict_page_data
+        except TypeError:
+            return "failed_to_parse_json"
+
+
+################################################################################
+
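As a usage note, here is a minimal sketch of how a collection task is expected to drive the paginator defined above. collect_all_pages is an illustrative helper, and key_auth stands in for a GitlabRandomKeyAuth instance; the real call sites are retrieve_all_gitlab_issue_data and retrieve_all_mr_data further down in this changeset.

```python
import logging

from augur.tasks.gitlab.gitlab_paginator import GitlabPaginator


def collect_all_pages(url: str, key_auth, logger: logging.Logger) -> list:
    """Drain every page of a GitLab endpoint into one list (illustrative only)."""
    paginator = GitlabPaginator(url, key_auth, logger)

    all_data = []
    num_pages = paginator.get_num_pages()
    for page_data, page in paginator.iter_pages():
        if not page_data:
            # iter_pages yields (None, None) on failure and an empty list when exhausted
            break
        logger.info(f"Page {page} of {num_pages}")
        all_data += page_data
    return all_data
```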
+# Url Helper Method to remove query parameters from the url
+def clean_url(url: str, keys: List[str]) -> str:
+    """Remove query params from url.
+
+    Args:
+        url: the url that is being modified
+        keys: the query params that are being removed
+
+    Returns:
+        A url with the params in keys removed
+    """
+    u = urlparse(url)
+    query = parse_qs(u.query, keep_blank_values=True)
+
+    for key in keys:
+        query.pop(key, None)
+
+    u = u._replace(query=urlencode(query, True))
+
+    return urlunparse(u)
+
+
+def add_query_params(url: str, additional_params: dict) -> str:
+    """Add query params to a url.
+
+    Args:
+        url: the url that is being modified
+        additional_params: key value pairs specifying the parameters to be added
+
+    Returns:
+        The url with the key value pairs in additional_params added as query params
+    """
+    url_components = urlparse(url)
+    original_params = parse_qs(url_components.query)
+    # Before Python 3.5 you could update original_params with
+    # additional_params, but here all the variables are immutable.
+    merged_params = {**original_params, **additional_params}
+    updated_query = urlencode(merged_params, doseq=True)
+    # _replace() is how you can create a new NamedTuple with a changed field
+    return url_components._replace(query=updated_query).geturl()
+
+
+
+################################################################################
+
+
+def get_url_page_number(url: str) -> int:
+    """Parse the page number from the url.
+ + Note: + If the url does not contain a page number the function returns 1 + + Args: + url: url to get the page number from + + Returns: + The page number that the url contains + """ + parsed_url = urlparse(url) + try: + # if page is not a url query param then this is page 1 + page_number = int(parse_qs(parsed_url.query)['page'][0]) + + except KeyError: + return 1 + + return page_number + + +def retrieve_dict_from_endpoint(logger, key_auth, url, timeout_wait=10) -> Tuple[Optional[dict], GitlabApiResult]: + timeout = timeout_wait + timeout_count = 0 + num_attempts = 1 + + while num_attempts <= 10: + + response = hit_api(key_auth, url, logger, timeout) + + if response is None: + if timeout_count == 10: + logger.error(f"Request timed out 10 times for {url}") + return None, GitlabApiResult.TIMEOUT + + timeout = timeout * 1.1 + num_attempts += 1 + continue + + + page_data = parse_json_response(logger, response) + + if isinstance(page_data, str): + str_processing_result: Union[str, List[dict]] = process_str_response(logger,page_data) + + if isinstance(str_processing_result, dict): + #return str_processing_result, response, GitlabApiResult.SUCCESS + page_data = str_processing_result + else: + num_attempts += 1 + continue + + # if the data is a list, then return it and the response + if isinstance(page_data, list): + logger.warning("Wrong type returned, trying again...") + logger.info(f"Returned list: {page_data}") + + # if the data is a dict then call process_dict_response, and + elif isinstance(page_data, dict): + dict_processing_result = process_dict_response(logger, response, page_data) + + if dict_processing_result == GitlabApiResult.SUCCESS: + return page_data, dict_processing_result + if dict_processing_result == GitlabApiResult.NEW_RESULT: + logger.info(f"Encountered new dict response from api on url: {url}. 
Response: {page_data}") + return None, GitlabApiResult.NEW_RESULT + + if dict_processing_result == GitlabApiResult.REPO_NOT_FOUND: + return None, GitlabApiResult.REPO_NOT_FOUND + + if dict_processing_result in (GitlabApiResult.SECONDARY_RATE_LIMIT, GitlabApiResult.ABUSE_MECHANISM_TRIGGERED): + continue + + if dict_processing_result == GitlabApiResult.RATE_LIMIT_EXCEEDED: + num_attempts = 0 + continue + + + + num_attempts += 1 + + logger.error("Unable to collect data in 10 attempts") + return None, GitlabApiResult.NO_MORE_ATTEMPTS + + diff --git a/augur/tasks/gitlab/issues_task.py b/augur/tasks/gitlab/issues_task.py new file mode 100644 index 0000000000..a5a55f353a --- /dev/null +++ b/augur/tasks/gitlab/issues_task.py @@ -0,0 +1,97 @@ +import logging +import traceback + +from augur.tasks.init.celery_app import celery_app as celery +from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask +from augur.tasks.gitlab.gitlab_paginator import GitlabPaginator +from augur.tasks.gitlab.gitlab_task_session import GitlabTaskManifest +from augur.application.db.data_parse import extract_needed_issue_data_from_gitlab_issue +from augur.tasks.github.util.util import get_owner_repo +from augur.application.db.models import Issue, Repo +from augur.application.db.util import execute_session_query + + +@celery.task(base=AugurCoreRepoCollectionTask) +def collect_gitlab_issues(repo_git : str) -> int: + + + logger = logging.getLogger(collect_gitlab_issues.__name__) + with GitlabTaskManifest(logger) as manifest: + + augur_db = manifest.augur_db + + try: + + query = augur_db.session.query(Repo).filter(Repo.repo_git == repo_git) + repo_obj = execute_session_query(query, 'one') + repo_id = repo_obj.repo_id + + owner, repo = get_owner_repo(repo_git) + + issue_data = retrieve_all_gitlab_issue_data(repo_git, logger, manifest.key_auth) + + if issue_data: + total_issues = len(issue_data) + process_issues(issue_data, f"{owner}/{repo}: Gitlab Issue task", repo_id, logger, augur_db) + + return total_issues + else: + logger.info(f"{owner}/{repo} has no issues") + return 0 + except Exception as e: + logger.error(f"Could not collect gitlab issues for repo {repo_git}\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") + return -1 + + + +def retrieve_all_gitlab_issue_data(repo_git, logger, key_auth) -> None: + + owner, repo = get_owner_repo(repo_git) + + logger.info(f"Collecting gitlab issues for {owner}/{repo}") + + url = f"https://gitlab.com/api/v4/projects/{owner}%2f{repo}/issues" + issues = GitlabPaginator(url, key_auth, logger) + + all_data = [] + num_pages = issues.get_num_pages() + for page_data, page in issues.iter_pages(): + + if page_data is None: + return all_data + + if len(page_data) == 0: + logger.debug( + f"{owner}/{repo}: Gitlab Issues Page {page} contains no data...returning") + logger.info(f"{owner}/{repo}: Issues Page {page} of {num_pages}") + return all_data + + logger.info(f"{owner}/{repo}: Gitlab Issues Page {page} of {num_pages}") + + all_data += page_data + + return all_data + +def process_issues(issues, task_name, repo_id, logger, augur_db) -> None: + + # get repo_id or have it passed + tool_source = "Gitlab Issue Task" + tool_version = "2.0" + data_source = "Gitlab API" + + issue_dicts = [] + for issue in issues: + + issue_dicts.append( + extract_needed_issue_data_from_gitlab_issue(issue, repo_id, tool_source, tool_version, data_source) + ) + + if len(issue_dicts) == 0: + print("No gitlab issues found while processing") + return + + 
logger.info(f"{task_name}: Inserting {len(issue_dicts)} gitlab issues") + issue_natural_keys = ["repo_id", "gh_issue_id"] + issue_string_columns = ["issue_title", "issue_body"] + + augur_db.insert_data(issue_dicts, Issue, issue_natural_keys, string_fields=issue_string_columns) \ No newline at end of file diff --git a/augur/tasks/gitlab/merge_request_task.py b/augur/tasks/gitlab/merge_request_task.py new file mode 100644 index 0000000000..cfe849ba8a --- /dev/null +++ b/augur/tasks/gitlab/merge_request_task.py @@ -0,0 +1,85 @@ +import logging + +from augur.tasks.init.celery_app import celery_app as celery +from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask +from augur.tasks.gitlab.gitlab_paginator import GitlabPaginator +from augur.tasks.gitlab.gitlab_task_session import GitlabTaskManifest +from augur.application.db.data_parse import extract_needed_pr_data_from_gitlab_merge_request +from augur.tasks.github.util.util import get_owner_repo +from augur.application.db.models import PullRequest, Repo + + +@celery.task(base=AugurCoreRepoCollectionTask) +def collect_gitlab_merge_requests(repo_git: str) -> int: + + + logger = logging.getLogger(collect_gitlab_merge_requests.__name__) + + with GitlabTaskManifest(logger) as manifest: + + augur_db = manifest.augur_db + + repo_id = augur_db.session.query(Repo).filter( + Repo.repo_git == repo_git).one().repo_id + + owner, repo = get_owner_repo(repo_git) + mr_data = retrieve_all_mr_data(repo_git, logger, manifest.key_auth) + + if mr_data: + process_merge_requests(mr_data, f"{owner}/{repo}: Mr task", repo_id, logger, augur_db) + + return len(mr_data) + else: + logger.info(f"{owner}/{repo} has no merge requests") + return 0 + + +def retrieve_all_mr_data(repo_git: str, logger, key_auth) -> None: + + owner, repo = get_owner_repo(repo_git) + + logger.info(f"Collecting pull requests for {owner}/{repo}") + + url = f"https://gitlab.com/api/v4/projects/{owner}%2f{repo}/merge_requests" + mrs = GitlabPaginator(url, key_auth, logger) + + all_data = [] + num_pages = mrs.get_num_pages() + for page_data, page in mrs.iter_pages(): + + if page_data is None: + return all_data + + if len(page_data) == 0: + logger.debug( + f"{owner}/{repo} Mrs Page {page} contains no data...returning") + logger.info(f"{owner}/{repo} Prs Page {page} of {num_pages}") + return all_data + + logger.info(f"{owner}/{repo} Mrs Page {page} of {num_pages}") + + all_data += page_data + + return all_data + + +def process_merge_requests(data, task_name, repo_id, logger, augur_db): + + tool_source = "Mr Task" + tool_version = "2.0" + merge_requests = extract_needed_mr_data(data, repo_id, tool_source, tool_version) + + logger.info(f"{task_name}: Inserting mrs of length: {len(merge_requests)}") + pr_natural_keys = ["repo_id", "pr_src_id"] + pr_string_fields = ["pr_src_title", "pr_body"] + pr_return_data = augur_db.insert_data(merge_requests, PullRequest, pr_natural_keys, string_fields=pr_string_fields) + + +def extract_needed_mr_data(mrs, repo_id, tool_source, tool_version): + + data = [] + for mr in mrs: + data.append(extract_needed_pr_data_from_gitlab_merge_request(mr, repo_id, tool_source, tool_version)) + + return data + diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 706541d1c7..a2b06a22a6 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -50,6 +50,9 @@ class CollectionState(Enum): 'augur.tasks.github.pull_requests.commits_model.tasks', 'augur.tasks.github.traffic.tasks'] +gitlab_tasks = 
['augur.tasks.gitlab.merge_request_task', + 'augur.tasks.gitlab.issues_task'] + git_tasks = ['augur.tasks.git.facade_tasks', 'augur.tasks.git.dependency_tasks.tasks', 'augur.tasks.git.dependency_libyear_tasks.tasks', @@ -66,7 +69,7 @@ class CollectionState(Enum): frontend_tasks = ['augur.tasks.frontend'] -tasks = start_tasks + github_tasks + git_tasks + materialized_view_tasks + frontend_tasks +tasks = start_tasks + github_tasks + gitlab_tasks + git_tasks + materialized_view_tasks + frontend_tasks if os.environ.get('AUGUR_DOCKER_DEPLOY') != "1": tasks += data_analysis_tasks diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 225f78ffde..b99032bb82 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -24,6 +24,8 @@ from augur.tasks.github.pull_requests.commits_model.tasks import process_pull_request_commits from augur.tasks.git.dependency_tasks.tasks import process_ossf_dependency_metrics from augur.tasks.github.traffic.tasks import collect_github_repo_clones_data +from augur.tasks.gitlab.merge_request_task import collect_gitlab_merge_requests +from augur.tasks.gitlab.issues_task import collect_gitlab_issues from augur.tasks.git.facade_tasks import * from augur.tasks.db.refresh_materialized_views import * # from augur.tasks.data_analysis import * @@ -93,6 +95,17 @@ def primary_repo_collect_phase(repo_git): return repo_task_group +def primary_gitlab_repo_collect_phase(repo_git): + + logger = logging.getLogger(primary_gitlab_repo_collect_phase.__name__) + + jobs = group( + collect_gitlab_merge_requests.si(repo_git), + collect_gitlab_issues.si(repo_git) + ) + + return jobs + #This phase creates the message for secondary collection tasks. #These are less important and have their own worker. @@ -146,20 +159,23 @@ def non_repo_domain_tasks(): def build_primary_repo_collect_request(session,enabled_phase_names, days_until_collect_again = 1): #Add all required tasks to a list and pass it to the CollectionRequest primary_enabled_phases = [] + primary_gitlab_enabled_phases = [] #Primary jobs if prelim_phase.__name__ in enabled_phase_names: primary_enabled_phases.append(prelim_phase) primary_enabled_phases.append(primary_repo_collect_phase) + primary_gitlab_enabled_phases.append(primary_gitlab_repo_collect_phase) #task success is scheduled no matter what the config says. 
def core_task_success_util_gen(repo_git): return core_task_success_util.si(repo_git) primary_enabled_phases.append(core_task_success_util_gen) + primary_gitlab_enabled_phases.append(core_task_success_util_gen) - primary_request = CollectionRequest("core",primary_enabled_phases,max_repo=40, days_until_collect_again=7) + primary_request = CollectionRequest("core",primary_enabled_phases,max_repo=40, days_until_collect_again=7, gitlab_phases=primary_gitlab_enabled_phases) primary_request.get_valid_repos(session) return primary_request diff --git a/augur/tasks/util/collection_util.py b/augur/tasks/util/collection_util.py index 1d5ddd79c5..47705785e9 100644 --- a/augur/tasks/util/collection_util.py +++ b/augur/tasks/util/collection_util.py @@ -132,9 +132,10 @@ def get_required_conditions_for_ml_repos(allow_collected_before = False, days_un class CollectionRequest: - def __init__(self,name,phases,max_repo = 10,days_until_collect_again = 1): + def __init__(self,name,phases,max_repo = 10,days_until_collect_again = 1, gitlab_phases=None): self.name = name self.phases = phases + self.gitlab_phases = gitlab_phases self.max_repo = max_repo self.days_until_collect_again = days_until_collect_again self.new_status = CollectionState.PENDING.value @@ -603,12 +604,28 @@ def send_messages(self): augur_collection_chain = chain(*augur_collection_sequence) task_id = augur_collection_chain.apply_async().task_id - self.logger.info(f"Setting repo {col_hook.name} status to collecting for repo: {repo_git}") + self.logger.info(f"Setting github repo {col_hook.name} status to collecting for repo: {repo_git}") #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated yield repo_git, task_id, col_hook.name else: - print(f"Unable to start collection for {repo.repo_git}") + if col_hook.gitlab_phases is not None: + + augur_collection_sequence = [] + for job in col_hook.gitlab_phases: + #Add the phase to the sequence in order as a celery task. + #The preliminary task creates the larger task chain + augur_collection_sequence.append(job(repo_git)) + + #augur_collection_sequence.append(core_task_success_util.si(repo_git)) + #Link all phases in a chain and send to celery + augur_collection_chain = chain(*augur_collection_sequence) + task_id = augur_collection_chain.apply_async().task_id + + self.logger.info(f"Setting gitlab repo {col_hook.name} status to collecting for repo: {repo_git}") + + #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated + yield repo_git, task_id, col_hook.name #def start_block_of_repos(logger,session,repo_git_identifiers,phases,repos_type,hook="core"): #
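Putting the scheduling pieces together, the sketch below approximates the chain that the new gitlab branch of send_messages dispatches for a single repo. It is simplified: in the real flow primary_gitlab_repo_collect_phase wraps both tasks in a celery group and core_task_success_util is appended through the phase list, and the repo URL here is only a placeholder.

```python
from celery import chain

from augur.tasks.gitlab.issues_task import collect_gitlab_issues
from augur.tasks.gitlab.merge_request_task import collect_gitlab_merge_requests

repo_git = "https://gitlab.com/gitlab-org/gitlab"  # placeholder repo

# each gitlab phase contributes one immutable signature; chaining them and calling
# apply_async() starts collection and returns the task_id recorded on CollectionStatus
gitlab_collection = chain(
    collect_gitlab_merge_requests.si(repo_git),
    collect_gitlab_issues.si(repo_git),
)
task_id = gitlab_collection.apply_async().task_id
```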