From b262504f41c0794a6cb99a4ac0d19341fb2a0af3 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 29 May 2024 15:36:32 +0200 Subject: [PATCH 1/5] New version --- PILOTVERSION | 2 +- pilot/util/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 66566d18..fd154cfd 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.6.8 \ No newline at end of file +3.7.7.1 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 528081f7..564e6336 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '8' # build number should be reset to '1' for every new development cycle +REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '1' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 725e7f54d0e9a6e7136e1f4f3a5b9283255f1c19 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 29 May 2024 16:26:22 +0200 Subject: [PATCH 2/5] Added cvmfs diagnostics --- pilot.py | 2 ++ pilot/user/atlas/cvmfs.py | 12 ++++++++++++ pilot/util/cvmfs.py | 25 +++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/pilot.py b/pilot.py index 20a3c47d..3bb549ac 100755 --- a/pilot.py +++ b/pilot.py @@ -54,6 +54,7 @@ PILOT_MULTIJOB_START_TIME, ) from pilot.util.cvmfs import ( + cvmfs_diagnostics, is_cvmfs_available, get_last_update ) @@ -123,6 +124,7 @@ def main() -> int: # check cvmfs if available ec = check_cvmfs(logger) if ec: + cvmfs_diagnostics() return ec if not args.rucio_host: diff --git a/pilot/user/atlas/cvmfs.py b/pilot/user/atlas/cvmfs.py index 03568b11..9e6e79d2 100644 --- a/pilot/user/atlas/cvmfs.py +++ b/pilot/user/atlas/cvmfs.py @@ -51,3 +51,15 @@ def get_last_update_file() -> str: :return: last update file (str). """ return f'{get_cvmfs_base_path()}/sft.cern.ch/lcg/lastUpdate' + + +def get_cvmfs_diagnostics_commands() -> list: + """ + Return a list of commands to be used for CVMFS diagnostics. + + :return: list of commands (list). + """ + return [ + 'cvmfs_config stat atlas.cern.ch', + f'attr -g revision {get_cvmfs_base_path()}/atlas.cern.ch' + ] diff --git a/pilot/util/cvmfs.py b/pilot/util/cvmfs.py index b2075d42..9358a710 100644 --- a/pilot/util/cvmfs.py +++ b/pilot/util/cvmfs.py @@ -27,6 +27,8 @@ import time import types +from pilot.util.container import execute + logger = logging.getLogger(__name__) @@ -149,3 +151,26 @@ def extract_timestamp(filename: str) -> int: signal.alarm(0) # Disable the alarm return timestamp + + +def cvmfs_diagnostics(): + """Run cvmfs_diagnostics.""" + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + user = __import__(f'pilot.user.{pilot_user}.cvmfs', globals(), locals(), [pilot_user], 0) + try: + cmds = user.get_cvmfs_diagnostics_commands() + except AttributeError: + logger.warning('get_cvmfs_diagnostics_commands not defined in user cvmfs module') + return + + if cmds: + for cmd in cmds: + timeout = 60 + logger.info(f'running cvmfs diagnostics command using timeout={timeout}s') + exit_code, stdout, stderr = execute(cmd, timeout=timeout) + if exit_code == 0: + logger.info(f'cvmfs diagnostics completed successfully:\n{stdout}') + else: + logger.warning(f'cvmfs diagnostics failed: {stderr}') + else: + logger.warning('cvmfs diagnostics commands not defined in user cvmfs module') From d76379f8278b64d62affe49fccc50c54cc39824c Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 7 Jun 2024 10:02:42 +0200 Subject: [PATCH 3/5] Comment update. Merged with Eddies commit --- pilot/eventservice/esprocess/esmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/eventservice/esprocess/esmanager.py b/pilot/eventservice/esprocess/esmanager.py index 0c3b868c..53cf4a28 100644 --- a/pilot/eventservice/esprocess/esmanager.py +++ b/pilot/eventservice/esprocess/esmanager.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 """Event Service manager to set up and run ESProcess.""" From 620ef2bf5c474e8b651a1922da225c9fcf0ccc45 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 11 Jun 2024 10:46:21 +0200 Subject: [PATCH 4/5] Updated version number after merge --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index fd154cfd..34fbe1b6 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.7.1 \ No newline at end of file +3.7.7.2 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 564e6336..990f5526 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '2' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 5feb7cef4c93b85669385dd7124a6c7a8ed8a103 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 11 Jun 2024 14:57:01 +0200 Subject: [PATCH 5/5] Pylint changes --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/realtimelogger.py | 235 +++++++++++++++++++---------------- 3 files changed, 133 insertions(+), 106 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 34fbe1b6..2df33a99 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.7.2 \ No newline at end of file +3.7.7.3 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 990f5526..f73f723d 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2' # build number should be reset to '1' for every new development cycle +BUILD = '3' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index b39d6098..d7514545 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -18,17 +18,20 @@ # # Authors: # - Shuwei Ye, yesw@bnl.gov, 2021 -# - Paul Nilsson, paul.nilsson@cern.ch, 2021-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2021-24 +# - Wen Guan, wen.guan@cern.ch, 2024 + +"""Real-time logger.""" -import os -import time import json -from pilot.util.config import config -from pilot.util.https import cacert -# from pilot.util.proxy import create_cert_files -from pilot.util.transport import HttpTransport -from logging import Logger, INFO import logging +import os +import time + +try: + from fluent import handler as fluent_handler +except ImportError: + pass try: import google.cloud.logging @@ -36,31 +39,46 @@ except ImportError: pass +try: + from logstash_async.handler import AsynchronousLogstashHandler +except ImportError: + pass + +try: + from loki_logger_handler.loki_logger_handler import LokiLoggerHandler +except ImportError: + pass + +from typing import Any + +from pilot.util.config import config +from pilot.util.https import cacert +# from pilot.util.proxy import create_cert_files +from pilot.util.transport import HttpTransport + logger = logging.getLogger(__name__) -def get_realtime_logger(args=None, info_dic=None, workdir=None, secrets=""): +def get_realtime_logger(args: Any = None, info_dic: dict = None, workdir: str = None, secrets: str = ""): """ Helper function for real-time logger. The info_dic dictionary has the format: {'logging_type': .., 'protocol': .., 'url': .., 'port': .., 'logname': ..} - :param args: pilot arguments object. - :param info_dic: info dictionary. - :param workdir: job working directory (string). + :param args: pilot arguments object (Any) + :param info_dic: info dictionary (dict) + :param workdir: job working directory (str) + :param secrets: secrets (str) :return: RealTimeLogger instance (self). """ - if RealTimeLogger.glogger is None: RealTimeLogger(args, info_dic, workdir, secrets) + return RealTimeLogger.glogger def cleanup(): - """ - Clean-up function for external use. - """ - + """Clean-up function for external use.""" logger.debug('attempting real-time logger cleanup') if RealTimeLogger.glogger: RealTimeLogger.glogger.cleanup() @@ -68,7 +86,7 @@ def cleanup(): # RealTimeLogger is called if args.realtimelogger is on -class RealTimeLogger(Logger): +class RealTimeLogger(logging.Logger): """ RealTimeLogger class definition. """ @@ -81,7 +99,7 @@ class RealTimeLogger(Logger): _cacert = "" current_handler = None # needed for removing logger object from outside function - def __init__(self, args, info_dic, workdir, secrets, level=INFO): + def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level: Any = logging.INFO): """ Default init function. @@ -92,16 +110,16 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): 'logname': .., 'logfiles': [..]} - :param args: pilot arguments object. - :param info_dic: info dictionary. - :param workdir: job working directory (string). - :param level: logging level (constant). - :return: + :param args: pilot arguments object (Any) + :param info_dic: info dictionary (dict) + :param workdir: job working directory (str) + :param level: logging level (Any). """ - - super(RealTimeLogger, self).__init__(name="realTimeLogger", level=level) + super().__init__(name="realTimeLogger", level=level) RealTimeLogger.glogger = self + if workdir: # bypass pylint warning - keep workdir for possible future development + pass if not info_dic: logger.warning('info dictionary not set - add \'logging=type:protocol://host:port\' to PQ.catchall)') RealTimeLogger.glogger = None @@ -125,67 +143,61 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): _handler = None - try: - if logtype == "google-cloud-logging": - client = google.cloud.logging.Client() - _handler = CloudLoggingHandler(client, name=name) - api_logger = logging.getLogger('google.cloud.logging_v2') - api_logger.setLevel(INFO) - elif logtype == "fluent": - from fluent import handler - _handler = handler.FluentHandler(name, host=server, port=port) - elif logtype == "logstash": - # from logstash_async.transport import HttpTransport - from logstash_async.handler import AsynchronousLogstashHandler - # from logstash_async.handler import LogstashFormatter - - # certificate method (still in development): - - #certdir = os.environ.get('SSL_CERT_DIR', '') - #path = os.path.join(certdir, "CERN-GridCA.pem") - #crt, key = create_cert_files(workdir) - #if not crt or not key: - # logger.warning('failed to create crt/key') - # _handler = None - # return - #transport = HttpTransport( - # server, - # port, - # timeout=5.0, - # ssl_enable=True, - # ssl_verify=path, - # cert=(crt, key) - #) - - # login+password method: - if isinstance(secrets, str): - secrets = json.loads(secrets) - - ssl_enable, ssl_verify = self.get_rtlogging_ssl() - transport = HttpTransport( - server, - port, - ssl_enable=ssl_enable, - ssl_verify=ssl_verify, - timeout=5.0, - username=secrets.get('logstash_login', 'unknown_login'), - password=secrets.get('logstash_password', 'unknown_password') - ) - - # create the handler - _handler = AsynchronousLogstashHandler( - server, - port, - transport=transport, - database_path='logstash_test.db' - ) - elif logtype == 'loki': - _handler = self.setup_loki_handler() - else: - logger.warning(f'unknown logtype: {logtype}') - _handler = None - except (ModuleNotFoundError, ImportError) as exc: - logger.warning(f'exception caught while setting up log handlers: {exc}') + if logtype == "google-cloud-logging": + client = google.cloud.logging.Client() + _handler = CloudLoggingHandler(client, name=name) + api_logger = logging.getLogger('google.cloud.logging_v2') + api_logger.setLevel(logger.INFO) + elif logtype == "fluent": + _handler = fluent_handler.FluentHandler(name, host=server, port=port) + elif logtype == "logstash": + # from logstash_async.transport import HttpTransport + # from logstash_async.handler import LogstashFormatter + + # certificate method (still in development): + + #certdir = os.environ.get('SSL_CERT_DIR', '') + #path = os.path.join(certdir, "CERN-GridCA.pem") + #crt, key = create_cert_files(workdir) + #if not crt or not key: + # logger.warning('failed to create crt/key') + # _handler = None + # return + #transport = HttpTransport( + # server, + # port, + # timeout=5.0, + # ssl_enable=True, + # ssl_verify=path, + # cert=(crt, key) + #) + + # login+password method: + if isinstance(secrets, str): + secrets = json.loads(secrets) + + ssl_enable, ssl_verify = self.get_rtlogging_ssl() + transport = HttpTransport( + server, + port, + ssl_enable=ssl_enable, + ssl_verify=ssl_verify, + timeout=5.0, + username=secrets.get('logstash_login', 'unknown_login'), + password=secrets.get('logstash_password', 'unknown_password') + ) + + # create the handler + _handler = AsynchronousLogstashHandler( + server, + port, + transport=transport, + database_path='logstash_test.db' + ) + elif logtype == 'loki': + _handler = self.setup_loki_handler() + else: + logger.warning(f'unknown logtype: {logtype}') _handler = None if _handler is not None: @@ -196,8 +208,7 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): del self def setup_loki_handler(self): - from loki_logger_handler.loki_logger_handler import LokiLoggerHandler - + """Setup the Loki logger handler.""" loki_labels = {'application': 'PanDA_Pilot', 'envirnment': 'Production'} try: labels = os.environ.get('LOKI_LABELS', {}) @@ -231,10 +242,7 @@ def setup_loki_handler(self): return _handler def cleanup(self): - """ - Clean-up. - """ - + """Clean-up.""" # close open files, if anything is still open self.close_files() @@ -247,7 +255,12 @@ def cleanup(self): RealTimeLogger.glogger = None del self - def set_jobinfo(self, job): + def set_jobinfo(self, job: Any): + """ + Set job info. + + :param job: job object (Any). + """ self.jobinfo = {"TaskID": job.taskid, "PandaJobID": job.jobid} if 'HARVESTER_WORKER_ID' in os.environ: self.jobinfo["Harvester_WorkerID"] = os.environ.get('HARVESTER_WORKER_ID') @@ -270,7 +283,13 @@ def send_with_jobinfo(self, msg): self.info(logobj) - def add_logfiles(self, job_or_filenames, reset=True): + def add_logfiles(self, job_or_filenames: Any or list, reset: bool = True): + """ + Add log files. + + :param job_or_filenames: job object or list of log file names (Any or list) + :param reset: reset the log files (bool). + """ self.close_files() if reset: self.logfiles = [] @@ -292,6 +311,7 @@ def add_logfiles(self, job_or_filenames, reset=True): logger.info(f'added log files: {self.logfiles}') def close_files(self): + """Close files.""" for openfile in self.openfiles.values(): if openfile is not None: openfile.close() @@ -299,13 +319,20 @@ def close_files(self): self.logfiles = [] def send_loginfiles(self): + """Send log files.""" for openfile in self.openfiles.values(): if openfile is not None: lines = openfile.readlines() for line in lines: self.send_with_jobinfo(line.strip()) - def sending_logs(self, args, job): + def sending_logs(self, args: Any, job: Any): + """ + Send logs. + + :param args: pilot arguments object (Any) + :param job: job object (Any). + """ logger.info('starting RealTimeLogger.sending_logs') self.set_jobinfo(job) self.add_logfiles(job) @@ -317,21 +344,21 @@ def sending_logs(self, args, job): if i % 10 == 1: logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})') # there might be special cases when RT logs should be sent, e.g. for pilot logs - if job.state == '' or job.state == 'starting' or job.state == 'running': + if job.state in {'', 'starting', 'running'}: if len(self.logfiles) > len(self.openfiles): for logfile in self.logfiles: if logfile not in self.openfiles: if os.path.exists(logfile): - openfile = open(logfile) - openfile.seek(0) - self.openfiles[logfile] = openfile - logger.debug(f'opened logfile: {logfile}') + openfile = open(logfile, encoding='utf-8') + if openfile: + openfile.seek(0) + self.openfiles[logfile] = openfile + logger.debug(f'opened logfile: {logfile}') # logger.debug(f'real-time logging: sending logs for state={job.state} [1]') self.send_loginfiles() - elif job.state == 'stagein' or job.state == 'stageout': + elif job.state in {'stagein', 'stageout'}: logger.debug('no real-time logging during stage-in/out') - pass else: # run longer for pilotlog # wait for job.completed=True, for a maximum of N minutes @@ -361,7 +388,7 @@ def get_rtlogging_ssl(self): pilot_user = os.environ.get('PILOT_USER', 'generic').lower() try: - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) + user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) ssl_enable, ssl_verify = user.get_rtlogging_ssl() except Exception: ssl_enable = config.Pilot.ssl_enable