From 56b7c0bfb9bbff94409e3d2890ed60b0f1524862 Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 16 Apr 2024 09:28:53 +0200 Subject: [PATCH 1/2] scripts currently used for ingestion in NESSI/2023.06 --- .../automated_ingestion.py | 178 +++- scripts/automated_ingestion/eessitarball.py | 875 +++++++++++++++--- scripts/automated_ingestion/utils.py | 26 + scripts/ingest-tarball.sh | 129 ++- 4 files changed, 1010 insertions(+), 198 deletions(-) diff --git a/scripts/automated_ingestion/automated_ingestion.py b/scripts/automated_ingestion/automated_ingestion.py index 1596bf3d..92fd0235 100755 --- a/scripts/automated_ingestion/automated_ingestion.py +++ b/scripts/automated_ingestion/automated_ingestion.py @@ -1,24 +1,28 @@ #!/usr/bin/env python3 from eessitarball import EessiTarball +from git import Repo from pid.decorator import pidfile from pid import PidFileError +# from shared_vars import gh_repo_cache import argparse import boto3 -import botocore +# import botocore import configparser import github import logging import os import pid +import re import sys REQUIRED_CONFIG = { - 'secrets': ['aws_secret_access_key', 'aws_access_key_id', 'github_pat'], - 'paths': ['download_dir', 'ingestion_script', 'metadata_file_extension'], - 'aws': ['staging_buckets'], - 'github': ['staging_repo', 'failed_ingestion_issue_body', 'pr_body'], + 'secrets': ['aws_secret_access_key', 'aws_access_key_id', 'github_pat', 'github_user'], + 'paths': ['download_dir', 'ingestion_script', 'metadata_file_extension', 'repo_base_dir'], + 'aws': ['staging_bucket'], + 'github': ['staging_repo', 'failed_ingestion_issue_body', 'pr_body', + 'ingest_staged', 'ingest_pr_opened', 'ingest_approved', 'ingest_rejected', 'ingest_done'], } LOG_LEVELS = { @@ -29,6 +33,28 @@ 'CRITICAL': logging.CRITICAL } +TARBALL_ALL_STATES = ['new', 'staged', 'pr_opened', 'approved', 'rejected', 'ingested'] +TARBALL_END_STATES = ['rejected', 'ingested'] + + +def clone_staging_repo(config): + """Clone a GitHub repository to local disk.""" + # check if repo already exists + staging_repo = config['github']['staging_repo'] + repo_base_dir = config['paths']['repo_base_dir'] + + repo_name = staging_repo.rstrip('/').split('/')[-1] + local_repo_dir = os.path.join(repo_base_dir, repo_name) + if os.path.exists(local_repo_dir): + logging.info(f"directory {repo_base_dir} already contains directory {repo_name}") + print(f"directory {repo_base_dir} already contains directory {repo_name}") + return Repo(local_repo_dir) + + # if not, clone it + logging.info(f"cloning {staging_repo} into {local_repo_dir}") + print(f"cloning {staging_repo} into {local_repo_dir}") + return Repo.clone_from(f'https://github.com/{staging_repo}', local_repo_dir) + def error(msg, code=1): """Print an error and exit.""" @@ -36,19 +62,29 @@ def error(msg, code=1): sys.exit(code) -def find_tarballs(s3, bucket, extension='.tar.gz', metadata_extension='.meta.txt'): - """Return a list of all tarballs in an S3 bucket that have a metadata file with the given extension (and same filename).""" - # TODO: list_objects_v2 only returns up to 1000 objects - s3_objects = s3.list_objects_v2(Bucket=bucket).get('Contents', []) - files = [obj['Key'] for obj in s3_objects] +def fetch_pulls_staging_repo(repo): + """Fetch pull requests from a GitHub repository to local disk.""" + logging.info(f"fetch refs/pull/* for {repo.remote('origin').name} in {repo.working_dir}") + print(f"fetch refs/pull/* for {repo.remote('origin').name} in {repo.working_dir}") + checkout_main_branch(repo) + 
repo.remote('origin').fetch(refspec='+refs/pull/*:refs/remotes/origin/pull/*') - tarballs = [ - file - for file in files - if file.endswith(extension) - and file + metadata_extension in files - ] - return tarballs + +def find_tarballs(s3, bucket, state): + """ + Return a list of all metadata files representing tarballs in an S3 bucket. + + Note, we don't check if a matching tarball exists. That would require additional + queries to the S3 bucket. The check has to be done by subsequent functions. + """ + # TODO: list_objects_v2 only returns up to 1000 objects + objects = s3.list_objects_v2(Bucket=bucket, Prefix=state) + # make sure the list is not empty + if objects and 'Contents' in objects: + # return all information ... we will use more than just file names / keys + return objects['Contents'] + else: + return [] def parse_config(path): @@ -56,15 +92,15 @@ def parse_config(path): config = configparser.ConfigParser() try: config.read(path) - except: - error(f'Unable to read configuration file {path}!') + except Exception as err: + error(f'Unable to read configuration file {path}! error "{err}"') # Check if all required configuration parameters/sections can be found. for section in REQUIRED_CONFIG.keys(): - if not section in config: + if section not in config: error(f'Missing section "{section}" in configuration file {path}.') for item in REQUIRED_CONFIG[section]: - if not item in config[section]: + if item not in config[section]: error(f'Missing configuration item "{item}" in section "{section}" of configuration file {path}.') return config @@ -76,10 +112,44 @@ def parse_args(): default='automated_ingestion.cfg', dest='config') parser.add_argument('-d', '--debug', help='enable debug mode', action='store_true', dest='debug') parser.add_argument('-l', '--list', help='only list available tarballs', action='store_true', dest='list_only') + parser.add_argument('-p', '--pattern', type=str, help='only process tarballs matching pattern', + dest='pattern') + parser.add_argument('-s', '--state', type=str, help='only process tarballs in given state', + dest='state', choices=TARBALL_ALL_STATES) + parser.add_argument('-v', '--verbose', help='show more information', action='store_true', dest='verbose') args = parser.parse_args() return args +def prepare_env(config): + """Prepare env dictionary to be used for accessing private staging repository.""" + # prepare env with credentials + os.environ['GITHUB_USER'] = config['secrets']['github_user'] + os.environ['GITHUB_TOKEN'] = config['secrets']['github_pat'] + os.environ['GIT_CONFIG_COUNT'] = '1' + os.environ['GIT_CONFIG_KEY_0'] = 'credential.helper' + os.environ['GIT_CONFIG_VALUE_0'] = '!f() { echo "username=${GITHUB_USER}"; echo "password=${GITHUB_TOKEN}"; }; f' + # return env + + +def checkout_main_branch(repo): + """Checkout main branch in local repository.""" + local_repo_dir = repo.working_tree_dir + print(f'\n local repo dir: {local_repo_dir}') + git = repo.git(C=local_repo_dir) + # checkout main branch + chkout_result = git.checkout('main') + print(f'\n checkout: "{chkout_result}"') + + +def update_staging_repo(repo): + """Update a GitHub repository on local disk.""" + logging.info(f"pull updates for {repo.remote('origin').name} in {repo.working_dir}") + print(f"pull updates for {repo.remote('origin').name} in {repo.working_dir}") + checkout_main_branch(repo) + repo.remote('origin').pull() + + @pid.decorator.pidfile('automated_ingestion.pid') def main(): """Main function.""" @@ -89,26 +159,74 @@ def main(): log_format = 
config['logging'].get('format', '%(levelname)s:%(message)s') log_level = LOG_LEVELS.get(config['logging'].get('level', 'INFO').upper(), logging.WARN) log_level = logging.DEBUG if args.debug else log_level - logging.basicConfig(filename=log_file, format=log_format, level=log_level) + if args.debug: + logging.basicConfig( + format=log_format, + level=log_level, + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler() + ]) + else: + logging.basicConfig(filename=log_file, format=log_format, level=log_level) # TODO: check configuration: secrets, paths, permissions on dirs, etc gh_pat = config['secrets']['github_pat'] gh = github.Github(gh_pat) + # gh_repo_cache = {} + + prepare_env(config) + + # obtain staging repo (only does what needs to be done) + repo = clone_staging_repo(config) + s3 = boto3.client( 's3', aws_access_key_id=config['secrets']['aws_access_key_id'], aws_secret_access_key=config['secrets']['aws_secret_access_key'], + endpoint_url=config['aws']['endpoint_url'], + verify=config['aws']['verify_cert_path'], ) - buckets = [x.strip() for x in config['aws']['staging_buckets'].split(',')] - for bucket in buckets: - tarballs = find_tarballs(s3, bucket) + states = TARBALL_ALL_STATES + if args.state: + states = [args.state] + + for state in states: + print(f"state = {state}") + + update_staging_repo(repo) + fetch_pulls_staging_repo(repo) + + object_list = find_tarballs(s3, config['aws']['staging_bucket'], state) + print(f"number of tarballs in state '{state}': {len(object_list)}") + + metadata_ext = config['paths']['metadata_file_extension'] if args.list_only: - for num, tarball in enumerate(tarballs): - print(f'[{bucket}] {num}: {tarball}') - else: - for tarball in tarballs: - tar = EessiTarball(tarball, config, gh, s3, bucket) + for num, obj in enumerate(object_list): + metadata_file = obj['Key'] + tarball = metadata_file.replace(state, 'tarballs', 1).rstrip(metadata_ext) + if args.pattern and not re.match(args.pattern, tarball): + print(f"tarball {tarball} does not match pattern {args.pattern}; skipping") + continue + print(f'{num} ({state}): {obj["Key"]}') + elif state not in TARBALL_END_STATES: + for num, obj in enumerate(object_list): + metadata_file = obj['Key'] + tarball = metadata_file.replace(state, 'tarballs', 1).rstrip(metadata_ext) + if args.pattern and not re.match(args.pattern, tarball): + print(f"tarball {tarball} does not match pattern {args.pattern}; skipping") + continue + + print(f"init tarball...: {tarball}") + indent = max(len('tarballs')-len(state), 0) + print(f" metadata file: {indent*' '}{metadata_file}") + + tar = EessiTarball(tarball, state, obj, config, gh, s3, repo) + if args.verbose: + tar.display() + print(f"processing tarball (state={tar.state}): {tarball}") tar.run_handler() + print() if __name__ == '__main__': diff --git a/scripts/automated_ingestion/eessitarball.py b/scripts/automated_ingestion/eessitarball.py index 9cac87a5..17bccce4 100644 --- a/scripts/automated_ingestion/eessitarball.py +++ b/scripts/automated_ingestion/eessitarball.py @@ -1,68 +1,229 @@ +# from git import Remote, Repo from utils import send_slack_message, sha256sum from pathlib import PurePosixPath -import boto3 +# import boto3 import github import json import logging import os +import re import subprocess import tarfile +import time + +from datetime import datetime, timezone +# from shared_vars import gh_comment_cache, gh_pr_cache, gh_repo_cache +from utils import get_gh_comment, get_gh_pr, get_gh_repo class EessiTarball: """ - Class that represents an EESSI tarball 
containing software installations or a compatibility layer, - and which is stored in an S3 bucket. - It has several functions to handle the different states of such a tarball in the actual ingestion process, - for which it interfaces with the S3 bucket, GitHub, and CVMFS. + Class that represents an EESSI tarball containing software installations + or a compatibility layer, and which is stored in an S3 bucket. It has + several functions to handle the different states of such a tarball in the + actual ingestion process, for which it interfaces with the S3 bucket, + GitHub, and CVMFS. """ - def __init__(self, object_name, config, github, s3, bucket): + def __init__(self, tarball_path, tarball_state, object_metadata, config, github, s3, local_repo): """Initialize the tarball object.""" + # init config, github, git staging repo and s3 objects + t1_b = time.time() self.config = config self.github = github - self.git_repo = github.get_repo(config['github']['staging_repo']) - self.metadata_file = object_name + config['paths']['metadata_file_extension'] - self.object = object_name + self.git_repo = get_gh_repo(config['github']['staging_repo'], github) self.s3 = s3 - self.bucket = bucket - self.local_path = os.path.join(config['paths']['download_dir'], os.path.basename(object_name)) - self.local_metadata_path = self.local_path + config['paths']['metadata_file_extension'] - self.url = f'https://{bucket}.s3.amazonaws.com/{object_name}' + self.local_repo = local_repo + + # store some of the object's metadata for later use + self.s3_object_etag = object_metadata['ETag'] + + # store remote path to metadata file and determine remote path to tarball + # path to tarball is structured as follows: + # 'tarballs/EESSI_PILOT_VERSION/LAYER/EESSI_OS_TYPE/EESSI_ARCH_SUBDIR/TIMESTAMP/TARBALL_NAME' + self.remote_tarball_path = tarball_path + # path to metadata file is structured as follows: + # 'tarball_state/EESSI_PILOT_VERSION/LAYER/EESSI_OS_TYPE/EESSI_ARCH_SUBDIR/TIMESTAMP/TARBALL_NAME.meta.txt' + metadata_ext = config['paths']['metadata_file_extension'] + self.remote_metadata_path = tarball_path.replace('tarballs', tarball_state, 1) + metadata_ext + + # set local paths to store metadata file and tarball + self.local_tarball_path = os.path.join(config['paths']['download_dir'], os.path.basename(tarball_path)) + self.local_metadata_path = self.local_tarball_path + metadata_ext + + # init default values for some instance information + self.metadata_raw = '' + self.metadata_json = {} + + self.sw_repo_name = '' + self.sw_repo = None + + self.sw_pr_number = -1 + self.sw_pr = None + + self.sw_pr_comment_id = -1 + self.sw_pr_comment = None + + self.tarball_name = '' + # reference to PR in staging repo + self.tar_pr = None + + # read metadata and init data structures + t2_b = time.time() + self.download() + t2_e = time.time() + if os.path.exists(self.local_metadata_path): + t3_b = time.time() + with open(self.local_metadata_path, 'r') as meta: + self.metadata_raw = meta.read() + self.metadata_json = json.loads(self.metadata_raw) + + self.sw_repo_name = self.metadata_json['link2pr']['repo'] + self.sw_repo = get_gh_repo(self.sw_repo_name, self.github) + + self.sw_pr_number = self.metadata_json['link2pr']['pr'] + self.sw_pr = get_gh_pr(self.sw_pr_number, self.sw_repo) + + if 'pr_comment_id' in self.metadata_json['link2pr']: + self.sw_pr_comment_id = self.metadata_json['link2pr']['pr_comment_id'] + self.sw_pr_comment = get_gh_comment(self.sw_pr_comment_id, self.sw_pr) + else: + logging.warn("should we try to obtain the 
comment id via scanning all comments or should we wait?") + + self.tarball_name = self.metadata_json['payload']['filename'] + t3_e = time.time() + else: + t3_b = time.time() + logging.warn(f"local metadata file '{self.local_metadata_path}' does not exist") + # TODO should raise an exception + t3_e = time.time() + +# self.url = f'https://{config["aws"]["staging_bucket"]}.s3.amazonaws.com/{object_name}' + # TODO verify if staging bucket and object_name are added correctly + self.bucket = config["aws"]["staging_bucket"] + self.url = f'{config["aws"]["endpoint_url"]}/{self.bucket}/{tarball_path}' self.states = { - 'new': {'handler': self.mark_new_tarball_as_staged, 'next_state': 'staged'}, - 'staged': {'handler': self.make_approval_request, 'next_state': 'approved'}, + 'new': {'handler': self.handle_new_tarball, 'next_state': 'staged'}, + 'staged': {'handler': self.open_approval_request, 'next_state': 'pr_opened'}, + 'pr_opened': {'handler': self.check_pr_status}, 'approved': {'handler': self.ingest, 'next_state': 'ingested'}, - 'ingested': {'handler': self.print_ingested}, - 'rejected': {'handler': self.print_rejected}, - 'unknown': {'handler': self.print_unknown}, + # 'unknown': {'handler': self.print_unknown}, } - # Find the initial state of this tarball. - self.state = self.find_state() + # set the initial state of this tarball. + self.state = tarball_state + logging.info(f"state is {self.state}, tarball is {self.tarball_name}") + t1_e = time.time() + if self.metadata_json: + tarball_size = int(self.metadata_json['payload']['size']) + rate = tarball_size / (t2_e-t2_b) + else: + tarball_size = -1 + rate = 0.0 + logging.info("timings (EessiTarball::__init__)") + logging.info(f" download.....: {t2_e-t2_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + logging.info(f" init metadata: {t3_e-t3_b:.2f} seconds") + logging.info(f" total........: {t1_e-t1_b:.2f} seconds") + print("timings (EessiTarball::__init__)") + print(f" download.....: {t2_e-t2_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + print(f" init metadata: {t3_e-t3_b:.2f} seconds") + print(f" total........: {t1_e-t1_b:.2f} seconds") + + def check_pr_status(self): + """ + Check status of pull request on GitHub (merged -> approved; closed -> rejected). + """ + t1_b = time.time() + print(f">> check_pr_status(): {self.remote_tarball_path}") + + filename = os.path.basename(self.remote_tarball_path) + # TODO remove '_{next_state}' suffix + git_branch = filename + + logging.info(f"get approval pr for {self.remote_tarball_path}") + print(f"get approval pr for {self.remote_tarball_path}") + t2a_b = time.time() + pr = self.get_approval_pr(update=True) + t2a_e = time.time() + + if pr: + logging.info(f'PR {pr.number} found for {self.remote_tarball_path}') + print(f' PR {pr.number} found for {self.remote_tarball_path}') - def download(self, force=False): + t2b_b = time.time() + if pr.state == 'open': + # The PR is still open, so it hasn't been reviewed yet: nothing to do. + logging.info(f'PR {pr.number} is still open, skipping this tarball...') + print(f' PR {pr.number} is still open, skipping this tarball...') + elif pr.state == 'closed' and not pr.merged: + # The PR was closed but not merged, i.e. it was rejected for ingestion. 
+ logging.info(f'PR {pr.number} was rejected') + print(f' PR {pr.number} was rejected') + # if PR was closed the changes in PR branch are not merged, hence the old_state + # is the state when the branch was created + self.mark_new_state('rejected', old_state='staged') + else: + # The PR was closed and merged, i.e. it was approved for ingestion. + logging.info(f'PR {pr.number} was approved') + print(f' PR {pr.number} was approved') + self.mark_new_state('approved') + t2b_e = time.time() + else: + # There is a branch, but no PR for this tarball. + # This is weird, so let's remove the branch and reprocess the tarball. + logging.info(f'Tarball {self.remote_tarball_path} has a branch, but no PR.') + logging.info('Removing existing branch...') + print(f' Tarball {self.remote_tarball_path} has a branch, but no PR.') + print(' Removing existing branch...') + + t2b_b = time.time() + ref = self.git_repo.get_git_ref(f'heads/{git_branch}') + ref.delete() + + # move metadata file back to staged (only needed for S3, branch on github has been deleted) + if self.s3_move_metadata_file('staged'): + self.state = 'staged' + else: + print(f"something went wrong when moving metadata file from '{self.state}' to 'staged'") + # TODO create an issue? + t2b_e = time.time() + + t1_e = time.time() + logging.info("timings (EessiTarball::check_pr_status)") + logging.info(f" obtain pr instance: {t2a_e-t2a_b:.2f} seconds") + logging.info(f" process pr state..: {t2b_e-t2b_b:.2f} seconds") + logging.info(f" total.............: {t1_e-t1_b:.2f} seconds") + print("timings (EessiTarball::check_pr_status)") + print(f" obtain pr instance: {t2a_e-t2a_b:.2f} seconds") + print(f" process pr state..: {t2b_e-t2b_b:.2f} seconds") + print(f" total.............: {t1_e-t1_b:.2f} seconds") + + def download(self, force=False, metadata_force=False, tarball_force=False): """ Download this tarball and its corresponding metadata file, if this hasn't been already done. """ - if force or not os.path.exists(self.local_path): + bucket = self.config['aws']['staging_bucket'] + + if force or tarball_force or not os.path.exists(self.local_tarball_path): try: - self.s3.download_file(self.bucket, self.object, self.local_path) - except: - logging.error( - f'Failed to download tarball {self.object} from {self.bucket} to {self.local_path}.' + self.s3.download_file(bucket, self.remote_tarball_path, self.local_tarball_path) + except Exception: + logging.warn( + f'Failed to download tarball {self.remote_tarball_path} from {bucket} to {self.local_tarball_path}.' ) - self.local_path = None - if force or not os.path.exists(self.local_metadata_path): + self.local_tarball_path = None + + if force or metadata_force or not os.path.exists(self.local_metadata_path): try: - self.s3.download_file(self.bucket, self.metadata_file, self.local_metadata_path) - except: - logging.error( - f'Failed to download metadata file {self.metadata_file} from {self.bucket} to {self.local_metadata_path}.' 
- ) + self.s3.download_file(bucket, self.remote_metadata_path, self.local_metadata_path) + except Exception: + logging.warn(f'Failed to download metadata file {self.remote_metadata_path} ' + f'from {bucket} to {self.local_metadata_path}.') self.local_metadata_path = None def find_state(self): @@ -70,14 +231,14 @@ def find_state(self): for state in list(self.states.keys()): # iterate through the state dirs and try to find the tarball's metadata file try: - self.git_repo.get_contents(state + '/' + self.metadata_file) + self.git_repo.get_contents(state + '/' + self.remote_metadata_path) return state except github.UnknownObjectException: # no metadata file found in this state's directory, so keep searching... continue except github.GithubException: # if there was some other (e.g. connection) issue, abort the search for this tarball - logging.warning(f'Unable to determine the state of {self.object}!') + logging.warning(f'Unable to determine the state of {self.remote_tarball_path}!') return "unknown" else: # if no state was found, we assume this is a new tarball that was ingested to the bucket @@ -85,7 +246,8 @@ def find_state(self): def get_contents_overview(self): """Return an overview of what is included in the tarball.""" - tar = tarfile.open(self.local_path, 'r') + logging.debug(f'get contents overview for "{self.local_tarball_path}"') + tar = tarfile.open(self.local_tarball_path, 'r') members = tar.getmembers() tar_num_members = len(members) paths = sorted([m.path for m in members]) @@ -93,7 +255,6 @@ def get_contents_overview(self): if tar_num_members < 100: tar_members_desc = 'Full listing of the contents of the tarball:' members_list = paths - else: tar_members_desc = 'Summarized overview of the contents of the tarball:' prefix = os.path.commonprefix(paths) @@ -111,8 +272,8 @@ def get_contents_overview(self): other = [ # anything that is not in /software nor /modules m.path for m in members - if not PurePosixPath(prefix).joinpath('software') in PurePosixPath(m.path).parents - and not PurePosixPath(prefix).joinpath('modules') in PurePosixPath(m.path).parents + if not PurePosixPath(prefix).joinpath('software') in PurePosixPath(m.path).parents and + not PurePosixPath(prefix).joinpath('modules') in PurePosixPath(m.path).parents # if not fnmatch.fnmatch(m.path, os.path.join(prefix, 'software', '*')) # and not fnmatch.fnmatch(m.path, os.path.join(prefix, 'modules', '*')) ] @@ -140,184 +301,579 @@ def next_state(self, state): def run_handler(self): """Process this tarball by running the process function that corresponds to the current state.""" if not self.state: - self.state = self.find_state() + logging.warning(f"tarball {self.remote_tarball_path} has no state set; skipping...") + return + # self.state = self.find_state() handler = self.states[self.state]['handler'] handler() def verify_checksum(self): """Verify the checksum of the downloaded tarball with the one in its metadata file.""" - local_sha256 = sha256sum(self.local_path) - meta_sha256 = None - with open(self.local_metadata_path, 'r') as meta: - meta_sha256 = json.load(meta)['payload']['sha256sum'] + local_sha256 = sha256sum(self.local_tarball_path) + meta_sha256 = self.metadata_json['payload']['sha256sum'] logging.debug(f'Checksum of downloaded tarball: {local_sha256}') logging.debug(f'Checksum stored in metadata file: {meta_sha256}') return local_sha256 == meta_sha256 def ingest(self): """Process a tarball that is ready to be ingested by running the ingestion script.""" - #TODO: check if there is an open issue for this tarball, 
and if there is, skip it. - logging.info(f'Tarball {self.object} is ready to be ingested.') + # TODO: check if there is an open issue for this tarball, and if there is, skip it. + t1_b = time.time() + logging.info(f'Tarball {self.remote_tarball_path} is ready to be ingested.') + self.download() + logging.info('Verifying its checksum...') + if 'payload' in self.metadata_json and 'size' in self.metadata_json['payload']: + size_str = f"{int(self.metadata_json['payload']['size']) / 1000000:.3f} MB" + else: + size_str = "N/A" + print(f' verifying tarball checksum (size {size_str})...') + t4_b = time.time() if not self.verify_checksum(): logging.error('Checksum of downloaded tarball does not match the one in its metadata file!') - # Open issue? + # TODO Open issue? + print('Checksum of downloaded tarball does not match the one in its metadata file!') return else: - logging.debug(f'Checksum of {self.object} matches the one in its metadata file.') + logging.debug(f'Checksum of {self.remote_tarball_path} matches the one in its metadata file.') + t4_e = time.time() + script = self.config['paths']['ingestion_script'] sudo = ['sudo'] if self.config['cvmfs'].getboolean('ingest_as_root', True) else [] - logging.info(f'Running the ingestion script for {self.object}...') - ingest_cmd = subprocess.run( - sudo + [script, self.local_path], + logging.info(f'Running the ingestion script for {self.remote_tarball_path}...') + + # TODO add additional parameters for more info in cvmfs_server tag history + sw_branch = self.sw_pr.base.ref + uploader = self.metadata_json['uploader']['username'] + + ingest_cmd = sudo + [script, self.local_tarball_path, self.sw_repo_name, sw_branch, self.sw_pr_number, uploader] + logging.info(f'ingesting with /{" ".join(ingest_cmd)}/') + print(f' ingesting tarball (size {size_str}) with "{" ".join(ingest_cmd)}"') + t2_b = time.time() + ingest_run = subprocess.run( + ingest_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if ingest_cmd.returncode == 0: - next_state = self.next_state(self.state) - self.move_metadata_file(self.state, next_state) + t2_e = time.time() + if ingest_run.returncode == 0: + t3_b = time.time() if self.config.has_section('slack') and self.config['slack'].getboolean('ingestion_notification', False): send_slack_message( self.config['secrets']['slack_webhook'], - self.config['slack']['ingestion_message'].format(tarball=os.path.basename(self.object)) + self.config['slack']['ingestion_message'].format(tarball=os.path.basename(self.remote_tarball_path)) ) + logging.info(f'ingesting stdout: /{ingest_run.stdout.decode("UTF-8")}/') + logging.info(f'ingesting stderr: /{ingest_run.stderr.decode("UTF-8")}/') + + # update comment in software-layer repo: ingested + self.update_sw_repo_comment('ingest_done', prefix=self.determine_tarball_prefix()) + + next_state = self.next_state(self.state) + + # move metadata file to next_state (ingested) + self.git_move_metadata_file(self.state, next_state) + if self.s3_move_metadata_file(next_state): + self.state = next_state + else: + logging.warning(f"something went wrong when moving metadata file from '{self.state}' to {next_state}") + print(f"something went wrong when moving metadata file from '{self.state}' to {next_state}") + # TODO create an issue? 
+ t3_e = time.time() else: - issue_title = f'Failed to ingest {self.object}' + t3_b = time.time() + issue_title = f'Failed to ingest {self.remote_tarball_path}' issue_body = self.config['github']['failed_ingestion_issue_body'].format( - command=' '.join(ingest_cmd.args), - tarball=self.object, - return_code=ingest_cmd.returncode, - stdout=ingest_cmd.stdout.decode('UTF-8'), - stderr=ingest_cmd.stderr.decode('UTF-8'), + command=' '.join(ingest_run.args), + tarball=self.remote_tarball_path, + return_code=ingest_run.returncode, + stdout=ingest_run.stdout.decode('UTF-8'), + stderr=ingest_run.stderr.decode('UTF-8'), ) if self.issue_exists(issue_title, state='open'): - logging.info(f'Failed to ingest {self.object}, but an open issue already exists, skipping...') + logging.info(f'Failed to ingest {self.remote_tarball_path}, ' + 'but an open issue already exists, skipping...') else: self.git_repo.create_issue(title=issue_title, body=issue_body) + t3_e = time.time() + t1_e = time.time() + if self.metadata_json: + tarball_size = int(self.metadata_json['payload']['size']) + rate = tarball_size / (t2_e-t2_b) + rate_v = tarball_size / (t4_e-t4_b) + else: + tarball_size = -1 + rate = 0.0 + logging.info("timings (EessiTarball::ingest)") + logging.info(f" verify checksum.......: {t4_e-t4_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate_v/1000000:.3f} MB/s") + logging.info(f" run ingest script.....: {t2_e-t2_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + print("timings (EessiTarball::ingest)") + print(f" verify checksum.......: {t4_e-t4_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate_v/1000000:.3f} MB/s") + print(f" run ingest script.....: {t2_e-t2_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + if ingest_run.returncode == 0: + logging.info(f" upd PR + move metadata: {t3_e-t3_b:.2f} seconds") + print(f" upd PR + move metadata: {t3_e-t3_b:.2f} seconds") + else: + logging.info(f" open issue............: {t3_e-t3_b:.2f} seconds") + print(f" open issue............: {t3_e-t3_b:.2f} seconds") + logging.info(f" total.................: {t1_e-t1_b:.2f} seconds") + print(f" total.................: {t1_e-t1_b:.2f} seconds") + + def s3_move_metadata_file(self, new_state): + """Moves a remote metadata file from its current state directory to the new_state directory.""" + # copy metadata file in S3 bucket from directory {self.state} to directory {new_state} and + # delete metadata file in S3 bucket in directory {self.state} + original_path = self.remote_metadata_path + new_state_path = self.remote_metadata_path.replace(self.state, new_state, 1) + bucket = self.config['aws']['staging_bucket'] + + logging.info(f"copying metadata file from {original_path} to {new_state_path} in bucket {bucket}") + print(f"copying metadata file from {original_path} to {new_state_path} in bucket {bucket}") + + # print('\nSKIPPING move in S3 bucket') + + try: + response = self.s3.copy_object( + Bucket=bucket, + CopySource=f'{bucket}/{original_path}', + Key=new_state_path, + ) + except Exception as err: + logging.warning(f"failed to copy metadata file from {original_path} to {new_state_path} with error {err}") + print(f"failed to copy metadata file from {original_path} to {new_state_path} with error {err}") + return False + + # verify that object was copied + if response is None or 'CopyObjectResult' not in response: + logging.warn(f"copying metatdata file returned no response data; not deleting original at 
{original_path}") + print(f"copying metatdata file returned no response data; not deleting original at {original_path}") + return False - def print_ingested(self): - """Process a tarball that has already been ingested.""" - logging.info(f'{self.object} has already been ingested, skipping...') + etag_org = self.s3_object_etag + etag_new = response['CopyObjectResult']['ETag'] + if etag_org == etag_new: + logging.info(f"copying of metadata file {original_path} to {new_state_path} succeeded; deleting original") + print(f"copying of metadata file {original_path} to {new_state_path} succeeded; deleting original") + self.s3.delete_object( + Bucket=bucket, + Key=original_path, + ) + else: + logging.warning(f"ETags of metatdata files differ (original={etag_org}, copied={etag_new});" + f"\n not deleting original metadata file at {original_path}") + print(f"ETags of metatdata files differ (original={etag_org}, copied={etag_new});" + f"\n not deleting original metadata file at {original_path}") + return False + self.remote_metadata_path = new_state_path + return True - def mark_new_tarball_as_staged(self): + def mark_new_state(self, new_state, old_state=None): + """ + Mark new state of a tarball. + """ + t1_b = time.time() + # update comment in software-layer repo + # TODO ensure that setting 'ingest_{new_state}' exists or use default + t2_b = time.time() + self.update_sw_repo_comment(f'ingest_{new_state}') + t2_e = time.time() + + # move metadata file to {new_state} top level dir + t3_b = time.time() + if not old_state: + old_state = self.state + # print(f"SKIP moving metadata file from '{old_state}' to '{new_state}'") + self.git_move_metadata_file(old_state, new_state) + if self.s3_move_metadata_file(new_state): + self.state = new_state + else: + logging.warning(f"something went wrong when moving metadata file from '{old_state}' to '{new_state}'") + print(f"something went wrong when moving metadata file from '{old_state}' to '{new_state}'") + # TODO create an issue? 
+ t3_e = time.time() + t1_e = time.time() + logging.info(f"timings (EessiTarball::mark_new_state(new_state={new_state})") + logging.info(f" update PR comment..: {t2_e-t2_b:.2f} seconds") + logging.info(f" move metadata files: {t3_e-t3_b:.2f} seconds") + logging.info(f" total........: {t1_e-t1_b:.2f} seconds") + print(f"timings (EessiTarball::mark_new_state(new_state={new_state})") + print(f" update PR comment..: {t2_e-t2_b:.2f} seconds") + print(f" move metadata files: {t3_e-t3_b:.2f} seconds") + print(f" total........: {t1_e-t1_b:.2f} seconds") + + def create_branch_for_tarball(self): + """Creates a branch in the local repository for the tarball.""" + t2_b = time.time() + + # use filename as branch name + filename = os.path.basename(self.remote_tarball_path) + + local_repo_dir = self.local_repo.working_tree_dir + print(f'\n local repo dir: {local_repo_dir}') + git = self.local_repo.git(C=local_repo_dir) + + nwbr_result = git.branch(filename, 'origin/main') + print(f'\n new branch: "{nwbr_result}"') + + t2_e = time.time() + logging.info(f" create branch.......: {t2_e-t2_b:.2f} seconds") + print(f" create branch.......: {t2_e-t2_b:.2f} seconds") + + return filename + + def create_file_in_local_repo(self, file_path, file_contents, commit_msg, branch='main'): + """Creates a file in the local repo in the given branch.""" + t1_b = time.time() + + local_repo_dir = self.local_repo.working_tree_dir + git = self.local_repo.git(C=local_repo_dir) + chkout_result = git.checkout(branch) + + full_path = os.path.join(local_repo_dir, file_path) + directory = os.path.dirname(full_path) + try: + os.makedirs(directory, exist_ok=True) + with open(full_path, 'w') as local_metadata_file: + local_metadata_file.writelines(file_contents) + except Exception as err: + print(f'caught exception when trying to create/write file: {err}') + + # add + commit + add_result = git.add(full_path) + commit_result = git.commit("-m", commit_msg) + + t1_e = time.time() + logging.info(f" create file in local repo: {t1_e-t1_b:.2f} seconds") + print(f" create file in local repo: {t1_e-t1_b:.2f} seconds") + + def push_branch_to_remote_repo(self, pr_branch): + """Push branch to remote repo.""" + t1_b = time.time() + + local_repo_dir = self.local_repo.working_tree_dir + git = self.local_repo.git(C=local_repo_dir) + push_result = git.push("origin", pr_branch) + print(f' git.push -> "{push_result}"') + + t1_e = time.time() + logging.info(f" create file in local repo: {t1_e-t1_b:.2f} seconds") + print(f" create file in local repo: {t1_e-t1_b:.2f} seconds") + + def handle_new_tarball(self): """Process a new tarball that was added to the staging bucket.""" + t1_b = time.time() + next_state = self.next_state(self.state) - logging.info(f'Found new tarball {self.object}, downloading it...') + logging.info(f'Found new tarball {self.remote_tarball_path}, downloading it...') + print(f' Found new tarball {self.remote_tarball_path}, downloading it...') + # Download the tarball and its metadata file. - # Use force as it may be a new attempt for an existing tarball that failed before. 
- self.download(force=True) - if not self.local_path or not self.local_metadata_path: - logging.warn('Skipping this tarball...') + self.download() + if not self.local_tarball_path or not self.local_metadata_path: + logging.info('Skipping this tarball...') + print(' Skipping this tarball...') return - contents = '' - with open(self.local_metadata_path, 'r') as meta: - contents = meta.read() + pr_branch = self.create_branch_for_tarball() logging.info(f'Adding tarball\'s metadata to the "{next_state}" folder of the git repository.') - file_path_staged = next_state + '/' + self.metadata_file - new_file = self.git_repo.create_file(file_path_staged, 'new tarball', contents, branch='main') + print(f' Adding tarball\'s metadata to the "{next_state}" folder of the git repository.') + file_path_staged = self.remote_metadata_path.replace(self.state, next_state, 1) + t2_b = time.time() + # replace next line by creating file locally in pr_branch and pushing branch to remote repo + # self.git_repo.create_file(file_path_staged, 'new tarball staged', self.metadata_raw, branch=pr_branch) + print(f' file_path_staged = "{file_path_staged}"') + self.create_file_in_local_repo(file_path_staged, self.metadata_raw, 'new tarball staged', pr_branch) + self.push_branch_to_remote_repo(pr_branch) + + t2_e = time.time() - self.state = next_state - self.run_handler() + # move metadata file to staged + t3_b = time.time() + logging.info(f'Moving tarball\'s metadata to the "{next_state}" folder of the S3 bucket.') + print(f' Moving tarball\'s metadata to the "{next_state}" folder of the S3 bucket.') + if self.s3_move_metadata_file(next_state): + self.state = next_state + else: + logging.warn(f"something went wrong when moving metadata file from '{self.state}' to '{next_state}'") + print(f" something went wrong when moving metadata file from '{self.state}' to '{next_state}'") + # TODO create an issue? + t3_e = time.time() - def print_rejected(self): - """Process a (rejected) tarball for which the corresponding PR has been closed witout merging.""" - logging.info("This tarball was rejected, so we're skipping it.") - # Do we want to delete rejected tarballs at some point? + t4_b = time.time() + self.update_sw_repo_comment('ingest_staged') + t4_e = time.time() + t1_e = time.time() + logging.info("timings (EessiTarball::handle_new_tarball)") + logging.info(f" add metadata file to GitHub: {t2_e-t2_b:.2f} seconds") + logging.info(f" move metadata file on S3...: {t3_e-t3_b:.2f} seconds") + logging.info(f" update PR comment..........: {t4_e-t4_b:.2f} seconds") + logging.info(f" total........: {t1_e-t1_b:.2f} seconds") + print("timings (EessiTarball::handle_new_tarball)") + print(f" add metadata file to GitHub: {t2_e-t2_b:.2f} seconds") + print(f" move metadata file on S3...: {t3_e-t3_b:.2f} seconds") + print(f" update PR comment..........: {t4_e-t4_b:.2f} seconds") + print(f" total........: {t1_e-t1_b:.2f} seconds") def print_unknown(self): """Process a tarball which has an unknown state.""" logging.info("The state of this tarball could not be determined, so we're skipping it.") - def make_approval_request(self): + def find_comment(self, pull_request, tarball_name): + """Find comment in pull request that contains name of a tarball. + Args: + pull_request (object): PullRequest object (PyGithub) representing + a pull request. + tarball_name (string): Name of tarball used to identify a comment. + Returns: + issue_comment (object): IssueComment object (PyGithub) representing + an issue comment. 
+ """ + comments = pull_request.get_issue_comments() + for comment in comments: + cms = f".*{tarball_name}.*" + comment_match = re.search(cms, comment.body) + if comment_match: + return comment + return None + + def get_approval_pr(self, update=False): + """Find approval PR if any exists.""" + if self.tar_pr and not update: + return self.tar_pr + + filename = os.path.basename(self.remote_tarball_path) + # TODO remove '_approved' + pr_branch = filename + + all_refs = self.local_repo.remote().refs + + # obtain commit for pr_branch + # iterate over all_refs, + # keep those where remote_head matches pr_branch, and + # use only the first commit + commits = [ref.commit for ref in all_refs if ref.remote_head == pr_branch] + if not commits: + return None + commit = commits[0] + + # obtain pr for pr_branch + # iterate over all_refs (again), + # keep those where the commit equals the one of the branch and if 'pull' is in the ref.remote_head + # only use middle element of remote_head which is something like 'pull/NUMBER/head' + pulls = [ref.remote_head.split('/')[1] + for ref in all_refs if ref.commit == commit and 'pull' in ref.remote_head] + if not pulls: + return None + + # obtain pr instance from GitHub (also contains status information) + gh_pr = self.git_repo.get_pull(int(pulls[0])) + + self.tar_pr = gh_pr + return gh_pr + + def get_pr_url(self): + """Return URL to approval PR.""" + self.tar_pr = self.get_approval_pr() + + if self.tar_pr: + return self.tar_pr.html_url + else: + return None + + def determine_sw_repo_pr_comment(self, tarball_name): + """Determine PR comment.""" + if self.sw_pr_comment: + return self.sw_pr_comment + else: + return self.find_comment(self.sw_pr, tarball_name) + + def determine_tarball_prefix(self): + """Determine common prefix of tarball.""" + tar = tarfile.open(self.local_tarball_path, 'r') + members = tar.getmembers() + paths = sorted([m.path for m in members]) + + return os.path.commonprefix(paths) + + def update_sw_repo_comment(self, comment_template, prefix=None): + """Update comment in PR of software-layer repository. 
+ """ + # obtain issue_comment (use previously stored value in self or determine via tarball_name) + issue_comment = self.determine_sw_repo_pr_comment(self.tarball_name) + + if issue_comment: + comment_update = self.config['github'][comment_template].format( + date=datetime.now(timezone.utc).strftime('%b %d %X %Z %Y'), + tarball=self.tarball_name, + approval_pr=self.get_pr_url(), + prefix=prefix, + ) + logging.info(f'Comment found (id: {issue_comment.id}); ' + f'adding row "{comment_update}"') + # get current data/time + issue_comment.edit(issue_comment.body + "\n" + comment_update) + else: + logging.info('Failed to find a comment for tarball ' + f'{self.tarball_name} in pull request ' + f'#{self.sw_pr_number} in repo {self.sw_repo_name}.') + + def open_approval_request(self): """Process a staged tarball by opening a pull request for ingestion approval.""" + t1_b = time.time() + next_state = self.next_state(self.state) - file_path_staged = self.state + '/' + self.metadata_file - file_path_to_ingest = next_state + '/' + self.metadata_file - filename = os.path.basename(self.object) - tarball_metadata = self.git_repo.get_contents(file_path_staged) - git_branch = filename + '_' + next_state - self.download() + t3_b = time.time() + filename = os.path.basename(self.remote_tarball_path) + # move metadata file to next_state top level dir in branch named {filename} + self.git_move_metadata_file(self.state, next_state, branch=filename) + + # Move the file to the top-level directory of the next stage in the S3 bucket + self.s3_move_metadata_file(next_state) + t3_e = time.time() - main_branch = self.git_repo.get_branch('main') - if git_branch in [branch.name for branch in self.git_repo.get_branches()]: - # Existing branch found for this tarball, so we've run this step before. - # Try to find out if there's already a PR as well... - logging.info("Branch already exists for " + self.object) - # Filtering with only head= returns all prs if there's no match, so double-check - find_pr = [pr for pr in self.git_repo.get_pulls(head=git_branch, state='all') if pr.head.ref == git_branch] - logging.debug('Found PRs: ' + str(find_pr)) - if find_pr: - # So, we have a branch and a PR for this tarball (if there are more, pick the first one)... - pr = find_pr.pop(0) - logging.info(f'PR {pr.number} found for {self.object}') - if pr.state == 'open': - # The PR is still open, so it hasn't been reviewed yet: ignore this tarball. - logging.info('PR is still open, skipping this tarball...') - return - elif pr.state == 'closed' and not pr.merged: - # The PR was closed but not merged, i.e. it was rejected for ingestion. - logging.info('PR was rejected') - self.reject() - return - else: - logging.warn(f'Warning, tarball {self.object} is in a weird state:') - logging.warn(f'Branch: {git_branch}\nPR: {pr}\nPR state: {pr.state}\nPR merged: {pr.merged}') - else: - # There is a branch, but no PR for this tarball. - # This is weird, so let's remove the branch and reprocess the tarball. 
- logging.info(f'Tarball {self.object} has a branch, but no PR.') - logging.info(f'Removing existing branch...') - ref = self.git_repo.get_git_ref(f'heads/{git_branch}') - ref.delete() - logging.info(f'Making pull request to get ingestion approval for {self.object}.') - # Create a new branch - self.git_repo.create_git_ref(ref='refs/heads/' + git_branch, sha=main_branch.commit.sha) - # Move the file to the directory of the next stage in this branch - self.move_metadata_file(self.state, next_state, branch=git_branch) - # Get metadata file contents - metadata = '' - with open(self.local_metadata_path, 'r') as meta: - metadata = meta.read() # Try to get the tarball contents and open a PR to get approval for the ingestion try: + t4_b = time.time() + exception = False tarball_contents = self.get_contents_overview() pr_body = self.config['github']['pr_body'].format( - tar_overview=self.get_contents_overview(), - metadata=metadata, + tar_overview=tarball_contents, + metadata=self.metadata_raw, ) - self.git_repo.create_pull(title='Ingest ' + filename, body=pr_body, head=git_branch, base='main') + t4_i = time.time() + except Exception as err: + exception = True + print(f'caught an exception "{err}"') + t4_e = time.time() + + try: + t5_b = time.time() + if not exception: + self.tar_pr = self.git_repo.create_pull(title='Ingest ' + filename, + body=pr_body, + head=filename, + base='main') + t5_i = time.time() + + # update comment in pull request of softwares-layer repo + self.update_sw_repo_comment('ingest_pr_opened') + except Exception as err: - issue_title = f'Failed to get contents of {self.object}' + exception = True + print(f'caught an exception "{err}"') + issue_title = f'Failed to get contents of {self.remote_tarball_path}' issue_body = self.config['github']['failed_tarball_overview_issue_body'].format( - tarball=self.object, + tarball=self.remote_tarball_path, error=err ) if len([i for i in self.git_repo.get_issues(state='open') if i.title == issue_title]) == 0: self.git_repo.create_issue(title=issue_title, body=issue_body) else: - logging.info(f'Failed to create tarball overview, but an issue already exists.') + logging.info('Failed to create tarball overview, but an issue already exists.') + print('Failed to create tarball overview, but an issue already exists.') + t5_e = time.time() + t1_e = time.time() + logging.info("timings (EessiTarball::open_approval_request)") + logging.info(f" move metadata files.: {t3_e-t3_b:.2f} seconds") + print("timings (EessiTarball::open_approval_request)") + print(f" move metadata files.: {t3_e-t3_b:.2f} seconds") + if not exception: + if self.metadata_json: + tarball_size = int(self.metadata_json['payload']['size']) + rate = tarball_size / (t4_i-t4_b) + else: + tarball_size = -1 + rate = 0.0 + logging.info(f" analyse tarball.....: {t4_e-t4_b:.2f} seconds") + logging.info(f" - analyse tarball...: {t4_i-t4_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + logging.info(f" created pull request: {t5_e-t5_b:.2f} seconds") + logging.info(f" - open pull request.: {t5_i-t5_b:.2f} seconds") + logging.info(f" - update PR comment.: {t5_e-t5_i:.2f} seconds") + print(f" analyse tarball.....: {t4_e-t4_b:.2f} seconds") + print(f" - analyse tarball...: {t4_i-t4_b:.2f} seconds, " + f"size {tarball_size/1000000:.3f} MB, rate {rate/1000000:.3f} MB/s") + print(f" created pull request: {t5_e-t5_b:.2f} seconds") + print(f" - open pull request.: {t5_i-t5_b:.2f} seconds") + print(f" - update PR comment.: {t5_e-t5_i:.2f} seconds") + else: + 
logging.info(f" exception -> issue..: {t5_e-t4_b:.2f} seconds") + print(f" exception -> issue..: {t5_e-t4_b:.2f} seconds") + print(f" total...............: {t1_e-t1_b:.2f} seconds") + logging.info(f" total...............: {t1_e-t1_b:.2f} seconds") - def move_metadata_file(self, old_state, new_state, branch='main'): + def git_move_metadata_file(self, old_state, new_state, branch='main'): """Move the metadata file of a tarball from an old state's directory to a new state's directory.""" - file_path_old = old_state + '/' + self.metadata_file - file_path_new = new_state + '/' + self.metadata_file - logging.debug(f'Moving metadata file {self.metadata_file} from {file_path_old} to {file_path_new}.') - tarball_metadata = self.git_repo.get_contents(file_path_old) - # Remove the metadata file from the old state's directory... - self.git_repo.delete_file(file_path_old, 'remove from ' + old_state, sha=tarball_metadata.sha, branch=branch) - # and move it to the new state's directory - self.git_repo.create_file(file_path_new, 'move to ' + new_state, tarball_metadata.decoded_content, - branch=branch) + metadata_path = '/'.join(self.remote_metadata_path.split('/')[1:]) + file_path_old = old_state + '/' + metadata_path + file_path_new = new_state + '/' + metadata_path + logging.debug(f'Moving metadata file {metadata_path} from {file_path_old} ' + f'to {file_path_new} in branch {branch}.') + print(f'\nMoving metadata file {metadata_path}\n from {file_path_old}' + f'\n to {file_path_new}\n in branch {branch}') + + # tarball_metadata = self.git_repo.get_contents(file_path_old) + # # TODO maybe first create file, then remove? if remove succeeds and create fails, it may be lost + # # Remove the metadata file from the old state's directory... + # self.git_repo.delete_file(file_path_old, 'remove from ' + old_state, sha=tarball_metadata.sha, branch=branch) + # # and move it to the new state's directory + # self.git_repo.create_file(file_path_new, 'move to ' + new_state, tarball_metadata.decoded_content, + # branch=branch) + # USE git commands locally via self.local_repo.git() and push to remote + local_repo_dir = self.local_repo.working_tree_dir + print(f'\n local repo dir: {local_repo_dir}') + prefixed_file_path_old = os.path.join(local_repo_dir, file_path_old) + + git = self.local_repo.git(C=local_repo_dir) + # check branch + br_result = git.branch() + print(f'\n branch: "{br_result}"') + # checkout branch + chkout_result = git.checkout(branch) + print(f'\n checkout: "{chkout_result}"') + # check branch again + bra_result = git.branch() + print(f'\n branch: "{bra_result}"') + + if os.path.exists(prefixed_file_path_old): + print(f'\n moving old file path {file_path_old} with "git mv"') + # make sure target directory exists + target_directory = os.path.dirname(file_path_new) + print(f'\n target directory: {target_directory}') + prefixed_target_directory = os.path.join(local_repo_dir, target_directory) + if not os.path.exists(prefixed_target_directory): + print('\n target directory does not exist yet ... 
creating it') + os.makedirs(prefixed_target_directory, exist_ok=True) + mv_result = git.mv(file_path_old, file_path_new) + print(f'\n mv_result: "{mv_result}"') + status_result = git.status() + print(f'\n status_result: "{status_result}"') + # commit_result = git.commit(m=f'change state from {old_state} to {new_state}') + self.local_repo.index.commit(f'change state from {old_state} to {new_state}') + # commit result is not of type str, so we can't print it + push_result = git.push('origin', branch) + # print(f'\n type(push_result): {type(push_result)}') + print(f'\n push_result: "{push_result}"') + status_result = git.status() + # print(f'\n type(status_result): {type(status_result)}') + print(f'\n status_result: "{status_result}"') + else: + print(f'\nold file path {file_path_old} does not exist in branch "{branch}"') def reject(self): """Reject a tarball for ingestion.""" # Let's move the the tarball to the directory for rejected tarballs. - logging.info(f'Marking tarball {self.object} as rejected...') + logging.info(f'Marking tarball {self.remote_tarball_path} as rejected...') next_state = 'rejected' - self.move_metadata_file(self.state, next_state) + self.git_move_metadata_file(self.state, next_state) + # update comment in software-layer repo: rejected + self.update_sw_repo_comment('ingest_rejected') def issue_exists(self, title, state='open'): """Check if an issue with the given title and state already exists.""" @@ -327,3 +883,28 @@ def issue_exists(self, title, state='open'): return True else: return False + + def display(self): + """Print overview of object settings.""" + print(f"config: {self.config}") + print(f"self.github: {self.github}") + print(f"self.git_repo: {self.git_repo}") + print(f"self.s3: {self.s3}") + print(f"self.s3_object_etag: {self.s3_object_etag}") + print(f"self.remote_tarball_path: {self.remote_tarball_path}") + print(f"self.remote_metadata_path: {self.remote_metadata_path}") + print(f"self.local_tarball_path: {self.local_tarball_path}") + print(f"self.local_metadata_path: {self.local_metadata_path}") + print(f"self.metadata_raw: {self.metadata_raw}") + print(f"self.metadata_json: {self.metadata_json}") + print(f"self.sw_repo_name: {self.sw_repo_name}") + print(f"self.sw_repo: {self.sw_repo}") + print(f"self.sw_pr_number: {self.sw_pr_number}") + print(f"self.sw_pr: {self.sw_pr}") + print(f"self.sw_pr_comment_id: {self.sw_pr_comment_id}") + print(f"self.sw_pr_comment: {self.sw_pr_comment}") + print(f"self.tarball_name: {self.tarball_name}") + print(f"self.tar_pr: {self.tar_pr}") + print(f"self.bucket: {self.bucket}") + print(f"self.url: {self.url}") + print(f"self.state: {self.state}") diff --git a/scripts/automated_ingestion/utils.py b/scripts/automated_ingestion/utils.py index 66843dd9..c6c5553d 100644 --- a/scripts/automated_ingestion/utils.py +++ b/scripts/automated_ingestion/utils.py @@ -2,6 +2,32 @@ import json import requests +from shared_vars import gh_comment_cache, gh_pr_cache, gh_repo_cache + + +def get_gh_repo(full_repo_name, github_instance): + """Return and cache pygithub.repository object for full_repo_name.""" + if full_repo_name not in gh_repo_cache: + repo = github_instance.get_repo(full_repo_name) + gh_repo_cache[full_repo_name] = repo + return gh_repo_cache[full_repo_name] + + +def get_gh_pr(pr_number, repo_instance): + """Return and cache pygithub.pull_request object for pr_number.""" + if pr_number not in gh_pr_cache: + pr = repo_instance.get_pull(int(pr_number)) + gh_pr_cache[pr_number] = pr + return gh_pr_cache[pr_number] + + +def 
get_gh_comment(pr_comment_id, pr_instance): + """Return and cache pygithub.issue_comment object for pr_comment_id.""" + if pr_comment_id not in gh_comment_cache: + comment = pr_instance.get_issue_comment(int(pr_comment_id)) + gh_comment_cache[pr_comment_id] = comment + return gh_comment_cache[pr_comment_id] + def send_slack_message(webhook, msg): """Send a Slack message.""" diff --git a/scripts/ingest-tarball.sh b/scripts/ingest-tarball.sh index 25e3883b..570bb20d 100755 --- a/scripts/ingest-tarball.sh +++ b/scripts/ingest-tarball.sh @@ -1,7 +1,7 @@ #!/bin/bash # Ingest a tarball containing software, a compatibility layer, -# or (init) scripts to the EESSI CVMFS repository, and generate +# or (init) scripts to the NESSI CVMFS repository, and generate # nested catalogs in a separate transaction. # This script has to be run on a CVMFS publisher node. @@ -12,9 +12,10 @@ # Only if it passes these checks, the tarball gets ingested to the base dir in the repository specified below. -repo=pilot.eessi-hpc.org +repo=pilot.nessi.no basedir=versions decompress="gunzip -c" +cvmfs_server="cvmfs_server" # list of supported architectures for compat and software layers declare -A archs=(["aarch64"]= ["ppc64le"]= ["riscv64"]= ["x86_64"]=) # list of supported operating systems for compat and software layers @@ -40,6 +41,14 @@ function error() { exit 1 } +function is_repo_owner() { + if [ -f "/etc/cvmfs/repositories.d/${repo}/server.conf" ] + then + . "/etc/cvmfs/repositories.d/${repo}/server.conf" + [ x"$(whoami)" = x"$CVMFS_USER" ] + fi +} + function check_repo_vars() { if [ -z "${repo}" ] then @@ -55,7 +64,7 @@ function check_repo_vars() { function check_version() { if [ -z "${version}" ] then - error "EESSI version cannot be derived from the filename." + error "NESSI version cannot be derived from the filename." fi if [ -z "${tar_top_level_dir}" ] @@ -63,11 +72,11 @@ function check_version() { error "no top level directory can be found in the tarball." fi - # Check if the EESSI version number encoded in the filename + # Check if the NESSI version number encoded in the filename # is valid, i.e. matches the format YYYY.DD if ! echo "${version}" | egrep -q '^20[0-9][0-9]\.(0[0-9]|1[0-2])$' then - error "${version} is not a valid EESSI version." + error "${version} is not a valid NESSI version." fi # Check if the version encoded in the filename matches the top-level dir inside the tarball @@ -104,8 +113,8 @@ function check_contents_type() { function cvmfs_regenerate_nested_catalogs() { # Use the .cvmfsdirtab to generate nested catalogs for the ingested tarball echo "Generating the nested catalogs..." - cvmfs_server transaction "${repo}" - cvmfs_server publish -m "Generate catalogs after ingesting ${tar_file_basename}" "${repo}" + ${cvmfs_server} transaction "${repo}" + ${cvmfs_server} publish -m "Generate catalogs after ingesting ${tar_file_basename}" "${repo}" ec=$? 
if [ $ec -eq 0 ] then @@ -115,11 +124,23 @@ function cvmfs_regenerate_nested_catalogs() { fi } +function cvmfs_add_more_metadata() { + echo "Adding metadata to tag history" + # example tag history via command 'cvmfs_server tag -x pilot.nessi.no' + # generic-2022-11-16T07:52:56Z 00146ec1fc67287d8d0916ec4edd34f616a7e632 29696 36 1668585186 (default) Generate catalogs after ingesting eessi-2022.11-software-linux-x86_64-generic-1668253670.tar.gz + # trunk 00146ec1fc67287d8d0916ec4edd34f616a7e632 29696 36 1668585186 (default) current HEAD + # generic-2022-11-16T07:52:35Z 6977066e1491827e9be164a407e5230b6de17777 5087232 35 1668585171 (default) + # trunk-previous 6977066e1491827e9be164a407e5230b6de17777 5087232 35 1668585171 (default) default undo target + # generic-2022-11-15T21:55:49Z a5ef00196961b14d8b0d74a261bc909a800fbc29 28672 34 1668549349 (default) Generate catalogs after ingesting eessi-2022.11-software-linux-aarch64-generic-1668253729.tar.gz + LAST_TAG=$(cvmfs_server tag -x ${repo} | head -n 1 | cut -f 1 -d ' ') + cvmfs_server tag -a "${LAST_TAG}-meta" -m "TAR ${tar_file_basename} REPO ${GH_REPO} BRANCH ${BRANCH} PR/COMMIT ${PR_or_COMMIT} WHO ${WHO}" ${repo} +} + function cvmfs_ingest_tarball() { # Do a regular "cvmfs_server ingest" for a given tarball, # followed by regenerating the nested catalog echo "Ingesting tarball ${tar_file} to ${repo}..." - ${decompress} "${tar_file}" | cvmfs_server ingest -t - -b "${basedir}" "${repo}" + ${decompress} "${tar_file}" | ${cvmfs_server} ingest -t - -b "${basedir}" -m "nessi" "${repo}" ec=$? if [ $ec -eq 0 ] then @@ -128,6 +149,8 @@ function cvmfs_ingest_tarball() { error "${tar_file} could not be ingested to ${repo}." fi + cvmfs_add_more_metadata + # "cvmfs_server ingest" doesn't automatically rebuild the nested catalogs, # so we do that forcefully by doing an empty transaction cvmfs_regenerate_nested_catalogs @@ -144,11 +167,13 @@ function check_os() { then error "the operating system directory in the tarball is ${os}, which is not a valid operating system!" fi + echo "OS component is '${os}'" } function check_arch() { # Check if the architecture directory is correctly set for the contents of the tarball - arch=$(echo "${tar_first_file}" | cut -d / -f 4) + arch_and_date=$(echo "${tar_first_file}" | cut -d / -f 4) + arch=${arch_and_date//.*} if [ -z "${arch}" ] then error "no architecture directory found in the tarball!" @@ -157,6 +182,25 @@ function check_arch() { then error "the architecture directory in the tarball is ${arch}, which is not a valid architecture!" fi + echo "full ARCH component is '${arch_and_date}'" + echo "stnd ARCH component is '${arch}'" +} + +function update_lmod_caches() { + # Update the Lmod caches for the stacks of all supported CPUs + script_dir=$(dirname $(realpath $BASH_SOURCE)) + update_caches_script=${script_dir}/update_lmod_caches.sh + if [ ! -f ${update_caches_script} ] + then + error "cannot find the script for updating the Lmod caches; it should be placed in the same directory as the ingestion script!" + fi + if [ ! -x ${update_caches_script} ] + then + error "the script for updating the Lmod caches (${update_caches_script}) does not have execute permissions!" 
+    fi
+    cvmfs_server transaction "${repo}"
+    ${update_caches_script} /cvmfs/${repo}/${basedir}/${version}
+    cvmfs_server publish -m "update Lmod caches after ingesting ${tar_file_basename}" "${repo}"
 }
 
 function ingest_init_tarball() {
@@ -174,6 +218,7 @@ function ingest_software_tarball() {
     check_arch
     check_os
     cvmfs_ingest_tarball
+    update_lmod_caches
 }
 
 function ingest_compat_tarball() {
@@ -185,31 +230,34 @@
     then
         echo_yellow "Compatibility layer for version ${version}, OS ${os}, and architecture ${arch} already exists!"
         echo_yellow "Removing the existing layer, and adding the new one from the tarball..."
-        cvmfs_server transaction "${repo}"
+        ${cvmfs_server} transaction "${repo}"
         rm -rf "/cvmfs/${repo}/${basedir}/${version}/compat/${os}/${arch}/"
-        tar -C "/cvmfs/${repo}/${basedir}/" -xzf "${tar_file}"
-        cvmfs_server publish -m "update compat layer for ${version}, ${os}, ${arch}" "${repo}"
+        tar --absolute-names -C "/cvmfs/${repo}/${basedir}/" -xzf "${tar_file}"
+        ${cvmfs_server} publish -m "update compat layer for ${version}, ${os}, ${arch}" "${repo}"
         ec=$?
         if [ $ec -eq 0 ]
         then
            echo_green "Successfully ingested the new compatibility layer!"
+            cvmfs_add_more_metadata
         else
-            cvmfs_server abort "${repo}"
+            ${cvmfs_server} abort "${repo}"
            error "error while updating the compatibility layer, transaction aborted."
         fi
     else
         cvmfs_ingest_tarball
     fi
-
 }
-
 # Check if a tarball has been specified
-if [ "$#" -ne 1 ]; then
-    error "usage: $0 <gzipped tarball>"
+if [ "$#" -ne 5 ]; then
+    error "usage: $0 <gzipped tarball> <github_repository> <branch> <pull_request_or_commit> <who>"
 fi
 
 tar_file="$1"
+GH_REPO="$2"
+BRANCH="$3"
+PR_or_COMMIT="$4"
+WHO="$5"
 
 # Check if the given tarball exists
 if [ ! -f "${tar_file}" ]; then
@@ -219,13 +267,52 @@ fi
 
 # Get some information about the tarball
 tar_file_basename=$(basename "${tar_file}")
 version=$(echo "${tar_file_basename}" | cut -d- -f2)
-contents_type_dir=$(echo "${tar_file_basename}" | cut -d- -f3)
-tar_first_file=$(tar tf "${tar_file}" | head -n 1)
+# contents_type_dir=$(echo "${tar_file_basename}" | cut -d- -f3)
+# temporarily use last line to determine contents type correctly if it contains, eg, init files
+contents_type_dir=$(tar tf "${tar_file}" | tail -n 1 | cut -d/ -f2)
+# need to find a file (not necessarily the first) whose path contains all components: VERSION/TYPE/OS/ARCH
+# tar_first_file=$(tar tf "${tar_file}" | head -n 1)
+tar_first_file=$(tar tf "${tar_file}" | head -n 4 | tail -n 1)
 tar_top_level_dir=$(echo "${tar_first_file}" | cut -d/ -f1)
-tar_contents_type_dir=$(tar tf "${tar_file}" | head -n 2 | tail -n 1 | cut -d/ -f2)
+# Use the 2nd file/dir in the tarball, as the first one may be just "/"
+# tar_contents_type_dir=$(tar tf "${tar_file}" | head -n 2 | tail -n 1 | cut -d/ -f2)
+# temporarily use last line to determine contents type correctly if it contains, eg, init files
+tar_contents_type_dir=$(tar tf "${tar_file}" | tail -n 1 | cut -d/ -f2)
+
+echo "tar_file_basename....: '${tar_file_basename}'"
+echo "version..............: '${version}'"
+echo "contents_type_dir....: '${contents_type_dir}'"
+echo "tar_first_file.......: '${tar_first_file}'"
+echo "tar_top_level_dir....: '${tar_top_level_dir}'"
+echo "tar_contents_type_dir: '${tar_contents_type_dir}'"
+
+# only check if contents_type_dir is software
+if [ "x${contents_type_dir}" == "xsoftware" ]; then
+    check_arch
+    check_os
+fi
+
+# exit 0
+
+# Check if we are running as the CVMFS repo owner, otherwise run cvmfs_server with sudo
+is_repo_owner || cvmfs_server="sudo cvmfs_server"
+
+### add more metadata to tag history
+##echo "Adding 
metadata to tag history" +### cvmfs_server tag -x pilot.nessi.no +### generic-2022-11-16T07:52:56Z 00146ec1fc67287d8d0916ec4edd34f616a7e632 29696 36 1668585186 (default) Generate catalogs after ingesting eessi-2022.11-software-linux-x86_64-generic-1668253670.tar.gz +### trunk 00146ec1fc67287d8d0916ec4edd34f616a7e632 29696 36 1668585186 (default) current HEAD +### generic-2022-11-16T07:52:35Z 6977066e1491827e9be164a407e5230b6de17777 5087232 35 1668585171 (default) +### trunk-previous 6977066e1491827e9be164a407e5230b6de17777 5087232 35 1668585171 (default) default undo target +### generic-2022-11-15T21:55:49Z a5ef00196961b14d8b0d74a261bc909a800fbc29 28672 34 1668549349 (default) Generate catalogs after ingesting eessi-2022.11-software-linux-aarch64-generic-1668253729.tar.gz +##LAST_TAG=$(cvmfs_server tag -x ${repo} | head -n 1 | cut -f 1 -d ' ') +##cvmfs_server tag -a "${LAST_TAG}-meta" -m "TAR ${tar_file_basename} REPO ${GH_REPO} BRANCH ${BRANCH} PR/COMMIT ${PR_or_COMMIT} WHO ${WHO}" ${repo} +## # Do some checks, and ingest the tarball check_repo_vars check_version -check_contents_type +# Disable the call to check_contents_type, as it does not work for tarballs produced +# by our build bot that only contain init files (as they have "software" in the filename) +# check_contents_type ingest_${tar_contents_type_dir}_tarball From 2171d62fe604f6fab254f61329036e5dec513dcc Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 16 Apr 2024 09:29:17 +0200 Subject: [PATCH 2/2] scripts currently used for ingestion in NESSI/2023.06, one more file --- scripts/automated_ingestion/shared_vars.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 scripts/automated_ingestion/shared_vars.py diff --git a/scripts/automated_ingestion/shared_vars.py b/scripts/automated_ingestion/shared_vars.py new file mode 100644 index 00000000..3625dfd3 --- /dev/null +++ b/scripts/automated_ingestion/shared_vars.py @@ -0,0 +1,8 @@ +# dictionary that links a pull request comment id to an issue comment object to minimize requests to GitHub API +gh_comment_cache = {} + +# dictionary that links a pull request number to a pull request object to minimize requests to GitHub API +gh_pr_cache = {} + +# dictionary that maps a repository name to a repository object to minimize requests to GitHub API +gh_repo_cache = {}
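
For context, the three helpers added to utils.py together with the module-level dictionaries in shared_vars.py form a small per-process memoisation layer for GitHub API objects, so repeated lookups of the same repository, pull request, or comment do not trigger extra API requests. Below is a minimal usage sketch; the token, repository name, PR number, and comment id are illustrative placeholders (not values taken from this patch), and it assumes the PyGithub package these scripts already use.

    # sketch of how the caching helpers are meant to be used (placeholder values only)
    import github

    from utils import get_gh_repo, get_gh_pr, get_gh_comment

    # in the real scripts the token comes from the [secrets] section of the config file
    gh = github.Github("ghp_placeholder_token")

    # the first call per repository name hits the GitHub API and fills gh_repo_cache;
    # subsequent calls with the same full name return the cached Repository object
    repo = get_gh_repo("example-org/software-layer", gh)
    assert repo is get_gh_repo("example-org/software-layer", gh)

    # the same pattern applies to pull requests (gh_pr_cache, keyed by PR number)
    # and to issue comments (gh_comment_cache, keyed by comment id)
    pr = get_gh_pr("42", repo)
    comment = get_gh_comment("123456789", pr)
    print(comment.body)

Note that gh_pr_cache and gh_comment_cache are keyed only by the PR number and comment id, so entries from different repositories would collide if a single run ever processed tarballs originating from more than one software-layer repository.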