From 44792eca181bec27dca5b1afe05b9b175205fe8e Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 15 Aug 2023 11:28:32 +0200 Subject: [PATCH 1/4] eessi_bot_job_manager.py: added comments for categories of imports + ordered imports alphabetically --- eessi_bot_job_manager.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eessi_bot_job_manager.py b/eessi_bot_job_manager.py index fe3193f5..04573f2f 100644 --- a/eessi_bot_job_manager.py +++ b/eessi_bot_job_manager.py @@ -28,20 +28,23 @@ # license: GPLv2 # +# Standard library imports +from datetime import datetime, timezone import os import re import sys import time +# Third party imports (anything installed into the local Python environment) +from pyghee.utils import log +# Local application imports (anything from EESSI/eessi-bot-software-layer) from connections import github -from tools.args import job_manager_parse -from datetime import datetime, timezone from tools import config, run_cmd +from tools.args import job_manager_parse from tools.job_metadata import read_metadata_file from tools.pr_comments import get_submitted_job_comment, update_comment -from pyghee.utils import log AWAITS_LAUNCH = "awaits_launch" FAILURE = "failure" From 079c71d11f745da9fe4b5713daa6255b0da6c080 Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 15 Aug 2023 11:33:01 +0200 Subject: [PATCH 2/4] eessi_bot_job_manager.py: ordered constants alphabetically --- eessi_bot_job_manager.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/eessi_bot_job_manager.py b/eessi_bot_job_manager.py index 04573f2f..bded09fd 100644 --- a/eessi_bot_job_manager.py +++ b/eessi_bot_job_manager.py @@ -49,9 +49,11 @@ AWAITS_LAUNCH = "awaits_launch" FAILURE = "failure" FINISHED_JOB_COMMENTS = "finished_job_comments" -NEW_JOB_COMMENTS = "new_job_comments" +JOB_RESULT_COMMENT_DESCRIPTION = "comment_description" +JOB_RESULT_UNKNOWN_FMT = "job_result_unknown_fmt" MISSING_MODULES = "missing_modules" MULTIPLE_TARBALLS = "multiple_tarballs" +NEW_JOB_COMMENTS = "new_job_comments" NO_MATCHING_TARBALL = "no_matching_tarball" NO_SLURM_OUT = "no_slurm_out" NO_TARBALL_MESSAGE = "no_tarball_message" @@ -60,15 +62,13 @@ SLURM_OUT = "slurm_out" SUCCESS = "success" -JOB_RESULT_UNKNOWN_FMT = "job_result_unknown_fmt" -JOB_RESULT_COMMENT_DESCRIPTION = "comment_description" - REQUIRED_CONFIG = { + FINISHED_JOB_COMMENTS: [FAILURE, JOB_RESULT_UNKNOWN_FMT, MISSING_MODULES, + MULTIPLE_TARBALLS, NO_MATCHING_TARBALL, + NO_SLURM_OUT, NO_TARBALL_MESSAGE, SLURM_OUT, + SUCCESS], NEW_JOB_COMMENTS: [AWAITS_LAUNCH], - RUNNING_JOB_COMMENTS: [RUNNING_JOB], - FINISHED_JOB_COMMENTS: [SUCCESS, FAILURE, NO_SLURM_OUT, SLURM_OUT, MISSING_MODULES, - NO_TARBALL_MESSAGE, NO_MATCHING_TARBALL, MULTIPLE_TARBALLS, - JOB_RESULT_UNKNOWN_FMT] + RUNNING_JOB_COMMENTS: [RUNNING_JOB] } From 77985d54cd6702244582a7a79b329682f762c812 Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 15 Aug 2023 16:35:50 +0200 Subject: [PATCH 3/4] eessi_bot_job_manager.py: improved and completed docstrings --- eessi_bot_job_manager.py | 133 ++++++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 14 deletions(-) diff --git a/eessi_bot_job_manager.py b/eessi_bot_job_manager.py index bded09fd..7a1d86c6 100644 --- a/eessi_bot_job_manager.py +++ b/eessi_bot_job_manager.py @@ -73,14 +73,35 @@ class EESSIBotSoftwareLayerJobManager: - "main class for (Slurm) job manager of EESSI bot (separate process)" + """ + Class for representing the job manager of the build-and-deploy 
bot. It + monitors the job queue and processes job state changes. + """ def __init__(self): + """ + EESSIBotSoftwareLayerJobManager constructor. Just reads the + configuration to set the path to the logfile. + """ cfg = config.read_config() job_manager_cfg = cfg['job_manager'] self.logfile = job_manager_cfg.get('log_path') def get_current_jobs(self): + """ + Obtains a list of jobs currently managed by the batch system. + Retains key information about each job such as its id and its state. + + Args: + No arguments + + Returns: + (dict): maps a job id to a dictionary containing key information + about a job (currently: 'jobid', 'state' and 'reason') + + Raises: + Exception: if the environment variable USER is not set + """ # who am i username = os.getenv('USER', None) if username is None: @@ -123,13 +144,14 @@ def get_current_jobs(self): return current_jobs def determine_running_jobs(self, current_jobs): - """ determine which jobs are in running state + """ + Determine currently running jobs. Args: current_jobs (dict): dictionary containing data of current jobs Returns: - running_jobs (list): list containing ids of running jobs + (list): list of ids of currently running jobs """ running_jobs = [] for job in current_jobs.values(): @@ -139,6 +161,21 @@ def determine_running_jobs(self, current_jobs): # known_jobs = job_manager.get_known_jobs() def get_known_jobs(self): + """ + Obtain information about jobs that should be known to the job manager + (e.g., before it stopped or when it is resumed after a crash). This + method obtains the information from a local store (database or + filesystem). When comparing its results to the list of jobs currently + registered with the job management system (see method + get_current_jobs()), new jobs and finished jobs can be derived. + + Args: + No arguments + + Returns: + (dict): maps a job id to a dictionary containing key information + about a job (currently: 'jobid') + """ # find all symlinks resembling job ids (digits only) in jobdir known_jobs = {} if os.path.isdir(self.submitted_jobs_dir): @@ -174,6 +211,18 @@ def get_known_jobs(self): # new_jobs = job.manager.determine_new_jobs(known_jobs, current_jobs) def determine_new_jobs(self, known_jobs, current_jobs): + """ + Determine which jobs are new. + + Args: + known_jobs (dict): dictionary with information about jobs that are + already known/seen from before + current_jobs (dict): dictionary with information about jobs that are + currently registered with the job management system + + Returns: + (list): list of ids of new jobs + """ # known_jobs is a dictionary: jobid -> {'jobid':jobid} # current_jobs is a dictionary: jobid -> {'jobid':jobid, # 'state':val,'reason':val} @@ -187,6 +236,18 @@ def determine_new_jobs(self, known_jobs, current_jobs): # finished_jobs = job.manager.determine_finished_jobs(known_jobs, # current_jobs) def determine_finished_jobs(self, known_jobs, current_jobs): + """ + Determine which jobs have finished. 
+ + Args: + known_jobs (dict): dictionary with information about jobs that are + already known/seen from before + current_jobs (dict): dictionary with information about jobs that are + currently registered with the job management system + + Returns: + (list): list of ids of finished jobs + """ # known_jobs is a dictionary: jobid -> {'jobid':jobid} # current_jobs is a dictionary: jobid -> {'jobid':jobid, # 'state':val,'reason':val} @@ -199,7 +260,14 @@ def determine_finished_jobs(self, known_jobs, current_jobs): def read_job_pr_metadata(self, job_metadata_path): """ - Check if metadata file exists, read it and return 'PR' section if so, return None if not. + Read job metadata file and return the contents of the 'PR' section. + + Args: + job_metadata_path (string): path to job metadata file + + Returns: + (ConfigParser): instance of ConfigParser corresponding to the 'PR' + section or None """ # just use a function provided by module tools.job_metadata metadata = read_metadata_file(job_metadata_path, self.logfile) @@ -210,7 +278,14 @@ def read_job_pr_metadata(self, job_metadata_path): def read_job_result(self, job_result_file_path): """ - Check if result file exists, read it and return 'RESULT section if so, return None if not. + Read job result file and return the contents of the 'RESULT' section. + + Args: + job_result_file_path (string): path to job result file + + Returns: + (ConfigParser): instance of ConfigParser corresponding to the + 'RESULT' section or None """ # just use a function provided by module tools.job_metadata result = read_metadata_file(job_result_file_path, self.logfile) @@ -221,11 +296,20 @@ def read_job_result(self, job_result_file_path): # job_manager.process_new_job(current_jobs[nj]) def process_new_job(self, new_job): - # create symlink in submitted_jobs_dir (destination is the working - # dir of the job derived via scontrol) - # release job - # update PR comment with new status (released) + """ + Process a new job by verifying that it is a bot job and if so + - create symlink in submitted_jobs_dir (destination is the working + dir of the job derived via scontrol) + - release the job (so it may be started by the scheduler) + - update the PR comment by adding its new status (released) + Args: + new_job (dict): dictionary storing key information about the job + + Returns: + (bool): True if method completed the tasks described, False if job + is not a bot job + """ job_id = new_job["jobid"] scontrol_cmd = "%s --oneliner show jobid %s" % ( @@ -343,13 +427,19 @@ def process_new_job(self, new_job): return True def process_running_jobs(self, running_job): - """process the jobs in running state and print comment + """ + Process a running job by verifying that it is a bot job and if so + - determines the PR comment body and id corresponding to the job, + - updates the PR comment (if found) Args: running_job (dict): dictionary containing data of the running jobs + Returns: + None (implicitly) + Raises: - Exception: raise exception if there is no metadata file + Exception: if there is no metadata file or reading it failed """ gh = github.get_instance() @@ -407,10 +497,19 @@ def process_running_jobs(self, running_job): ) def process_finished_job(self, finished_job): - """Process a finished job (move symlink, log and update PR comment). 
+ """ + Process a finished job by + - moving the symlink to the directory storing finished jobs, + - updating the PR comment with information from '*.result' file Args: - finished_job (dict): dictionary with information about job + finished_job (dict): dictionary with information about the job + + Returns: + None (implicitly) + + Raises: + Exception: if there is no metadata file or reading it failed """ fn = sys._getframe().f_code.co_name @@ -498,7 +597,13 @@ def process_finished_job(self, finished_job): def main(): - """Main function.""" + """ + Main function which parses command line arguments, verifies if required + configuration settings are defined, creates an instance of + EESSIBotSoftwareLayerJobManager, reads the configuration to initialize + core attributes, determines known jobs and starts the main loop that + monitors jobs. + """ opts = job_manager_parse() From fc200f65e65b5e50f70e09db64b38f9c2c3acc48 Mon Sep 17 00:00:00 2001 From: Thomas Roeblitz Date: Tue, 15 Aug 2023 17:16:37 +0200 Subject: [PATCH 4/4] eessi_bot_job_manager.py: improved and cleaned up comments --- eessi_bot_job_manager.py | 118 +++++++++++++-------------------------- 1 file changed, 38 insertions(+), 80 deletions(-) diff --git a/eessi_bot_job_manager.py b/eessi_bot_job_manager.py index 7a1d86c6..e1936c71 100644 --- a/eessi_bot_job_manager.py +++ b/eessi_bot_job_manager.py @@ -3,15 +3,15 @@ # (Slurm) job manager for the GitHub App for the EESSI project # # This tool monitors EESSI build jobs and acts on state changes of -# these jobs. It releases jobs initially held, it processes finished -# jobs and for both reports status changes/results back to the -# corresponding GitHub pull request to a software-layer repo (origin -# or fork). +# these jobs. It releases jobs initially held, it processes running and +# finished jobs and reports status changes/results back to the +# corresponding GitHub pull request to a target software-layer repository. # # EESSI build jobs are recognised by # - being submitted in JobUserHeld status (sbatch parameter --hold) # - job ids listed in a specific directory (ids being symlinks to job # directories created by EESSI bot) +# - a metadata file in the job's working directory # # This file is part of the EESSI build-and-deploy bot, # see https://github.com/EESSI/eessi-bot-software-layer @@ -102,7 +102,6 @@ def get_current_jobs(self): Raises: Exception: if the environment variable USER is not set """ - # who am i username = os.getenv('USER', None) if username is None: raise Exception("Unable to find username") @@ -114,10 +113,9 @@ def get_current_jobs(self): log_file=self.logfile, ) - # create dictionary of jobs - # if any with the following information per job: - # jobid, state, nodelist_reason - # skip first two lines of output ("range(2,...)") + # create dictionary of jobs from output of 'squeue_cmd' + # with the following information per job: jobid, state, + # nodelist_reason current_jobs = {} lines = str(squeue_output).rstrip().split("\n") bad_state_messages = { @@ -127,8 +125,9 @@ def get_current_jobs(self): } # get job info, logging any Slurm issues + # Note, the first two lines of the output are skipped ("range(2,...)") + # because they contain header information. 
for i in range(2, len(lines)): - # assume lines 2 to len(lines) contain jobs job = lines[i].rstrip().split() if len(job) >= 9: job_id = job[0] @@ -159,7 +158,6 @@ def determine_running_jobs(self, current_jobs): running_jobs.append(job["jobid"]) return running_jobs - # known_jobs = job_manager.get_known_jobs() def get_known_jobs(self): """ Obtain information about jobs that should be known to the job manager @@ -176,7 +174,9 @@ def get_known_jobs(self): (dict): maps a job id to a dictionary containing key information about a job (currently: 'jobid') """ - # find all symlinks resembling job ids (digits only) in jobdir + # find all symlinks resembling job ids (digits only) in + # self.submitted_jobs_dir (the symlink is created by method + # process_new_job) known_jobs = {} if os.path.isdir(self.submitted_jobs_dir): regex = re.compile(r"(\d)+") @@ -209,7 +209,6 @@ def get_known_jobs(self): return known_jobs - # new_jobs = job.manager.determine_new_jobs(known_jobs, current_jobs) def determine_new_jobs(self, known_jobs, current_jobs): """ Determine which jobs are new. @@ -223,9 +222,6 @@ def determine_new_jobs(self, known_jobs, current_jobs): Returns: (list): list of ids of new jobs """ - # known_jobs is a dictionary: jobid -> {'jobid':jobid} - # current_jobs is a dictionary: jobid -> {'jobid':jobid, - # 'state':val,'reason':val} new_jobs = [] for ckey in current_jobs: if ckey not in known_jobs: @@ -233,8 +229,6 @@ def determine_new_jobs(self, known_jobs, current_jobs): return new_jobs - # finished_jobs = job.manager.determine_finished_jobs(known_jobs, - # current_jobs) def determine_finished_jobs(self, known_jobs, current_jobs): """ Determine which jobs have finished. @@ -248,9 +242,6 @@ def determine_finished_jobs(self, known_jobs, current_jobs): Returns: (list): list of ids of finished jobs """ - # known_jobs is a dictionary: jobid -> {'jobid':jobid} - # current_jobs is a dictionary: jobid -> {'jobid':jobid, - # 'state':val,'reason':val} finished_jobs = [] for kkey in known_jobs: if kkey not in current_jobs: @@ -269,7 +260,7 @@ def read_job_pr_metadata(self, job_metadata_path): (ConfigParser): instance of ConfigParser corresponding to the 'PR' section or None """ - # just use a function provided by module tools.job_metadata + # reuse function from module tools.job_metadata to read metadata file metadata = read_metadata_file(job_metadata_path, self.logfile) if metadata and "PR" in metadata: return metadata["PR"] @@ -287,14 +278,13 @@ def read_job_result(self, job_result_file_path): (ConfigParser): instance of ConfigParser corresponding to the 'RESULT' section or None """ - # just use a function provided by module tools.job_metadata + # reuse function from module tools.job_metadata to read metadata file result = read_metadata_file(job_result_file_path, self.logfile) if result and "RESULT" in result: return result["RESULT"] else: return None - # job_manager.process_new_job(current_jobs[nj]) def process_new_job(self, new_job): """ Process a new job by verifying that it is a bot job and if so @@ -322,8 +312,8 @@ def process_new_job(self, new_job): log_file=self.logfile, ) - # parse output, - # look for WorkDir=dir + # parse output of 'scontrol_cmd' to determine the job's working + # directory match = re.search(r".* WorkDir=(\S+) .*", str(scontrol_output)) if match: @@ -338,7 +328,8 @@ def process_new_job(self, new_job): job_id, ) - # check if metadata file exist + # assuming that a bot job's working directory contains a metadata + # file, its existence is used to check if the job belongs to the 
bot metadata_pr = self.read_job_pr_metadata(job_metadata_path) if metadata_pr is None: @@ -365,20 +356,9 @@ def process_new_job(self, new_job): log_file=self.logfile, ) - # update PR - # (a) get repo name and PR number - # from file _bot_job.metadata - # (b) find & get comment for this job - # (c) add a row to the table - - # (a) get repo name and PR number from - # file _bot_job.metadata - # the file should be written by the event handler - # to the working dir of the job - - # get repo name + # update PR defined by repo and pr_number stored in the job's + # metadata file repo_name = metadata_pr.get("repo", "") - # get pr number pr_number = metadata_pr.get("pr_number", None) gh = github.get_instance() @@ -386,7 +366,7 @@ def process_new_job(self, new_job): repo = gh.get_repo(repo_name) pr = repo.get_pull(int(pr_number)) - # (b) find & get comment for this job + # find & get comment for this job # only get comment if we don't know its id yet if "comment_id" not in new_job: new_job_cmnt = get_submitted_job_comment(pr, new_job['jobid']) @@ -399,8 +379,7 @@ def process_new_job(self, new_job): ) new_job["comment_id"] = new_job_cmnt.id - # (c) add a row to the table - # add row to status table if we found a comment + # update status table if we found a comment if "comment_id" in new_job: new_job_comments_cfg = config.read_config()[NEW_JOB_COMMENTS] dt = datetime.now(timezone.utc) @@ -413,11 +392,7 @@ def process_new_job(self, new_job): " for job '%s'" % job_id, self.logfile, ) - # TODO just create one? else: - # TODO can we run this tool on a job directory? the path to - # the directory might be obtained from - # a comment to the PR log( "process_new_job(): did not find work dir for job '%s'" % job_id, @@ -444,12 +419,9 @@ def process_running_jobs(self, running_job): gh = github.get_instance() - # set some variables for accessing work dir of job + # set variable for accessing the working directory of the job job_dir = os.path.join(self.submitted_jobs_dir, running_job["jobid"]) - # TODO create function for obtaining values from metadata file - # might be based on allowing multiple configuration files - # in tools/config.py metadata_file = "_bot_job%s.metadata" % running_job["jobid"] job_metadata_path = os.path.join(job_dir, metadata_file) @@ -458,9 +430,7 @@ def process_running_jobs(self, running_job): if metadata_pr is None: raise Exception("Unable to find metadata file") - # get repo name repo_name = metadata_pr.get("repo", "") - # get pr number pr_number = metadata_pr.get("pr_number", None) repo = gh.get_repo(repo_name) @@ -513,13 +483,9 @@ def process_finished_job(self, finished_job): """ fn = sys._getframe().f_code.co_name - # PROCEDURE - # - MOVE symlink to finished dir - # - REPORT status always to log, if accessible also to PR comment - job_id = finished_job['jobid'] - # MOVE symlink from job_ids_dir/submitted to jobs_ids_dir/finished + # move symlink from job_ids_dir/submitted to jobs_ids_dir/finished old_symlink = os.path.join(self.submitted_jobs_dir, job_id) finished_jobs_dir = os.path.join(self.job_ids_dir, "finished") @@ -548,10 +514,9 @@ def process_finished_job(self, finished_job): job_result_file_path = os.path.join(new_symlink, job_result_file) job_results = self.read_job_result(job_result_file_path) - # set comment_description in case no results were found (self.read_job_result - # returned None), it's also used (reused actually) in case the job - # results do not have a preformatted comment job_result_unknown_fmt = finished_job_comments_cfg[JOB_RESULT_UNKNOWN_FMT] + # set 
fallback comment_description in case no result file was found + # (self.read_job_result returned None) comment_description = job_result_unknown_fmt.format(filename=job_result_file) if job_results: # get preformatted comment_description or use previously set default for unknown @@ -568,24 +533,18 @@ def process_finished_job(self, finished_job): comment_update = f"\n|{dt.strftime('%b %d %X %Z %Y')}|finished|" comment_update += f"{comment_description}|" - # obtain id of PR comment to be updated (from _bot_jobID.metadata) + # obtain id of PR comment to be updated (from file '_bot_jobID.metadata') metadata_file = f"_bot_job{job_id}.metadata" job_metadata_path = os.path.join(new_symlink, metadata_file) metadata_pr = self.read_job_pr_metadata(job_metadata_path) if metadata_pr is None: - # TODO should we raise the Exception here? maybe first process - # the finished job and raise an exception at the end? raise Exception("Unable to find metadata file ... skip updating PR comment") - # get repo name repo_name = metadata_pr.get("repo", None) - # get pr number pr_number = metadata_pr.get("pr_number", -1) - # get pr comment id pr_comment_id = metadata_pr.get("pr_comment_id", -1) log(f"{fn}(): pr comment id {pr_comment_id}", self.logfile) - # establish contact to pull request on github gh = github.get_instance() repo = gh.get_repo(repo_name) @@ -607,7 +566,8 @@ def main(): opts = job_manager_parse() - # config is read and checked for settings to raise an exception early when the job_manager runs. + # config is read and checked for settings to raise an exception early when + # the job_manager runs config.check_required_cfg_settings(REQUIRED_CONFIG) github.connect() @@ -629,13 +589,16 @@ def main(): # before main loop, get list of known jobs (stored on disk) # main loop + # --------- # get current jobs of the bot user (job id, state, reason) - # (assume all are jobs building software) # determine new jobs (comparing known and current jobs) - # process new jobs (filtered by optional command line option) + # process new jobs (filtered by optional command line option) + # determine running jobs (comparing known and current jobs) + # process running jobs (filtered by optional command line option) # determine finished jobs (comparing known and current jobs) - # process finished jobs (filtered by optional command line option) + # process finished jobs (filtered by optional command line option) # set known jobs to list of current jobs + # wait configurable period before next iteration begins max_iter = int(opts.max_manager_iterations) # retrieve some settings from app.cfg @@ -691,15 +654,12 @@ def main(): for nj in new_jobs: # assume it is not a bot job is_bot_job = False + # apply filtering of job ids if not job_manager.job_filter or nj in job_manager.job_filter: is_bot_job = job_manager.process_new_job(current_jobs[nj]) if not is_bot_job: # add job id to non_bot_jobs list non_bot_jobs.append(nj) - # else: - # log("job manager main loop: skipping new job" - # " %s due to parameter '--jobs %s'" % ( - # nj,opts.jobs), job_manager.logfile) # remove non bot jobs from current_jobs for job in non_bot_jobs: @@ -713,6 +673,7 @@ def main(): ) for rj in running_jobs: + # apply filtering of job ids if not job_manager.job_filter or rj in job_manager.job_filter: job_manager.process_running_jobs(current_jobs[rj]) @@ -725,12 +686,9 @@ def main(): ) # process finished jobs for fj in finished_jobs: + # apply filtering of job ids if not job_manager.job_filter or fj in job_manager.job_filter: 
job_manager.process_finished_job(known_jobs[fj]) - # else: - # log("job manager main loop: skipping finished " - # "job %s due"" to parameter '--jobs %s'" % (fj,opts.jobs), - # " job_manager.logfile)" known_jobs = current_jobs
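
Note (not part of the patches above): the monitoring cycle that the reworked main() comments in patch 4/4 describe can be summarised by the sketch below. It is illustrative only — the method names are taken from the diffs, but the loop body is condensed, and details such as argument parsing, the configured poll interval, the optional job-id filter and the removal of non-bot jobs from current_jobs are simplified or omitted here.

    # Illustrative sketch of the job manager's main loop (not part of the patch).
    # Assumes an EESSIBotSoftwareLayerJobManager instance providing the methods
    # shown in the diffs; 'poll_interval' stands in for the value read from app.cfg.
    import time

    def monitoring_loop(job_manager, poll_interval=60, max_iter=-1):
        known_jobs = job_manager.get_known_jobs()
        iteration = 0
        while max_iter < 0 or iteration < max_iter:
            current_jobs = job_manager.get_current_jobs()

            # release newly submitted (held) bot jobs and update their PR comments
            for job_id in job_manager.determine_new_jobs(known_jobs, current_jobs):
                job_manager.process_new_job(current_jobs[job_id])

            # refresh the PR comment of every job that is currently running
            for job_id in job_manager.determine_running_jobs(current_jobs):
                job_manager.process_running_jobs(current_jobs[job_id])

            # move symlinks to the 'finished' dir and report results to the PR
            for job_id in job_manager.determine_finished_jobs(known_jobs, current_jobs):
                job_manager.process_finished_job(known_jobs[job_id])

            known_jobs = current_jobs
            iteration += 1
            time.sleep(poll_interval)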