diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..24ec8e4 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,21 @@ +repos: + +- repo: https://github.com/ikamensh/flynt + rev: '' # Specify a specific version if desired + hooks: + - id: flynt + args: [ "--line-length", "160"] # Flynt does not support pyproject.toml in special locations + +- repo: https://github.com/psf/black + rev: 23.9.1 + hooks: + - id: black + types: [python] + args: ["--config", "pyproject.toml"] + +- repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort + name: isort (python) + args: ["--settings-path", "pyproject.toml"] diff --git a/bin/hgcs_master.py b/bin/hgcs_master.py index ae27041..f81f0c3 100644 --- a/bin/hgcs_master.py +++ b/bin/hgcs_master.py @@ -2,11 +2,15 @@ main executable of HGCS """ -import os -import sys -import inspect import argparse +import inspect import logging +import os +import sys + +from hgcs import agents # noqa: E402 +from hgcs import hgcs_config # noqa: E402 +from hgcs import utils # noqa: E402 # # Get main directory path # _MAIN_DIR = os.path.join( os.path.dirname(__file__), '..' ) @@ -15,34 +19,28 @@ # _LIB_PATH = os.path.join( _MAIN_DIR, 'lib' ) # sys.path.insert(0, _LIB_PATH) -from hgcs import hgcs_config # noqa: E402 -from hgcs import agents # noqa: E402 -from hgcs import utils # noqa: E402 -#=============================================================== +# =============================================================== LOG_LEVEL_MAP = { - 'ERROR': logging.ERROR, - 'WARNING': logging.WARNING, - 'INFO': logging.INFO, - 'DEBUG': logging.DEBUG, + "ERROR": logging.ERROR, + "WARNING": logging.WARNING, + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, } -#=============================================================== +# =============================================================== + def main(): """ main function """ # command argparse - oparser = argparse.ArgumentParser(prog='hgcs', add_help=True) + oparser = argparse.ArgumentParser(prog="hgcs", add_help=True) # subparsers = oparser.add_subparsers() - oparser.add_argument('-c', '--config', action='store', - dest='config', metavar='', - help='Configuration file') - oparser.add_argument('-F', '--foregroudlog', action='store_true', - dest='foregroudlog', - help='Print logs to foregroud') + oparser.add_argument("-c", "--config", action="store", dest="config", metavar="", help="Configuration file") + oparser.add_argument("-F", "--foregroudlog", action="store_true", dest="foregroudlog", help="Print logs to foregroud") # start parsing if len(sys.argv) == 1: oparser.print_help() @@ -52,63 +50,60 @@ def main(): if os.path.isfile(arguments.config): config_file_path = arguments.config else: - print(f'Invalid configuration file: {arguments.config}') + print(f"Invalid configuration file: {arguments.config}") sys.exit(1) # defaults - log_file = '/tmp/hgcs.log' - log_level = 'DEBUG' + log_file = "/tmp/hgcs.log" + log_level = "DEBUG" logger_format_colored = True # load config try: config = hgcs_config.ConfigClass(config_file_path) except IOError as exc: - print(f'IOError: {exc}') + print(f"IOError: {exc}") sys.exit(1) except Exception as exc: - print(f'Cannot load conifg: {exc}') + print(f"Cannot load conifg: {exc}") sys.exit(1) # handle master part of config try: - master_section = getattr(config, 'Master') + master_section = getattr(config, "Master") except AttributeError: pass else: - if getattr(master_section, 'log_file', False): - log_file = getattr(master_section, 'log_file') + 
if getattr(master_section, "log_file", False): + log_file = getattr(master_section, "log_file") logger_format_colored = False - if getattr(master_section, 'log_level', False): - log_level = getattr(master_section, 'log_level') + if getattr(master_section, "log_level", False): + log_level = getattr(master_section, "log_level") # case for logs to foregroud stderr if arguments.foregroudlog: log_file = None logger_format_colored = True # add threads of agents to run thread_list = [] - for name, class_obj in inspect.getmembers(agents, - lambda m: inspect.isclass(m) and m.__module__ == 'hgcs.agents'): + for name, class_obj in inspect.getmembers(agents, lambda m: inspect.isclass(m) and m.__module__ == "hgcs.agents"): if hasattr(config, name): section = getattr(config, name) - if getattr(section, 'enable', False): + if getattr(section, "enable", False): param_dict = { - 'sleep_period': getattr(section, 'sleep_period'), - 'flush_period': getattr(section, 'flush_period', None), - 'grace_period': getattr(section, 'grace_period', None), - 'limit': getattr(section, 'limit', None), - } + "sleep_period": getattr(section, "sleep_period"), + "flush_period": getattr(section, "flush_period", None), + "grace_period": getattr(section, "grace_period", None), + "limit": getattr(section, "limit", None), + } agent_instance = class_obj(**param_dict) - utils.setup_logger(agent_instance.logger, - pid=agent_instance.get_pid, - colored=logger_format_colored, - to_file=log_file) + utils.setup_logger(agent_instance.logger, pid=agent_instance.get_pid, colored=logger_format_colored, to_file=log_file) logging_log_level = LOG_LEVEL_MAP.get(log_level, logging.ERROR) agent_instance.logger.setLevel(logging_log_level) thread_list.append(agent_instance) # run threads for thr in thread_list: - print(f'Start thread of agent {thr.__class__.__name__}') + print(f"Start thread of agent {thr.__class__.__name__}") thr.start() -#=============================================================== -if __name__ == '__main__': +# =============================================================== + +if __name__ == "__main__": main() diff --git a/lib/hgcs/agents.py b/lib/hgcs/agents.py index 7be5e0c..40a49fc 100644 --- a/lib/hgcs/agents.py +++ b/lib/hgcs/agents.py @@ -2,16 +2,17 @@ agents of HGCS """ -import os # import sys import errno +import os +import re import shutil import time -import re import htcondor +from hgcs.utils import MySchedd, ThreadBase, global_lock # noqa: E402 -#=============================================================== +# =============================================================== # # Get main directory path # _MAIN_DIR = os.path.join( os.path.dirname(__file__), '..' 
) @@ -20,19 +21,21 @@ # _LIB_PATH = os.path.join( _MAIN_DIR, 'lib' ) # sys.path.insert(0, _LIB_PATH) -from hgcs.utils import ThreadBase, MySchedd, global_lock # noqa: E402 -#=============================================================== +# =============================================================== + def get_condor_job_id(job): """ get full condor job ID as ClusterId.ProcId """ - cluster_id = job.get('ClusterId') - proc_id = job.get('ProcId') - return f'{cluster_id}.{proc_id}' + cluster_id = job.get("ClusterId") + proc_id = job.get("ProcId") + return f"{cluster_id}.{proc_id}" + + +# =============================================================== -#=============================================================== class LogRetriever(ThreadBase): """ @@ -40,19 +43,20 @@ class LogRetriever(ThreadBase): """ projection = [ - 'ClusterId', 'ProcId', 'JobStatus', - 'Iwd', 'Err', 'Out', 'UserLog', - 'SUBMIT_UserLog', 'SUBMIT_TransferOutputRemaps', - ] - - requirements = ( - 'isString(SUBMIT_UserLog) ' - '&& LeaveJobInQueue isnt false ' - '&& ( JobStatus == 4 ' - '|| JobStatus == 3 ) ' - ) - - def __init__(self, flush_period = 86400, retrieve_mode='copy', **kwarg): + "ClusterId", + "ProcId", + "JobStatus", + "Iwd", + "Err", + "Out", + "UserLog", + "SUBMIT_UserLog", + "SUBMIT_TransferOutputRemaps", + ] + + requirements = "isString(SUBMIT_UserLog) " "&& LeaveJobInQueue isnt false " "&& ( JobStatus == 4 " "|| JobStatus == 3 ) " + + def __init__(self, flush_period=86400, retrieve_mode="copy", **kwarg): ThreadBase.__init__(self, **kwarg) if flush_period is None: self.flush_period = 86400 @@ -61,15 +65,15 @@ def __init__(self, flush_period = 86400, retrieve_mode='copy', **kwarg): self.retrieve_mode = retrieve_mode def run(self): - self.logger.debug(f'startTimestamp: {self.start_timestamp}') + self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() while True: - self.logger.info('run starts') + self.logger.info("run starts") if time.time() > last_flush_timestamp + self.flush_period: last_flush_timestamp = time.time() already_handled_job_id_set = set() - self.logger.info('flushed already_handled_job_id_set') + self.logger.info("flushed already_handled_job_id_set") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -77,39 +81,38 @@ def run(self): break except RuntimeError as exc: if i_try < n_try: - self.logger.warning(f'{exc} . Retry...') + self.logger.warning(f"{exc} . Retry...") time.sleep(3) else: - self.logger.error(f'{exc} . No more retry. Exit') + self.logger.error(f"{exc} . No more retry. 
Exit") return - for job in schedd.xquery(constraint=self.requirements, - projection=self.projection): + for job in schedd.xquery(constraint=self.requirements, projection=self.projection): job_id = get_condor_job_id(job) if job_id in already_handled_job_id_set: continue - self.logger.debug(f'to retrieve for condor job {job_id}') - if self.retrieve_mode == 'symlink': + self.logger.debug(f"to retrieve for condor job {job_id}") + if self.retrieve_mode == "symlink": self.via_system(job, symlink_mode=True) - elif self.retrieve_mode == 'copy': + elif self.retrieve_mode == "copy": ret_val = self.via_system(job) if ret_val: already_handled_job_id_set.add(job_id) - elif self.retrieve_mode == 'condor': + elif self.retrieve_mode == "condor": self.via_condor_retrieve(job) n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(already_handled_job_id_set), 'LeaveJobInQueue', 'false') + schedd.edit(list(already_handled_job_id_set), "LeaveJobInQueue", "false") except RuntimeError: if i_try < n_try: - self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}') + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning(f'failed to edit job {job_id} . Skipped...') + self.logger.warning(f"failed to edit job {job_id} . Skipped...") else: already_handled_job_id_set.clear() break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) def via_system(self, job, symlink_mode=False): @@ -118,22 +121,22 @@ def via_system(self, job, symlink_mode=False): """ ret_val = True job_id = get_condor_job_id(job) - src_dir = job.get('Iwd') - src_err_name = job.get('Err') - src_out_name = job.get('Out') - src_log_name = job.get('UserLog') + src_dir = job.get("Iwd") + src_err_name = job.get("Err") + src_out_name = job.get("Out") + src_log_name = job.get("UserLog") src_err = os.path.join(src_dir, src_err_name) src_out = os.path.join(src_dir, src_out_name) src_log = os.path.join(src_dir, src_log_name) dest_err = None dest_out = None - dest_log = job.get('SUBMIT_UserLog') - transfer_remap_list = str(job.get('SUBMIT_TransferOutputRemaps')).split(';') + dest_log = job.get("SUBMIT_UserLog") + transfer_remap_list = str(job.get("SUBMIT_TransferOutputRemaps")).split(";") if not dest_log: - self.logger.debug(f'{job_id} has no attribute of spool. Skipped...') + self.logger.debug(f"{job_id} has no attribute of spool. Skipped...") return True for _m in transfer_remap_list: - match = re.search(r'([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)', _m) + match = re.search(r"([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)", _m) if match: name = match.group(1) dest_path = os.path.normpath(match.group(2)) @@ -145,33 +148,33 @@ def via_system(self, job, symlink_mode=False): dest_err = dest_path for src_path, dest_path in zip([src_err, src_out, src_log], [dest_err, dest_out, dest_log]): if not os.path.isfile(src_path) or os.path.islink(src_path): - if job.get('JobStatus') != 4: + if job.get("JobStatus") != 4: continue ret_val = False - self.logger.error(f'{src_path} is not a regular file. Skipped...') + self.logger.error(f"{src_path} is not a regular file. Skipped...") continue if not dest_path: ret_val = False - self.logger.error(f'no destination path for {src_path} . Skipped...') + self.logger.error(f"no destination path for {src_path} . 
Skipped...") continue try: if symlink_mode: os.symlink(src_path, dest_path) if os.path.islink(dest_path): - self.logger.debug(f'{dest_path} symlink made') + self.logger.debug(f"{dest_path} symlink made") else: ret_val = False - self.logger.error(f'{dest_path} made but not found') + self.logger.error(f"{dest_path} made but not found") else: shutil.copy2(src_path, dest_path) if os.path.isfile(dest_path): - self.logger.debug(f'{dest_path} copy made') + self.logger.debug(f"{dest_path} copy made") else: ret_val = False - self.logger.error(f'{dest_path} made but not found') + self.logger.error(f"{dest_path} made but not found") except OSError as exc: if exc.errno == errno.EEXIST: - self.logger.debug(f'{dest_path} file already exists. Skipped...') + self.logger.debug(f"{dest_path} file already exists. Skipped...") else: ret_val = False self.logger.error(exc) @@ -193,23 +196,17 @@ class CleanupDelayer(ThreadBase): agent to adjust LeaveJobInQueue of jobs to delay cleanup of the jobs """ - requirements = ( - 'SUBMIT_UserLog is undefined ' - '&& LeaveJobInQueue is false ' - '&& ( member(JobStatus, {1,2,5,6,7}) )' - ) - ad_LeaveJobInQueue_template = ( - '( time() - EnteredCurrentStatus ) < {delay_time} ' - ) + requirements = "SUBMIT_UserLog is undefined " "&& LeaveJobInQueue is false " "&& ( member(JobStatus, {1,2,5,6,7}) )" + ad_LeaveJobInQueue_template = "( time() - EnteredCurrentStatus ) < {delay_time} " def __init__(self, sleep_period=60, delay_time=7200): ThreadBase.__init__(self) self.delay_time = delay_time def run(self): - self.logger.debug(f'startTimestamp: {self.start_timestamp}') + self.logger.debug(f"startTimestamp: {self.start_timestamp}") while True: - self.logger.info('run starts') + self.logger.info("run starts") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -217,30 +214,27 @@ def run(self): break except RuntimeError as exc: if i_try < n_try: - self.logger.warning(f'{exc} . Retry...') + self.logger.warning(f"{exc} . Retry...") time.sleep(3) else: - self.logger.error(f'{exc} . No more retry. Exit') + self.logger.error(f"{exc} . No more retry. Exit") return - job_id_list = [ get_condor_job_id(job) \ - for job in schedd.xquery(constraint=self.requirements) ] + job_id_list = [get_condor_job_id(job) for job in schedd.xquery(constraint=self.requirements)] n_jobs = len(job_id_list) n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(job_id_list, 'LeaveJobInQueue', - self.ad_LeaveJobInQueue_template.format( - delay_time=self.delay_time)) + schedd.edit(job_id_list, "LeaveJobInQueue", self.ad_LeaveJobInQueue_template.format(delay_time=self.delay_time)) except RuntimeError: if i_try < n_try: - self.logger.warning(f'failed to edit {n_jobs} jobs . Retry: {i_try}') + self.logger.warning(f"failed to edit {n_jobs} jobs . Retry: {i_try}") time.sleep(1) else: - self.logger.warning(f'failed to edit {n_jobs} jobs . Skipped...') + self.logger.warning(f"failed to edit {n_jobs} jobs . 
Skipped...") else: - self.logger.debug(f'adjusted LeaveJobInQueue of {n_jobs} condor jobs ') + self.logger.debug(f"adjusted LeaveJobInQueue of {n_jobs} condor jobs ") break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) @@ -250,15 +244,16 @@ class SDFFetcher(ThreadBase): """ projection = [ - 'ClusterId', 'ProcId', 'JobStatus', - 'UserLog', 'SUBMIT_UserLog', - 'sdfPath', 'sdfCopied', - ] + "ClusterId", + "ProcId", + "JobStatus", + "UserLog", + "SUBMIT_UserLog", + "sdfPath", + "sdfCopied", + ] - requirements = ( - '(isUndefined(sdfCopied) || sdfCopied == 0) ' - '&& isString(sdfPath) ' - ) + requirements = "(isUndefined(sdfCopied) || sdfCopied == 0) " "&& isString(sdfPath) " def __init__(self, flush_period=86400, limit=6000, **kwarg): ThreadBase.__init__(self, **kwarg) @@ -272,15 +267,15 @@ def __init__(self, flush_period=86400, limit=6000, **kwarg): self.limit = 6000 def run(self): - self.logger.debug(f'startTimestamp: {self.start_timestamp}') + self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() while True: - self.logger.info('run starts') + self.logger.info("run starts") if time.time() > last_flush_timestamp + self.flush_period: last_flush_timestamp = time.time() already_handled_job_id_set = set() - self.logger.info('flushed already_handled_job_id_set') + self.logger.info("flushed already_handled_job_id_set") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -288,40 +283,38 @@ def run(self): break except RuntimeError as exc: if i_try < n_try: - self.logger.warning(f'{exc} . Retry...') + self.logger.warning(f"{exc} . Retry...") time.sleep(3) else: - self.logger.error(f'{exc} . No more retry. Exit') + self.logger.error(f"{exc} . No more retry. Exit") return already_sdf_copied_job_id_set = set() to_skip_sdf_copied_job_id_set = set() try: - jobs_iter = schedd.xquery(constraint=self.requirements, - projection=self.projection, - limit=self.limit) + jobs_iter = schedd.xquery(constraint=self.requirements, projection=self.projection, limit=self.limit) for job in jobs_iter: job_id = get_condor_job_id(job) if job_id in already_handled_job_id_set: continue - self.logger.debug(f'to copy sdf for condor job {job_id}') + self.logger.debug(f"to copy sdf for condor job {job_id}") ret_val = self.via_system(job) if ret_val is True: already_sdf_copied_job_id_set.add(job_id) elif ret_val is False: to_skip_sdf_copied_job_id_set.add(job_id) except RuntimeError as exc: - self.logger.error(f'Failed to query jobs. Exit. RuntimeError: {exc} ') + self.logger.error(f"Failed to query jobs. Exit. RuntimeError: {exc} ") else: n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(already_sdf_copied_job_id_set), 'sdfCopied', '1') + schedd.edit(list(already_sdf_copied_job_id_set), "sdfCopied", "1") except RuntimeError: if i_try < n_try: - self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}') + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning(f'failed to edit job {job_id} . Skipped...') + self.logger.warning(f"failed to edit job {job_id} . 
Skipped...") else: already_handled_job_id_set.update(already_sdf_copied_job_id_set) already_sdf_copied_job_id_set.clear() @@ -329,18 +322,18 @@ def run(self): n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(to_skip_sdf_copied_job_id_set), 'sdfCopied', '2') + schedd.edit(list(to_skip_sdf_copied_job_id_set), "sdfCopied", "2") except RuntimeError: if i_try < n_try: - self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}') + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning(f'failed to edit job {job_id} . Skipped...') + self.logger.warning(f"failed to edit job {job_id} . Skipped...") else: already_handled_job_id_set.update(to_skip_sdf_copied_job_id_set) to_skip_sdf_copied_job_id_set.clear() break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) def via_system(self, job): @@ -349,37 +342,37 @@ def via_system(self, job): """ ret_val = True job_id = get_condor_job_id(job) - src_path = job.get('sdfPath') - dest_log = job.get('SUBMIT_UserLog') + src_path = job.get("sdfPath") + dest_log = job.get("SUBMIT_UserLog") if not dest_log: - dest_log = job.get('UserLog') + dest_log = job.get("UserLog") if not dest_log: - self.logger.debug(f'{job_id} has no valid SUBMIT_UserLog nor UserLog. Skipped...') + self.logger.debug(f"{job_id} has no valid SUBMIT_UserLog nor UserLog. Skipped...") return True dest_dir = os.path.dirname(dest_log) - dest_filename = re.sub(r'.log$', '.jdl', os.path.basename(dest_log)) + dest_filename = re.sub(r".log$", ".jdl", os.path.basename(dest_log)) dest_path = os.path.normpath(os.path.join(dest_dir, dest_filename)) if not os.path.isfile(src_path): ret_val = False - self.logger.error(f'{src_path} is not a regular file. Skipped...') + self.logger.error(f"{src_path} is not a regular file. Skipped...") if not dest_path: ret_val = False - self.logger.error(f'no destination path for {src_path} . Skipped...') + self.logger.error(f"no destination path for {src_path} . Skipped...") if ret_val is True: if os.path.isfile(dest_path): - self.logger.debug(f'{dest_path} file already exists. Skipped...') + self.logger.debug(f"{dest_path} file already exists. Skipped...") return True try: shutil.copy2(src_path, dest_path) if os.path.isfile(dest_path): os.chmod(dest_path, 0o644) - self.logger.debug(f'{dest_path} copy made') + self.logger.debug(f"{dest_path} copy made") else: ret_val = None - self.logger.error(f'{dest_path} made but not found') + self.logger.error(f"{dest_path} made but not found") except OSError as exc: if exc.errno == errno.EEXIST: - self.logger.debug(f'{dest_path} file already exists. Skipped...') + self.logger.debug(f"{dest_path} file already exists. 
Skipped...") else: ret_val = None self.logger.error(exc) @@ -394,10 +387,7 @@ class XJobCleaner(ThreadBase): agent to clean up jobs in removed status in the queue with forcex """ - requirements_template = ( - 'JobStatus =?= 3 ' - '&& time() - EnteredCurrentStatus >= {grace_period} ' - ) + requirements_template = "JobStatus =?= 3 " "&& time() - EnteredCurrentStatus >= {grace_period} " def __init__(self, grace_period=86400, **kwarg): ThreadBase.__init__(self, **kwarg) @@ -407,9 +397,9 @@ def __init__(self, grace_period=86400, **kwarg): self.grace_period = grace_period def run(self): - self.logger.debug(f'startTimestamp: {self.start_timestamp}') + self.logger.debug(f"startTimestamp: {self.start_timestamp}") while True: - self.logger.info('run starts') + self.logger.info("run starts") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -417,20 +407,19 @@ def run(self): break except RuntimeError as exc: if i_try < n_try: - self.logger.warning(f'{exc} . Retry...') + self.logger.warning(f"{exc} . Retry...") time.sleep(3) else: - self.logger.error(f'{exc} . No more retry. Exit') + self.logger.error(f"{exc} . No more retry. Exit") return try: - requirements = self.requirements_template.format( - grace_period=int(self.grace_period)) - self.logger.debug('try to remove-x jobs') + requirements = self.requirements_template.format(grace_period=int(self.grace_period)) + self.logger.debug("try to remove-x jobs") with global_lock: act_ret = schedd.act(htcondor.JobAction.RemoveX, requirements) except RuntimeError as exc: - self.logger.error(f'Failed to remove-x jobs. Exit. RuntimeError: {exc} ') + self.logger.error(f"Failed to remove-x jobs. Exit. RuntimeError: {exc} ") else: - self.logger.debug(f'act return : {str(dict(act_ret))}') - self.logger.info('run ends') + self.logger.debug(f"act return : {str(dict(act_ret))}") + self.logger.info("run ends") time.sleep(self.sleep_period) diff --git a/lib/hgcs/hgcs_config.py b/lib/hgcs/hgcs_config.py index d8f5a59..5fc043a 100644 --- a/lib/hgcs/hgcs_config.py +++ b/lib/hgcs/hgcs_config.py @@ -3,34 +3,38 @@ """ import os -import sys import re +import sys try: import configparser except ImportError: import ConfigParser as configparser -#=============================================================== +# =============================================================== + -class _SectionClass(): +class _SectionClass: """ dummy class for config section """ + def __init__(self): pass -class ConfigClass(): + +class ConfigClass: """ class for HGCS configurations """ + def __init__(self, config_file=None): # get ConfigParser tmp_conf = configparser.ConfigParser() # default and env variable for config file path config_path_specified = os.path.normpath(config_file) - config_env_var = 'HGCS_CONFIG_PATH' - config_path_default = '/etc/hgcs.cfg' + config_env_var = "HGCS_CONFIG_PATH" + config_path_default = "/etc/hgcs.cfg" if config_path_specified: config_path = config_path_specified elif config_env_var in os.environ: @@ -41,7 +45,7 @@ def __init__(self, config_file=None): try: tmp_conf.read(config_path) except Exception as exc: - print(f'Failed to read config file from {config_path}: {exc}') + print(f"Failed to read config file from {config_path}: {exc}") raise exc # loop over all sections for tmp_section in tmp_conf.sections(): @@ -54,23 +58,23 @@ def __init__(self, config_file=None): # expand all values for tmp_key, tmp_val in tmp_dict.items(): # use env vars - if tmp_val.startswith('$'): - tmp_match = re.search(r'\$\{*([^\}]+)\}*', tmp_val) + if tmp_val.startswith("$"): + 
tmp_match = re.search(r"\$\{*([^\}]+)\}*", tmp_val) env_name = tmp_match.group(1) if env_name not in os.environ: - raise KeyError(f'{env_name} in config is undefined env variable') + raise KeyError(f"{env_name} in config is undefined env variable") tmp_val = os.environ[env_name] # convert string to bool/int - if tmp_val.lower() == 'true': + if tmp_val.lower() == "true": tmp_val = True - elif tmp_val.lower() == 'false': + elif tmp_val.lower() == "false": tmp_val = False - elif tmp_val.lower() == 'none': + elif tmp_val.lower() == "none": tmp_val = None - elif re.match(r'^\d+$', tmp_val): + elif re.match(r"^\d+$", tmp_val): tmp_val = int(tmp_val) - elif '\n' in tmp_val: - tmp_val = tmp_val.split('\n') + elif "\n" in tmp_val: + tmp_val = tmp_val.split("\n") # remove empty tmp_val = [x.strip() for x in tmp_val if x.strip()] # update dict diff --git a/lib/hgcs/utils.py b/lib/hgcs/utils.py index 83ad876..1d40ec2 100644 --- a/lib/hgcs/utils.py +++ b/lib/hgcs/utils.py @@ -2,10 +2,10 @@ common utilities of HGCS """ -import os -import time import logging +import os import threading +import time try: from threading import get_ident @@ -14,11 +14,12 @@ import htcondor -#=============================================================== +# =============================================================== global_lock = threading.Lock() -#=============================================================== +# =============================================================== + def setup_logger(logger, pid=None, colored=True, to_file=None): """ @@ -29,33 +30,38 @@ def setup_logger(logger, pid=None, colored=True, to_file=None): colored = False else: hdlr = logging.StreamHandler() + def emit_decorator(orig_func): def func(*args): - _fstr = f'[%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s' + _fstr = f"[%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s" format_str = _fstr if colored: levelno = args[0].levelno if levelno >= logging.CRITICAL: - color = '\033[35;1m' + color = "\033[35;1m" elif levelno >= logging.ERROR: - color = '\033[31;1m' + color = "\033[31;1m" elif levelno >= logging.WARNING: - color = '\033[33;1m' + color = "\033[33;1m" elif levelno >= logging.INFO: - color = '\033[32;1m' + color = "\033[32;1m" elif levelno >= logging.DEBUG: - color = '\033[36;1m' + color = "\033[36;1m" else: - color = '\033[0m' - format_str = f'{color}{_fstr}\033[0m' + color = "\033[0m" + format_str = f"{color}{_fstr}\033[0m" formatter = logging.Formatter(format_str) hdlr.setFormatter(formatter) return orig_func(*args) + return func + hdlr.emit = emit_decorator(hdlr.emit) logger.addHandler(hdlr) -#=============================================================== + +# =============================================================== + class ThreadBase(threading.Thread): """ @@ -74,7 +80,7 @@ def get_pid(self): """ get unique thread identifier including process ID (from OS) and thread ID (from python) """ - return f'{self.os_pid}-{get_ident()}' + return f"{self.os_pid}-{get_ident()}" def run(self): """ diff --git a/pkg_info.py b/pkg_info.py index 7d3221b..1b7cbed 100644 --- a/pkg_info.py +++ b/pkg_info.py @@ -1 +1 @@ -release_version = '1.0.0' +release_version = "2.0.0" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..29d29c5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,57 @@ + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "HGCS" +dynamic = ["version"] +description = "HGCS package" +readme = "README.md" +license = 
"Apache-2.0" +authors = [ + { name = "FaHui Lin , Harvester group", email = "atlas-adc-harvester-central-support@cern.ch" }, +] + +dependencies = [ + 'htcondor >= 10.3.0', +] + +[project.optional-dependencies] +kubernetes = ['kubernetes', 'pyyaml'] +mysql = ['mysqlclient'] +atlasgrid = ['uWSGI >= 2.0.20', 'htcondor >= 10.3.0', 'mysqlclient >= 2.1.1'] + +[project.urls] +Homepage = "https://github.com/PanDAWMS/HGCS" + +[tool.hatch.version] +path = "pkg_info.py" +pattern = "release_version = \"(?P[^\"]+)\"" + +[tool.hatch.build] +directory = "dist" + +[tool.hatch.build.targets.wheel] +packages = ["lib/hgcs"] + +[tool.hatch.build.targets.wheel.shared-data] +"temp/hgcs.cfg.template" = "etc/hgcs/hgcs.cfg.template" +"temp/hgcs.service.template" = "etc/systemd/system/hgcs.service.template" +"temp/logrotate-hgcs" = "etc/logrotate.d/logrotate-hgcs" +"bin" = "bin" + +[tool.hatch.build.targets.sdist] +exclude = [ + ".github", + ".idea", +] + +[tool.black] +line-length=160 + +[tool.isort] +profile = "black" + +[tool.flynt] +line-length = 160 diff --git a/setup.py b/setup.py index b2bf218..5987053 100644 --- a/setup.py +++ b/setup.py @@ -1,46 +1,52 @@ import sys -from setuptools import setup, find_packages -sys.path.insert(0, '.') +from setuptools import find_packages, setup -import pkg_info # noqa: E402 +sys.path.insert(0, ".") +import pkg_info # noqa: E402 setup( - name='hgcs', + name="hgcs", version=pkg_info.release_version, - description='HGCS Package', - long_description='''This package contains HGCS components''', - license='GPL', - author='FaHui Lin , Harvester group', - author_email='atlas-adc-harvester-central-support@cern.ch', - url='https://github.com/PanDAWMS/HGCS', - python_requires='>=3.6', - packages=find_packages(where='lib'), - package_dir = {'': 'lib'}, + description="HGCS Package", + long_description="""This package contains HGCS components""", + license="GPL", + author="FaHui Lin , Harvester group", + author_email="atlas-adc-harvester-central-support@cern.ch", + url="https://github.com/PanDAWMS/HGCS", + python_requires=">=3.6", + packages=find_packages(where="lib"), + package_dir={"": "lib"}, install_requires=[ - 'htcondor >= 9.6.0', - ], - + "htcondor >= 10.3.0", + ], # optional pip dependencies - extras_require={ - }, - + extras_require={}, data_files=[ - # config - ('etc/hgcs', ['temp/hgcs.cfg.template', - ] - ), - # init script - ('etc/systemd/system', ['temp/hgcs.service.template', - ] - ), - # logrotate - ('etc/logrotate.d', ['temp/logrotate-hgcs', - ] - ), - ], - - scripts=['bin/hgcs_master.py', - ] - ) + # config + ( + "etc/hgcs", + [ + "temp/hgcs.cfg.template", + ], + ), + # init script + ( + "etc/systemd/system", + [ + "temp/hgcs.service.template", + ], + ), + # logrotate + ( + "etc/logrotate.d", + [ + "temp/logrotate-hgcs", + ], + ), + ], + scripts=[ + "bin/hgcs_master.py", + ], +) diff --git a/temp/single_script.py b/temp/single_script.py index c0ce748..6512f2f 100644 --- a/temp/single_script.py +++ b/temp/single_script.py @@ -1,61 +1,65 @@ -import os -import sys import errno -import shutil - -import time +import logging +import os import re +import shutil +import sys import threading -import logging +import time try: from threading import get_ident except ImportError: from thread import get_ident +import classad +import htcondor from six import configparser -import htcondor -import classad +# =============================================================== -#=============================================================== def setupLogger(logger, pid=None, colored=True): 
logger.setLevel(logging.DEBUG) hdlr = logging.StreamHandler() + def emit_decorator(fn): def func(*args): if colored: levelno = args[0].levelno - if (levelno >= logging.CRITICAL): - color = '\033[35;1m' - elif (levelno >= logging.ERROR): - color = '\033[31;1m' - elif (levelno >= logging.WARNING): - color = '\033[33;1m' - elif (levelno >= logging.INFO): - color = '\033[32;1m' - elif (levelno >= logging.DEBUG): - color = '\033[36;1m' + if levelno >= logging.CRITICAL: + color = "\033[35;1m" + elif levelno >= logging.ERROR: + color = "\033[31;1m" + elif levelno >= logging.WARNING: + color = "\033[33;1m" + elif levelno >= logging.INFO: + color = "\033[32;1m" + elif levelno >= logging.DEBUG: + color = "\033[36;1m" else: - color = '\033[0m' + color = "\033[0m" # formatter = logging.Formatter('{0}%(asctime)s %(levelname)s in %(filename)s:%(funcName)s:%(lineno)d [%(message)s]\033[0m'.format(color)) - formatter = logging.Formatter('{0}[%(asctime)s %(levelname)s]({1})(%(name)s.%(funcName)s) %(message)s\033[0m'.format(color, pid)) + formatter = logging.Formatter(f"{color}[%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s\x1b[0m") else: - formatter = logging.Formatter('%(asctime)s %(levelname)s]({0})(%(name)s.%(funcName)s) %(message)s'.format(pid)) + formatter = logging.Formatter(f"%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s") hdlr.setFormatter(formatter) return fn(*args) + return func + hdlr.emit = emit_decorator(hdlr.emit) logger.addHandler(hdlr) def get_condor_job_id(job): - cluster_id = job.get('ClusterId') - proc_id = job.get('ProcId') - return '{0}.{1}'.format(cluster_id, proc_id) + cluster_id = job.get("ClusterId") + proc_id = job.get("ProcId") + return f"{cluster_id}.{proc_id}" + + +# =============================================================== -#=============================================================== class ThreadBase(threading.Thread): def __init__(self): @@ -67,11 +71,12 @@ def __init__(self): @property def get_pid(self): - return '{0}-{1}'.format(self.os_pid, get_ident()) + return f"{self.os_pid}-{get_ident()}" class MySchedd(htcondor.Schedd): __instance = None + def __new__(cls, *args, **kwargs): if not isinstance(cls.__instance, cls): cls.__instance = super(MySchedd, cls).__new__(cls, *args, **kwargs) @@ -79,36 +84,36 @@ def __new__(cls, *args, **kwargs): class LogRetriever(ThreadBase): - projection = [ - 'ClusterId', 'ProcId', 'JobStatus', - 'Iwd', 'Err', 'Out', 'UserLog', - 'SUBMIT_UserLog', 'SUBMIT_TransferOutputRemaps', - ] - - requirements = ( - 'isString(SUBMIT_UserLog) ' - '&& LeaveJobInQueue isnt false ' - '&& ( JobStatus == 4 ' - '|| JobStatus == 3 ) ' - ) - - def __init__(self, retrieve_mode='copy', sleep_period=60, flush_period=86400): + "ClusterId", + "ProcId", + "JobStatus", + "Iwd", + "Err", + "Out", + "UserLog", + "SUBMIT_UserLog", + "SUBMIT_TransferOutputRemaps", + ] + + requirements = "isString(SUBMIT_UserLog) " "&& LeaveJobInQueue isnt false " "&& ( JobStatus == 4 " "|| JobStatus == 3 ) " + + def __init__(self, retrieve_mode="copy", sleep_period=60, flush_period=86400): ThreadBase.__init__(self) self.retrieve_mode = retrieve_mode self.sleep_period = sleep_period self.flush_period = flush_period def run(self): - self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp)) + self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() while True: - self.logger.info('run starts') + self.logger.info("run starts") if time.time() > 
last_flush_timestamp + self.flush_period: last_flush_timestamp = time.time() already_handled_job_id_set = set() - self.logger.info('flushed already_handled_job_id_set') + self.logger.info("flushed already_handled_job_id_set") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -116,60 +121,59 @@ def run(self): break except RuntimeError as e: if i_try < n_try: - self.logger.warning('{0} . Retry...'.format(e)) + self.logger.warning(f"{e} . Retry...") time.sleep(3) else: - self.logger.error('{0} . No more retry. Exit'.format(e)) + self.logger.error(f"{e} . No more retry. Exit") return - for job in schedd.xquery(constraint=self.requirements, - projection=self.projection): + for job in schedd.xquery(constraint=self.requirements, projection=self.projection): job_id = get_condor_job_id(job) if job_id in already_handled_job_id_set: continue - self.logger.debug('to retrieve for condor job {0}'.format(job_id)) - if self.retrieve_mode == 'symlink': + self.logger.debug(f"to retrieve for condor job {job_id}") + if self.retrieve_mode == "symlink": self.via_system(job, symlink_mode=True) - elif self.retrieve_mode == 'copy': + elif self.retrieve_mode == "copy": retVal = self.via_system(job) if retVal: already_handled_job_id_set.add(job_id) - elif self.retrieve_mode == 'condor': + elif self.retrieve_mode == "condor": self.via_condor_retrieve(job) n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(already_handled_job_id_set), 'LeaveJobInQueue', 'false') + schedd.edit(list(already_handled_job_id_set), "LeaveJobInQueue", "false") except RuntimeError: if i_try < n_try: - self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try)) + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id)) + self.logger.warning(f"failed to edit job {job_id} . Skipped...") else: already_handled_job_id_set.clear() break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) def via_system(self, job, symlink_mode=False): retVal = True job_id = get_condor_job_id(job) - src_dir = job.get('Iwd') - src_err_name = job.get('Err') - src_out_name = job.get('Out') - src_log_name = job.get('UserLog') + src_dir = job.get("Iwd") + src_err_name = job.get("Err") + src_out_name = job.get("Out") + src_log_name = job.get("UserLog") src_err = os.path.join(src_dir, src_err_name) src_out = os.path.join(src_dir, src_out_name) src_log = os.path.join(src_dir, src_log_name) dest_err = None dest_out = None - dest_log = job.get('SUBMIT_UserLog') - transfer_remap_list = str(job.get('SUBMIT_TransferOutputRemaps')).split(';') + dest_log = job.get("SUBMIT_UserLog") + transfer_remap_list = str(job.get("SUBMIT_TransferOutputRemaps")).split(";") if not dest_log: - self.logger.debug('{0} has no attribute of spool. Skipped...'.format(job_id)) + self.logger.debug(f"{job_id} has no attribute of spool. 
Skipped...") return True for _m in transfer_remap_list: - match = re.search('([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)', _m) + match = re.search("([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)", _m) if match: name = match.group(1) dest_path = os.path.normpath(match.group(2)) @@ -181,33 +185,33 @@ def via_system(self, job, symlink_mode=False): dest_err = dest_path for src_path, dest_path in zip([src_err, src_out, src_log], [dest_err, dest_out, dest_log]): if not os.path.isfile(src_path) or os.path.islink(src_path): - if job.get('JobStatus') != 4: + if job.get("JobStatus") != 4: continue retVal = False - self.logger.error('{0} is not a regular file. Skipped...'.format(src_path)) + self.logger.error(f"{src_path} is not a regular file. Skipped...") continue if not dest_path: retVal = False - self.logger.error('no destination path for {0} . Skipped...'.format(src_path)) + self.logger.error(f"no destination path for {src_path} . Skipped...") continue try: if symlink_mode: os.symlink(src_path, dest_path) if os.path.islink(dest_path): - self.logger.debug('{0} symlink made'.format(dest_path)) + self.logger.debug(f"{dest_path} symlink made") else: retVal = False - self.logger.error('{0} made but not found'.format(dest_path)) + self.logger.error(f"{dest_path} made but not found") else: shutil.copy2(src_path, dest_path) if os.path.isfile(dest_path): - self.logger.debug('{0} copy made'.format(dest_path)) + self.logger.debug(f"{dest_path} copy made") else: retVal = False - self.logger.error('{0} made but not found'.format(dest_path)) + self.logger.error(f"{dest_path} made but not found") except OSError as e: if e.errno == errno.EEXIST: - self.logger.debug('{0} file already exists. Skipped...'.format(dest_path)) + self.logger.debug(f"{dest_path} file already exists. Skipped...") else: retVal = False self.logger.error(e) @@ -221,15 +225,8 @@ def via_condor_retrieve(self, job): class CleanupDelayer(ThreadBase): - - requirements = ( - 'SUBMIT_UserLog is undefined ' - '&& LeaveJobInQueue is false ' - '&& ( member(JobStatus, {1,2,5,6,7}) )' - ) - ad_LeaveJobInQueue_template = ( - '( time() - EnteredCurrentStatus ) < {delay_time} ' - ) + requirements = "SUBMIT_UserLog is undefined " "&& LeaveJobInQueue is false " "&& ( member(JobStatus, {1,2,5,6,7}) )" + ad_LeaveJobInQueue_template = "( time() - EnteredCurrentStatus ) < {delay_time} " def __init__(self, sleep_period=60, delay_time=7200): ThreadBase.__init__(self) @@ -237,9 +234,9 @@ def __init__(self, sleep_period=60, delay_time=7200): self.delay_time = delay_time def run(self): - self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp)) + self.logger.debug(f"startTimestamp: {self.start_timestamp}") while True: - self.logger.info('run starts') + self.logger.info("run starts") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -247,46 +244,45 @@ def run(self): break except RuntimeError as e: if i_try < n_try: - self.logger.warning('{0} . Retry...'.format(e)) + self.logger.warning(f"{e} . Retry...") time.sleep(3) else: - self.logger.error('{0} . No more retry. Exit'.format(e)) + self.logger.error(f"{e} . No more retry. 
Exit") return # for job in schedd.xquery(constraint=self.requirements): # job_id = get_condor_job_id(job) # self.logger.debug('to adjust LeaveJobInQueue of condor job {0}'.format(job_id)) - job_id_list = [ get_condor_job_id(job) for job in schedd.xquery(constraint=self.requirements) ] + job_id_list = [get_condor_job_id(job) for job in schedd.xquery(constraint=self.requirements)] n_jobs = len(job_id_list) n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(job_id_list, 'LeaveJobInQueue', - self.ad_LeaveJobInQueue_template.format(delay_time=self.delay_time)) + schedd.edit(job_id_list, "LeaveJobInQueue", self.ad_LeaveJobInQueue_template.format(delay_time=self.delay_time)) except RuntimeError: if i_try < n_try: - self.logger.warning('failed to edit {0} jobs . Retry: {1}'.format(n_jobs, i_try)) + self.logger.warning(f"failed to edit {n_jobs} jobs . Retry: {i_try}") time.sleep(1) else: - self.logger.warning('failed to edit {0} jobs . Skipped...'.format(n_jobs)) + self.logger.warning(f"failed to edit {n_jobs} jobs . Skipped...") else: - self.logger.debug('adjusted LeaveJobInQueue of {0} condor jobs '.format(n_jobs)) + self.logger.debug(f"adjusted LeaveJobInQueue of {n_jobs} condor jobs ") break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) class SDFFetcher(ThreadBase): - projection = [ - 'ClusterId', 'ProcId', 'JobStatus', - 'UserLog', 'SUBMIT_UserLog', - 'sdfPath', 'sdfCopied', - ] + "ClusterId", + "ProcId", + "JobStatus", + "UserLog", + "SUBMIT_UserLog", + "sdfPath", + "sdfCopied", + ] - requirements = ( - 'sdfCopied == 0 ' - '&& isString(sdfPath) ' - ) + requirements = "sdfCopied == 0 " "&& isString(sdfPath) " limit = 6000 @@ -296,15 +292,15 @@ def __init__(self, sleep_period=60, flush_period=86400): self.flush_period = flush_period def run(self): - self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp)) + self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() while True: - self.logger.info('run starts') + self.logger.info("run starts") if time.time() > last_flush_timestamp + self.flush_period: last_flush_timestamp = time.time() already_handled_job_id_set = set() - self.logger.info('flushed already_handled_job_id_set') + self.logger.info("flushed already_handled_job_id_set") n_try = 999 for i_try in range(1, n_try + 1): try: @@ -312,40 +308,38 @@ def run(self): break except RuntimeError as e: if i_try < n_try: - self.logger.warning('{0} . Retry...'.format(e)) + self.logger.warning(f"{e} . Retry...") time.sleep(3) else: - self.logger.error('{0} . No more retry. Exit'.format(e)) + self.logger.error(f"{e} . No more retry. Exit") return already_sdf_copied_job_id_set = set() failed_and_to_skip_sdf_copied_job_id_set = set() try: - jobs_iter = schedd.xquery(constraint=self.requirements, - projection=self.projection, - limit=self.limit) + jobs_iter = schedd.xquery(constraint=self.requirements, projection=self.projection, limit=self.limit) for job in jobs_iter: job_id = get_condor_job_id(job) if job_id in already_handled_job_id_set: continue - self.logger.debug('to copy sdf for condor job {0}'.format(job_id)) + self.logger.debug(f"to copy sdf for condor job {job_id}") retVal = self.via_system(job) if retVal is True: already_sdf_copied_job_id_set.add(job_id) elif retVal is False: failed_and_to_skip_sdf_copied_job_id_set.add(job_id) except RuntimeError as e: - self.logger.error('Failed to query jobs. Exit. 
RuntimeError: {0} '.format(e)) + self.logger.error(f"Failed to query jobs. Exit. RuntimeError: {e} ") else: n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(already_sdf_copied_job_id_set), 'sdfCopied', '1') + schedd.edit(list(already_sdf_copied_job_id_set), "sdfCopied", "1") except RuntimeError: if i_try < n_try: - self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try)) + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id)) + self.logger.warning(f"failed to edit job {job_id} . Skipped...") else: already_handled_job_id_set.update(already_sdf_copied_job_id_set) already_sdf_copied_job_id_set.clear() @@ -353,54 +347,54 @@ def run(self): n_try = 3 for i_try in range(1, n_try + 1): try: - schedd.edit(list(failed_and_to_skip_sdf_copied_job_id_set), 'sdfCopied', '2') + schedd.edit(list(failed_and_to_skip_sdf_copied_job_id_set), "sdfCopied", "2") except RuntimeError: if i_try < n_try: - self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try)) + self.logger.warning(f"failed to edit job {job_id} . Retry: {i_try}") time.sleep(1) else: - self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id)) + self.logger.warning(f"failed to edit job {job_id} . Skipped...") else: already_handled_job_id_set.update(failed_and_to_skip_sdf_copied_job_id_set) failed_and_to_skip_sdf_copied_job_id_set.clear() break - self.logger.info('run ends') + self.logger.info("run ends") time.sleep(self.sleep_period) def via_system(self, job): retVal = True job_id = get_condor_job_id(job) - src_path = job.get('sdfPath') - dest_log = job.get('SUBMIT_UserLog') + src_path = job.get("sdfPath") + dest_log = job.get("SUBMIT_UserLog") if not dest_log: - dest_log = job.get('UserLog') + dest_log = job.get("UserLog") if not dest_log: - self.logger.debug('{0} has no valid SUBMIT_UserLog nor UserLog. Skipped...'.format(job_id)) + self.logger.debug(f"{job_id} has no valid SUBMIT_UserLog nor UserLog. Skipped...") return True dest_dir = os.path.dirname(dest_log) - dest_filename = re.sub(r'.log$', '.jdl', os.path.basename(dest_log)) + dest_filename = re.sub(r".log$", ".jdl", os.path.basename(dest_log)) dest_path = os.path.normpath(os.path.join(dest_dir, dest_filename)) if not os.path.isfile(src_path): retVal = False - self.logger.error('{0} is not a regular file. Skipped...'.format(src_path)) + self.logger.error(f"{src_path} is not a regular file. Skipped...") if not dest_path: retVal = False - self.logger.error('no destination path for {0} . Skipped...'.format(src_path)) + self.logger.error(f"no destination path for {src_path} . Skipped...") if retVal is True: if os.path.isfile(dest_path): - self.logger.debug('{0} file already exists. Skipped...'.format(dest_path)) + self.logger.debug(f"{dest_path} file already exists. Skipped...") return True try: shutil.copy2(src_path, dest_path) if os.path.isfile(dest_path): os.chmod(dest_path, 0o644) - self.logger.debug('{0} copy made'.format(dest_path)) + self.logger.debug(f"{dest_path} copy made") else: retVal = None - self.logger.error('{0} made but not found'.format(dest_path)) + self.logger.error(f"{dest_path} made but not found") except OSError as e: if e.errno == errno.EEXIST: - self.logger.debug('{0} file already exists. Skipped...'.format(dest_path)) + self.logger.debug(f"{dest_path} file already exists. 
Skipped...") else: retVal = None self.logger.error(e) @@ -429,7 +423,7 @@ def testing(): thread_list.append(LogRetriever(sleep_period=sleep_period)) # thread_list.append(CleanupDelayer(sleep_period=sleep_period)) thread_list.append(SDFFetcher(sleep_period=sleep_period)) - [ thr.start() for thr in thread_list ] + [thr.start() for thr in thread_list] def main(): @@ -439,6 +433,6 @@ def main(): pass -if __name__ == '__main__': +if __name__ == "__main__": testing() # main()