diff --git a/Dockerfile b/Dockerfile
index 054a8910..81a3757f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -157,6 +157,7 @@ RUN ln -fs /opt/idds/config/idds/supervisord_idds.ini /etc/supervisord.d/idds.in
 RUN ln -fs /opt/idds/config/idds/supervisord_httpd.ini /etc/supervisord.d/httpd.ini
 # RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini
 RUN ln -fs /opt/idds/config/idds/supervisord_logrotate.ini /etc/supervisord.d/logrotate.ini
+RUN ln -fs /opt/idds/config/idds/supervisord_healthmonitor.ini /etc/supervisord.d/healthmonitor.ini
 RUN ln -fs /opt/idds/config/idds/logrotate_idds /etc/logrotate.d/idds
 
 # for syslog-ng
diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py
index bb611ec8..25fa512a 100644
--- a/common/lib/idds/common/utils.py
+++ b/common/lib/idds/common/utils.py
@@ -59,7 +59,7 @@ def get_log_dir():
     return "/var/log/idds"
 
 
-def setup_logging(name, stream=None, loglevel=None):
+def setup_logging(name, stream=None, loglevel=None, log_file=None):
     """
     Setup logging
     """
@@ -79,14 +79,33 @@ def setup_logging(name, stream=None, loglevel=None):
         loglevel = loglevel.upper()
         loglevel = getattr(logging, loglevel)
 
-    if stream is None:
+    if log_file is not None:
+        if not log_file.startswith("/"):
+            logdir = None
+            if config_has_section('common') and config_has_option('common', 'logdir'):
+                logdir = config_get('common', 'logdir')
+            if not logdir:
+                logdir = '/var/log/idds'
+            log_file = os.path.join(logdir, log_file)
+
+    if log_file:
+        logging.basicConfig(filename=log_file,
+                            level=loglevel,
+                            format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
+    elif stream is None:
         if os.environ.get('IDDS_LOG_FILE', None):
             idds_log_file = os.environ.get('IDDS_LOG_FILE', None)
             logging.basicConfig(filename=idds_log_file,
                                 level=loglevel,
                                 format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
-        elif config_has_section('common') and config_has_option('common', 'logdir'):
-            logging.basicConfig(filename=os.path.join(config_get('common', 'logdir'), name),
+        elif ((config_has_section('common') and config_has_option('common', 'logdir') and config_has_option('common', 'logfile')) or log_file):
+            if log_file:
+                log_filename = log_file
+            else:
+                log_filename = config_get('common', 'logfile')
+            if not log_filename.startswith("/"):
+                log_filename = os.path.join(config_get('common', 'logdir'), log_filename)
+            logging.basicConfig(filename=log_filename,
                                 level=loglevel,
                                 format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
         else:
diff --git a/main/config_default/healthmonitor_daemon b/main/config_default/healthmonitor_daemon
new file mode 100755
index 00000000..39acb949
--- /dev/null
+++ b/main/config_default/healthmonitor_daemon
@@ -0,0 +1,4 @@
+#!/bin/bash
+# Run the iDDS health check in a loop, sleeping 600 seconds between runs.
+
+while true; do python /opt/idds/config/idds/idds_health_check.py; sleep 600; done
diff --git a/main/config_default/idds_health_check.py b/main/config_default/idds_health_check.py
new file mode 100644
index 00000000..f6002103
--- /dev/null
+++ b/main/config_default/idds_health_check.py
@@ -0,0 +1,235 @@
+#!/usr/bin/python
+
+"""
+check iDDS health
+"""
+
+import json
+import os
+import re
+import shlex
+import subprocess
+import time
+
+
+def check_command(command, check_string):
+    """Run command without a shell; return 100 if its stdout matches check_string, else 0."""
+    print("Checking command : {0}".format(command))
+    print("For string : {0}".format(check_string))
+
+    # No shell is involved, so split with shell quoting rules and expand
+    # $VAR / ${VAR} references here; otherwise curl receives them literally.
+    tmp_array = [os.path.expandvars(token) for token in shlex.split(command)]
+    output = (
+        subprocess.Popen(tmp_array, stdout=subprocess.PIPE)
+        .communicate()[0]
+        .decode("ascii")
+    )
+
+    if re.search(check_string, output):
+        print("Found the string, return 100")
+        return 100
+    else:
+        print("String not found, return 0")
+        return 0
+
+
+def is_logrotate_running():
+    """Return True if the ps listing contains at least one line matching 'logrotate'."""
+    output = (
+        subprocess.Popen(
+            "ps -eo pgid,args | grep logrotate | grep -v grep | wc -l",
+            stdout=subprocess.PIPE,
+            shell=True,
+        )
+        .communicate()[0]
+        .decode("ascii")
+    )
+
+    try:
+        cleaned_output = output.strip()
+        n_logrotate_processes = int(cleaned_output)
+    except ValueError:
+        print(
+            "The string has an unexpected format and couldn't be converted to an integer."
+        )
+        # No usable count: report "not running" instead of hitting an unbound name below.
+        return False
+
+    # logrotate process found
+    if n_logrotate_processes >= 1:
+        print("Logrotate is running")
+        return True
+
+    return False
+
+
+def is_restarting():
+    """Return True if the ps listing contains a line matching both 'restart' and 'http'."""
+    output = (
+        subprocess.Popen(
+            "ps -eo pgid,args | grep restart|grep http | grep -v grep | wc -l",
+            stdout=subprocess.PIPE,
+            shell=True,
+        )
+        .communicate()[0]
+        .decode("ascii")
+    )
+
+    try:
+        cleaned_output = output.strip()
+        n_restarting_processes = int(cleaned_output)
+    except ValueError:
+        print(
+            "The string has an unexpected format and couldn't be converted to an integer."
+        )
+        # No usable count: report "not restarting" instead of hitting an unbound name below.
+        return False
+
+    # http restart process found
+    if n_restarting_processes >= 1:
+        print("http is restarting")
+        return True
+
+    return False
+
+
+def http_availability(host):
+    """Ping the REST service on host; return 100 if it answers, 1 during a logrotate restart, else 0."""
+    avail = 0
+    if os.environ.get('X509_USER_PROXY', None):
+        curl = "curl -i -k --cert $X509_USER_PROXY --key $X509_USER_PROXY --cacert $X509_USER_PROXY https://%s:8443/idds/ping" % host
+        avail = check_command(curl, '"Status": "OK"')
+        print("http check availability (with proxy): %s" % avail)
+    elif os.environ.get('PANDA_AUTH', None) and os.environ.get('PANDA_AUTH_VO', None) and os.environ.get('PANDA_AUTH_ID_TOKEN', None):
+        curl = "curl -i -k -H \"X-IDDS-Auth-Type: ${PANDA_AUTH}\" -H \"X-IDDS-Auth-VO: ${PANDA_AUTH_VO}\" -H \"X-Idds-Auth-Token: ${PANDA_AUTH_ID_TOKEN}\" https://%s:8443/idds/ping" % host
+        avail = check_command(curl, '"Status": "OK"')
+        print("http check availability (with oidc token): %s" % avail)
+    if not avail or avail == 0:
+        curl = "curl -i -k https://%s:8443/idds/ping" % host
+        avail = check_command(curl, 'IDDSException')
+        print("http check availability (without proxy): %s" % avail)
+
+    if not avail or avail == 0:
+        logrotate_running = is_logrotate_running()
+        restarting = is_restarting()
+        if logrotate_running and restarting:
+            print("log rotation is running and http is restarting")
+            return 1
+    return avail
+
+
+def process_availability():
+    """Return 100 if at least one 'idds/agents/main.py' process is listed by ps, else 0."""
+    process_avail = 0
+    output = (
+        subprocess.Popen(
+            "ps -eo pgid,args | grep 'idds/agents/main.py' | grep -v grep | uniq",
+            stdout=subprocess.PIPE,
+            shell=True,
+        )
+        .communicate()[0]
+        .decode("ascii")
+    )
+    count = 0
+    for line in output.split("\n"):
+        line = line.strip()
+        if line == "":
+            continue
+        count += 1
+    if count >= 1:
+        process_avail = 100
+
+    print("agent process check availability: %s" % process_avail)
+    return process_avail
+
+
+def heartbeat_availability(log_location):
+    """Return (availability, hang_workers) derived from the idds_availability file in log_location."""
+    avail = 100
+    hang_workers = 0
+    heartbeat_file = os.path.join(log_location, 'idds_availability')
+    if not os.path.exists(heartbeat_file):
+        avail = 0
+        print("idds_heartbeat at %s not exist, avail: %s" % (heartbeat_file, avail))
+        return avail, hang_workers
+
+    mod_time = os.path.getmtime(heartbeat_file)
+    print("idds_heartbeat updated at %s (currently is %s, %s seconds ago)" % (mod_time, time.time(), time.time() - mod_time))
+    if mod_time < time.time() - 1800:
+        avail = 0
+        return avail, hang_workers
+
+    try:
+        with open(heartbeat_file, 'r') as f:
+            d = json.load(f)
+            for agent in d:
+                info = d[agent]
+                num_hang_workers = info['num_hang_workers']
+                num_active_workers = info['num_active_workers']
+                if num_active_workers > 0 and num_hang_workers > 0:
+                    hang_workers += num_hang_workers
+                    # availability is the share of workers that are NOT hanging, never below 0
+                    agent_avail = max(0, 100 - int(num_hang_workers * 100 / num_active_workers))
+                    if agent_avail < avail:
+                        avail = agent_avail
+                    print("iDDS agent %s has %s hang workers" % (agent, num_hang_workers))
+    except Exception as ex:
+        print("Failed to parse idds_heartbeat: %s" % str(ex))
+        avail = 50
+
+    return avail, hang_workers
+
+
+def idds_availability(host, log_location):
+    """Combine the http, agent-process and heartbeat checks into (availability, avail_info, infos)."""
+    infos = {}
+    http_avail = http_availability(host)
+    print(f"http avail: {http_avail}")
+
+    process_avail = process_availability()
+    print(f"agent daemon avail: {process_avail}")
+
+    heartbeat_avail, hang_workers = heartbeat_availability(log_location)
+    print(f"heartbeat avail: {heartbeat_avail}, hang workers: {hang_workers}")
+    infos['num_hang_workers'] = hang_workers
+
+    if not http_avail:
+        availability = 0
+        avail_info = "iDDS http rest service is not running"
+    elif not process_avail:
+        availability = 50
+        avail_info = "iDDS agents are not running"
+    else:
+        if not heartbeat_avail:
+            availability = 50
+            avail_info = "iDDS agents are running. However heartbeat file is not found (or not renewed)"
+        elif heartbeat_avail < 100:
+            availability = heartbeat_avail
+            avail_info = "iDDS agents are running. However there are hanging workers"
+        else:
+            availability = heartbeat_avail
+            avail_info = "iDDS is OK"
+
+    print("availability: %s, avail_info: %s, infos: %s" % (availability, avail_info, infos))
+
+    return availability, avail_info, infos
+
+
+def main():
+    """Write 'OK' to /var/log/idds/idds_health when availability is 100, otherwise remove that file."""
+    host = 'localhost'
+    log_location = '/var/log/idds'
+    avail, avail_info, infos = idds_availability(host, log_location)
+
+    health_file = os.path.join(log_location, 'idds_health')
+    if avail >= 100:
+        with open(health_file, 'w') as f:
+            f.write('OK')
+    else:
+        if os.path.exists(health_file):
+            os.remove(health_file)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/main/config_default/logrotate_daemon b/main/config_default/logrotate_daemon
index 70091208..668f7dd9 100755
--- a/main/config_default/logrotate_daemon
+++ b/main/config_default/logrotate_daemon
@@ -1,4 +1,6 @@
 #!/bin/bash
 # while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status -d /etc/logrotate.d/idds; sleep 86400; done
 
-while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done
+# while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done
+
+while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds; sleep 86400; done
diff --git a/main/config_default/supervisord_healthmonitor.ini b/main/config_default/supervisord_healthmonitor.ini
new file mode 100644
index 00000000..c23a9f56
--- /dev/null
+++ b/main/config_default/supervisord_healthmonitor.ini
@@ -0,0 +1,17 @@
+[program:health-monitor]
+# command=/usr/sbin/logrotate -s /var/log/idds/logrotate.status -d /etc/logrotate.d/idds
+command=/opt/idds/config/idds/healthmonitor_daemon
+# process_name=%(process_num)02d
+# user=atlpan
+childlogdir=/var/log/idds
+stdout_logfile=/var/log/idds/%(program_name)s-stdout.log
+stderr_logfile=/var/log/idds/%(program_name)s-stderr.log
+stdout_logfile_maxbytes=2GB
+stderr_logfile_maxbytes=2GB
+stdout_logfile_backups=1
+stderr_logfile_backups=1
+redirect_stderr=false
+autorestart=true
+stopsignal=TERM
+stopasgroup=true
+exitcodes=1
diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py
index afd24d32..7c5a6335 100644
--- a/main/lib/idds/agents/carrier/poller.py
+++ b/main/lib/idds/agents/carrier/poller.py
@@ -274,7 +274,8 @@ def update_processing(self, processing, processing_model, use_bulk_update_mappin
                                                             new_contents_ext=processing.get('new_contents_ext', None),
                                                             update_contents_ext=processing.get('update_contents_ext', None),
                                                             new_input_dependency_contents=processing.get('new_input_dependency_contents', None),
-                                                            use_bulk_update_mappings=use_bulk_update_mappings)
+                                                            use_bulk_update_mappings=use_bulk_update_mappings,
+                                                            message_bulk_size=self.message_bulk_size)
             except exceptions.DatabaseException as ex:
                 if 'ORA-00060' in str(ex):
                     self.logger.warn(log_prefix + "update_processing (cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
diff --git a/main/lib/idds/core/messages.py b/main/lib/idds/core/messages.py
index 96ee78fb..07b2aceb 100644
--- a/main/lib/idds/core/messages.py
+++ b/main/lib/idds/core/messages.py
@@ -50,7 +50,7 @@ def add_messages(messages, bulk_size=1000, session=None):
 @transactional_session
 def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None,
                       source=None, request_id=None, workload_id=None, transform_id=None,
-                      processing_id=None, use_poll_period=False, retries=None, delay=60,
+                      processing_id=None, use_poll_period=False, retries=None, delay=None,
                       min_request_id=None, fetching_id=None, internal_id=None,
                       record_fetched=False, record_fetched_status=MessageStatus.Fetched,
                       session=None):
diff --git a/main/lib/idds/rest/v1/app.py b/main/lib/idds/rest/v1/app.py
index 08936792..057a1cce 100644
--- a/main/lib/idds/rest/v1/app.py
+++ b/main/lib/idds/rest/v1/app.py
@@ -6,22 +6,22 @@
 # http://www.apache.org/licenses/LICENSE-2.0OA
 #
 # Authors:
-# - Wen Guan, , 2019 - 2023
+# - Wen Guan, , 2019 - 2024
 
 """----------------------
    Web service app
 ----------------------"""
 
 import logging
-import sys
+
 import flask
 from flask import Flask, Response
 
 from idds.common import exceptions
 # from idds.common.authentication import authenticate_x509, authenticate_oidc, authenticate_is_super_user
-from idds.common.config import (config_has_section, config_has_option, config_get)
 from idds.common.constants import HTTP_STATUS_CODE
 from idds.common.utils import get_rest_debug
+# from idds.common.utils import get_rest_debug, setup_logging, get_logger
 from idds.core.authentication import authenticate_x509, authenticate_oidc, authenticate_is_super_user
 # from idds.common.utils import get_rest_url_prefix
 from idds.rest.v1 import requests
@@ -39,7 +39,6 @@
 
 class LoggingMiddleware(object):
     def __init__(self, app, logger, url_map):
-        import logging
         self._app = app
         self._logger = logger
         self._url_map = url_map
@@ -155,20 +154,10 @@ def after_request(response):
     return response
 
 
-def setup_logging(loglevel=None):
-    if loglevel is None:
-        if config_has_section('common') and config_has_option('common', 'loglevel'):
-            loglevel = getattr(logging, config_get('common', 'loglevel').upper())
-        else:
-            loglevel = logging.INFO
-
-    logging.basicConfig(stream=sys.stdout, level=loglevel,
-                        format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
-
-
 def create_app(auth_type=None):
 
-    setup_logging()
+    # setup_logging(name='idds_app', log_file="idds_rest.log")
+    # get_logger(name='idds_app', filename='idds_rest.log')
 
     # url_prefix = get_rest_url_prefix()
     application = Flask(__name__)
diff --git a/main/lib/idds/rest/v1/controller.py b/main/lib/idds/rest/v1/controller.py
index dc9f6a6d..f52a15f8 100644
--- a/main/lib/idds/rest/v1/controller.py
+++ b/main/lib/idds/rest/v1/controller.py
@@ -6,19 +6,32 @@
 # http://www.apache.org/licenses/LICENSE-2.0OA
 #
 # Authors:
-# - Wen Guan, , 2019 - 2020
+# - Wen Guan, , 2019 - 2024
 
 
 from flask import Response, request
 from flask.views import MethodView
 
 from idds.common.constants import HTTP_STATUS_CODE
-from idds.common.utils import json_dumps
+from idds.common.utils import json_dumps, get_logger
 
 
 class IDDSController(MethodView):
     """ Default ESS Controller class. """
 
+    def get_class_name(self):
+        return self.__class__.__name__
+
+    def setup_logger(self, logger=None):
+        """
+        Setup logger
+        """
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = get_logger(name=self.get_class_name(), filename='idds_rest.log')
+        return self.logger
+
     def post(self):
         """ Not supported. """
         return Response(status=HTTP_STATUS_CODE.BadRequest, content_type='application/json')()
diff --git a/main/lib/idds/rest/v1/messages.py b/main/lib/idds/rest/v1/messages.py
index acd8f599..6bd88889 100644
--- a/main/lib/idds/rest/v1/messages.py
+++ b/main/lib/idds/rest/v1/messages.py
@@ -6,7 +6,7 @@
 # http://www.apache.org/licenses/LICENSE-2.0OA
 #
 # Authors:
-# - Wen Guan, , 2021
+# - Wen Guan, , 2021 - 2024
 
 from traceback import format_exc
 
@@ -28,6 +28,10 @@
 class Message(IDDSController):
     """ Get message """
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.logger = self.setup_logger()
+
     def get(self, request_id, workload_id, transform_id, internal_id):
         """ Get messages with given id.
         HTTP Success:
diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py
index 9752c723..1d2285a1 100644
--- a/main/lib/idds/tests/panda_test.py
+++ b/main/lib/idds/tests/panda_test.py
@@ -17,8 +17,8 @@
 # os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
 # os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
 
-# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
-# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
+os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
+os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
 
 from pandaclient import Client  # noqa E402
 
@@ -84,6 +84,7 @@
 task_ids = [473, 472] + [i for i in range(325, 345)]
 task_ids = [476, 477, 478]
 task_ids = [937, 938, 940, 941]
+task_ids = [124, 619]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
diff --git a/monitor/data/conf.js b/monitor/data/conf.js
index 27dfe021..37fe20fe 100644
--- a/monitor/data/conf.js
+++ b/monitor/data/conf.js
@@ -1,9 +1,9 @@
 
 var appConfig = {
-    'iddsAPI_request': "https://lxplus925.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus925.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus925.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus961.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus961.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus961.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus961.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus961.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus961.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
diff --git a/start-daemon.sh b/start-daemon.sh
index 9a8fe2bd..703277f9 100755
--- a/start-daemon.sh
+++ b/start-daemon.sh
@@ -134,10 +134,19 @@ else
     cp /opt/idds/config_default/supervisord_httpd.ini /opt/idds/config/idds/supervisord_httpd.ini
     # cp /opt/idds/config_default/supervisord_syslog-ng.ini /opt/idds/config/idds/supervisord_syslog-ng.ini
 
+    echo "setup log rotation"
     cp /opt/idds/config_default/supervisord_logrotate.ini /opt/idds/config/idds/supervisord_logrotate.ini
     cp /opt/idds/config_default/logrotate_idds /opt/idds/config/idds/logrotate_idds
     cp /opt/idds/config_default/logrotate_daemon /opt/idds/config/idds/logrotate_daemon
     chmod +x /opt/idds/config/idds/logrotate_daemon
+    chown root /opt/idds/config/idds/logrotate_idds
+
+    echo "setup health monitor"
+    cp /opt/idds/config_default/supervisord_healthmonitor.ini /opt/idds/config/idds/
+    cp /opt/idds/config_default/healthmonitor_daemon /opt/idds/config/idds/
+    cp /opt/idds/config_default/idds_health_check.py /opt/idds/config/idds/
+    chmod +x /opt/idds/config/idds/healthmonitor_daemon
+    chmod +x /opt/idds/config/idds/idds_health_check.py
 fi
 
 if [ -f /etc/grid-security/hostkey.pem ]; then
diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py
index 8edb5d07..047ae8ec 100644
--- a/workflow/lib/idds/iworkflow/workflow.py
+++ b/workflow/lib/idds/iworkflow/workflow.py
@@ -58,7 +58,7 @@ def get_current_workflow(cls):
 class WorkflowContext(Context):
     def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True,
                  max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False,
-                 container_options=None):
+                 cloud=None, site=None, queue=None, vo=None, container_options=None):
         super(WorkflowContext, self).__init__()
         self._service = service         # panda, idds, sharefs
         self._request_id = None
@@ -73,11 +73,11 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo
         self._source_dir = source_dir
         self.remote_source_file = None
 
-        self._vo = None
+        self._vo = vo
 
-        self._queue = None
-        self._site = None
-        self._cloud = None
+        self._queue = queue
+        self._site = site
+        self._cloud = cloud
 
         self._working_group = None
 