diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py index 0cda1758..f52ba1aa 100644 --- a/PandaPkgInfo.py +++ b/PandaPkgInfo.py @@ -1 +1 @@ -release_version = "0.5.0" +release_version = "0.5.1" diff --git a/pandaserver/taskbuffer/JobSpec.py b/pandaserver/taskbuffer/JobSpec.py index 92f8077e..fabbb079 100755 --- a/pandaserver/taskbuffer/JobSpec.py +++ b/pandaserver/taskbuffer/JobSpec.py @@ -214,6 +214,7 @@ class JobSpec(object): "requestType": "rt", "jobCloning": "sc", "scoutJob": "sj", + "taskQueuedTime": "tq", "usePrefetcher": "up", "useSecrets": "us", "useZipToPin": "uz", @@ -917,6 +918,22 @@ def set_input_output_file_types(self) -> None: out_types = sorted(list(out_types)) self.outputFileType = ",".join(out_types)[: self._limitLength["outputFileType"]] + # set task queued time + def set_task_queued_time(self, queued_time: float | None): + """ + Set task queued time in job metrics. Skip if queued_time is None + + :param queued_time: task queued time in seconds since epoch + """ + if queued_time is None: + return + if self.specialHandling in [None, "", "NULL"]: + items = [] + else: + items = self.specialHandling.split(",") + items.append(f"{self._tagForSH['taskQueuedTime']}={queued_time}") + self.specialHandling = ",".join(items) + # utils @@ -927,3 +944,21 @@ def push_status_changes(special_handling): items = special_handling.split(",") return JobSpec._tagForSH["pushStatusChanges"] in items return False + + +# get task queued time +def get_task_queued_time(special_handling) -> datetime.datetime | None: + """ + Get task queued time from job metrics + + :param special_handling: special handling string + :return: task queued time. None if unset + """ + try: + if special_handling is not None: + for item in special_handling.split(","): + if item.startswith(f"{JobSpec._tagForSH['taskQueuedTime']}="): + return datetime.datetime.fromtimestamp(float(item.split("=")[-1]), datetime.timezone.utc) + except Exception: + pass + return None diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 15758355..0062354d 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -37,10 +37,15 @@ task_split_rules, ) from pandaserver.taskbuffer.DatasetSpec import DatasetSpec +from pandaserver.taskbuffer.db_proxy_mods import metrics_module, task_module from pandaserver.taskbuffer.DdmSpec import DdmSpec from pandaserver.taskbuffer.FileSpec import FileSpec from pandaserver.taskbuffer.HarvesterMetricsSpec import HarvesterMetricsSpec -from pandaserver.taskbuffer.JobSpec import JobSpec, push_status_changes +from pandaserver.taskbuffer.JobSpec import ( + JobSpec, + get_task_queued_time, + push_status_changes, +) from pandaserver.taskbuffer.ResourceSpec import ( BASIC_RESOURCE_TYPE, ResourceSpec, @@ -124,29 +129,16 @@ def convert_dict_to_bind_vars(item): return ret -# create method name and logger -def create_method_name_logger(comment: str, tag: str = None) -> tuple[str, LogWrapper]: - """ - create method name and logger from function comment - param comment: comment of the function - param tag: tag to add to the method name - return: (method name, log wrapper) - """ - method_name = comment.split(" ")[-2].split(".")[-1] - if tag is not None: - method_name += f"< {tag} >" - tmp_log = LogWrapper(_logger, method_name) - return method_name, tmp_log - - # topics in SQL_QUEUE SQL_QUEUE_TOPIC_async_dataset_update = "async_dataset_update" # proxy -class DBProxy: +class DBProxy(metrics_module.MetricsModule, task_module.TaskModule): # constructor def __init__(self, useOtherError=False): + # init modules + super().__init__(_logger) # connection object self.conn = None # cursor object @@ -2503,8 +2495,9 @@ def updateJobStatus(self, pandaID, jobStatus, param, updateStateChange=False, at varMap[":ngType2"] = "trn_output" tmp_log.debug(sqlJediDU + comment + str(varMap)) self.cur.execute(sqlJediDU + comment, varMap) - # update lastStart + # first transition to running if oldJobStatus in ("starting", "sent") and jobStatus == "running": + # update lastStart sqlLS = "UPDATE ATLAS_PANDAMETA.siteData SET lastStart=CURRENT_DATE " sqlLS += "WHERE site=:site AND hours=:hours AND flag IN (:flag1,:flag2) " varMap = {} @@ -2514,6 +2507,11 @@ def updateJobStatus(self, pandaID, jobStatus, param, updateStateChange=False, at varMap[":flag2"] = "analysis" self.cur.execute(sqlLS + comment, varMap) tmp_log.debug("updated lastStart") + # record queuing period + if jediTaskID and get_task_queued_time(specialHandling): + tmp_success = self.record_job_queuing_period(pandaID) + if tmp_success is True: + tmp_log.debug("recorded queuing period") # update input if updatedFlag and jediTaskID is not None and jobStatus == "running" and oldJobStatus != jobStatus: self.updateInputStatusJedi(jediTaskID, pandaID, jobStatus) @@ -8172,7 +8170,7 @@ def propagateResultToJEDI( datasetContentsStat = {} # loop over all files finishUnmerge = set() - hasInput = False + trigger_reattempt = False tmpLog.debug(f"waitLock={waitLock} async_params={async_params}") # make pseudo files for dynamic number of events if EventServiceUtils.isDynNumEventsSH(jobSpec.specialHandling): @@ -8448,6 +8446,7 @@ def propagateResultToJEDI( elif fileSpec.status != "merging": # decrement nUsed to trigger reattempt datasetContentsStat[datasetID]["nFilesUsed"] -= 1 + trigger_reattempt = True else: # increment nTobeUsed to trigger merging datasetContentsStat[datasetID]["nFilesTobeUsed"] += 1 @@ -8615,6 +8614,23 @@ def propagateResultToJEDI( toSet = self.checkFailureCountWithCorruptedFiles(jobSpec.jediTaskID, jobSpec.PandaID) if toSet: self.setCorruptedEventRanges(jobSpec.jediTaskID, jobSpec.PandaID) + # update task queued time + if trigger_reattempt and get_task_queued_time(jobSpec.specialHandling): + sql_update_tq = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL " + var_map = {":jediTaskID": jobSpec.jediTaskID} + tmpLog.debug(sql_update_tq + comment + str(var_map)) + if async_params is not None: + self.insert_to_query_pool( + SQL_QUEUE_TOPIC_async_dataset_update, + async_params["PandaID"], + async_params["jediTaskID"], + sql_update_tq, + var_map, + async_params["exec_order"], + ) + async_params["exec_order"] += 1 + else: + cur.execute(sql_update_tq + comment, var_map) # add jobset info for job cloning if useJobCloning: self.recordRetryHistoryJEDI( @@ -8717,6 +8733,13 @@ def propagateResultToJEDI( except Exception: tmpLog.error(f"failed calculating gCO2 with {traceback.format_exc()}") + # task and job metrics + if get_task_queued_time(jobSpec.specialHandling): + # update task queued time + self.update_task_queued_time(jobSpec.jediTaskID) + # record job queuing time if the job didn't start running + self.record_job_queuing_period(jobSpec.PandaID, jobSpec) + # return return True @@ -21536,7 +21559,7 @@ def insert_to_query_pool(self, topic, panda_id, task_id, sql, var_map, exec_orde def async_update_datasets(self, panda_id): comment = " /* DBProxy.async_update_datasets */" methodName = comment.split(" ")[-2].split(".")[-1] - methodName += f" < ID={panda_id} >" + methodName += f" < PandaID={panda_id} >" tmpLog = LogWrapper(_logger, methodName) tmpLog.debug("start") try: @@ -21655,273 +21678,3 @@ def create_pseudo_files_for_dyn_num_events(self, job_spec, tmp_log): pseudo_files.append(tmpFileSpec) tmp_log.debug(f"{len(pseudo_files)} pseudo files") return pseudo_files - - # set job or task metrics - def set_workload_metrics(self, jedi_task_id: int, panda_id: int | None, metrics: dict, use_commit: bool = True) -> bool: - """ - Set job or task metrics - - :param jedi_task_id: jediTaskID - :param panda_id: PandaID. None to set task - :param metrics: metrics data - :param use_commit: use commit - :return: True if success - """ - comment = " /* DBProxy.set_workload_metrics */" - if panda_id is not None: - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}") - else: - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") - tmp_log.debug("start") - try: - if panda_id is not None: - table_name = "Job_Metrics" - var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id} - else: - table_name = "Task_Metrics" - var_map = {":jediTaskID": jedi_task_id} - # check if data is already there - sql_check = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID " - if panda_id is not None: - sql_check += "AND PandaID=:PandaID " - # insert data - sql_insert = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} " - if panda_id is not None: - sql_insert += "(jediTaskID,PandaID,creationTime,modificationTime,data) VALUES(:jediTaskID,:PandaID,CURRENT_DATE,CURRENT_DATE,:data) " - else: - sql_insert += "(jediTaskID,creationTime,modificationTime,data) VALUES(:jediTaskID,CURRENT_DATE,CURRENT_DATE,:data) " - # update data - sql_update = f"UPDATE {panda_config.schemaPANDA}.{table_name} SET modificationTime=CURRENT_DATE,data=:data WHERE jediTaskID=:jediTaskID " - if panda_id is not None: - sql_update += "AND PandaID=:PandaID " - # start transaction - if use_commit: - self.conn.begin() - # check if data is already there - self.cur.execute(sql_check + comment, var_map) - # read data - tmp_data = None - for (clob_data,) in self.cur: - try: - tmp_data = clob_data.read() - except AttributeError: - tmp_data = str(clob_data) - break - if not tmp_data: - # insert new data - var_map[":data"] = json.dumps(metrics) - self.cur.execute(sql_insert + comment, var_map) - tmp_log.debug("inserted") - else: - # update existing data - tmp_data = json.loads(tmp_data) - tmp_data.update(metrics) - var_map[":data"] = json.dumps(tmp_data) - self.cur.execute(sql_update + comment, var_map) - tmp_log.debug("updated") - if use_commit: - # commit - if not self._commit(): - raise RuntimeError("Commit error") - tmp_log.debug("done") - return True - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, method_name) - return False - - # get job or task metrics - def get_workload_metrics(self, jedi_task_id: int, panda_id: int | None) -> tuple[bool, dict | None]: - """ - Get job metrics or task metrics - - :param jedi_task_id: jediTaskID - :param panda_id: PandaID. None to get task metrics - :return: (False, None) if failed, otherwise (True, metrics) - """ - comment = " /* DBProxy.get_workload_metrics */" - if panda_id is not None: - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}") - else: - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") - tmp_log.debug("start") - try: - if panda_id is not None: - table_name = "Job_Metrics" - var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id} - else: - table_name = "Task_Metrics" - var_map = {":jediTaskID": jedi_task_id} - # get data - sql_get = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID " - if panda_id is not None: - sql_get += "AND PandaID=:PandaID " - self.cur.execute(sql_get + comment, var_map) - # read data - metrics = None - for (clob_data,) in self.cur: - try: - metrics = clob_data.read() - except AttributeError: - metrics = str(clob_data) - break - if metrics is not None: - metrics = json.loads(metrics) - tmp_log.debug(f"got {sys.getsizeof(metrics)} bytes") - else: - tmp_log.debug("no data") - return True, metrics - except Exception: - # error - self.dumpErrorMessage(_logger, method_name) - return False, None - - # get jobs' metrics in a task - def get_jobs_metrics_in_task(self, jedi_task_id: int) -> tuple[bool, list | None]: - """ - Get metrics of jobs in a task - - :param jedi_task_id: jediTaskID - :return: (False, None) if failed, otherwise (True, list of [PandaID, metrics]) - """ - comment = " /* DBProxy.get_jobs_metrics_in_task */" - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") - tmp_log.debug("start") - try: - table_name = "Job_Metrics" - var_map = {":jediTaskID": jedi_task_id} - # get data - sql_get = f"SELECT PandaID,data FROM {panda_config.schemaPANDA}.Job_Metrics WHERE jediTaskID=:jediTaskID " - self.cur.execute(sql_get + comment, var_map) - # read data - metrics_list = [] - for panda_id, clob_data in self.cur: - try: - tmp_data = clob_data.read() - except AttributeError: - tmp_data = str(clob_data) - metrics_list.append([panda_id, json.loads(tmp_data)]) - tmp_log.debug(f"got metrics for {len(metrics_list)} jobs") - return True, metrics_list - except Exception: - # error - self.dumpErrorMessage(_logger, method_name) - return False, None - - # enable job cloning - def enable_job_cloning(self, jedi_task_id: int, mode: str = None, multiplicity: int = None, num_sites: int = None) -> tuple[bool, str]: - """ - Enable job cloning for a task - - :param jedi_task_id: jediTaskID - :param mode: mode of cloning, runonce or storeonce - :param multiplicity: number of jobs to be created for each target - :param num_sites: number of sites to be used for each target - :return: (True, None) if success otherwise (False, error message) - """ - comment = " /* DBProxy.enable_job_cloning */" - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") - tmp_log.debug("start") - try: - ret_value = (True, None) - # start transaction - self.conn.begin() - # get current split rule - sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID " - var_map = {":jediTaskID": jedi_task_id} - self.cur.execute(sql_check + comment, var_map) - res = self.cur.fetchone() - if not res: - # not found - ret_value = (False, "task not found") - else: - (split_rule,) = res - # set default values - if mode is None: - mode = "runonce" - if multiplicity is None: - multiplicity = 2 - if num_sites is None: - num_sites = 2 - # ID of job cloning mode - mode_id = EventServiceUtils.getJobCloningValue(mode) - if mode_id == "": - ret_value = (False, f"invalid job cloning mode: {mode}") - else: - # set mode - split_rule = task_split_rules.replace_rule(split_rule, "useJobCloning", mode_id) - # set semaphore size - split_rule = task_split_rules.replace_rule(split_rule, "nEventsPerWorker", 1) - # set job multiplicity - split_rule = task_split_rules.replace_rule(split_rule, "nEsConsumers", multiplicity) - # set number of sites - split_rule = task_split_rules.replace_rule(split_rule, "nSitesPerJob", num_sites) - # update split rule and event service flag - sql_update = ( - f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID " - ) - var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_JOB_CLONING} - self.cur.execute(sql_update + comment, var_map) - if not self.cur.rowcount: - ret_value = (False, "failed to update task") - # commit - if not self._commit(): - raise RuntimeError("Commit error") - tmp_log.debug("done") - return ret_value - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, method_name) - return False, "failed to enable job cloning" - - # disable job cloning - def disable_job_cloning(self, jedi_task_id: int) -> tuple[bool, str]: - """ - Disable job cloning for a task - - :param jedi_task_id: jediTaskID - :return: (True, None) if success otherwise (False, error message) - """ - comment = " /* DBProxy.disable_job_cloning */" - method_name, tmp_log = create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") - tmp_log.debug("start") - try: - ret_value = (True, None) - # start transaction - self.conn.begin() - # get current split rule - sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID " - var_map = {":jediTaskID": jedi_task_id} - self.cur.execute(sql_check + comment, var_map) - res = self.cur.fetchone() - if not res: - # not found - ret_value = (False, "task not found") - else: - (split_rule,) = res - # remove job cloning related rules - split_rule = task_split_rules.remove_rule_with_name(split_rule, "useJobCloning") - split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEventsPerWorker") - split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEsConsumers") - split_rule = task_split_rules.remove_rule_with_name(split_rule, "nSitesPerJob") - # update split rule and event service flag - sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID " - var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_NORMAL} - self.cur.execute(sql_update + comment, var_map) - if not self.cur.rowcount: - ret_value = (False, "failed to update task") - # commit - if not self._commit(): - raise RuntimeError("Commit error") - tmp_log.debug("done") - return ret_value - except Exception: - # roll back - self._rollback() - # error - self.dumpErrorMessage(_logger, method_name) - return False, "failed to disable job cloning" diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index d6f9c3ea..ac488e4b 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -17,6 +17,7 @@ from pandaserver.srvcore import CoreUtils from pandaserver.taskbuffer import ErrorCode, EventServiceUtils, JobUtils, ProcessGroups from pandaserver.taskbuffer.DBProxyPool import DBProxyPool +from pandaserver.taskbuffer.JobSpec import get_task_queued_time _logger = PandaLogger().getLogger("TaskBuffer") @@ -370,6 +371,7 @@ def storeJobs( use_secrets = job.use_secrets() push_changes = job.push_status_changes() is_push_job = job.is_push_job() + task_queued_time = get_task_queued_time(job.specialHandling) # reset specialHandling specialHandling = specialHandling[:-1] job.specialHandling = specialHandling @@ -385,6 +387,8 @@ def storeJobs( job.set_push_status_changes() if is_push_job: job.set_push_job() + if task_queued_time: + job.set_task_queued_time(task_queued_time.timestamp()) # set DDM backend if ddmBackEnd is not None: job.setDdmBackEnd(ddmBackEnd) diff --git a/pandaserver/taskbuffer/db_proxy_mods/__init__.py b/pandaserver/taskbuffer/db_proxy_mods/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pandaserver/taskbuffer/db_proxy_mods/base_module.py b/pandaserver/taskbuffer/db_proxy_mods/base_module.py new file mode 100644 index 00000000..d9bb6829 --- /dev/null +++ b/pandaserver/taskbuffer/db_proxy_mods/base_module.py @@ -0,0 +1,58 @@ +import sys +import traceback + +from pandacommon.pandalogger.LogWrapper import LogWrapper +from pandacommon.pandalogger.PandaLogger import PandaLogger + + +# Base class for DB proxy modules +class BaseModule: + # constructor + def __init__(self, log_stream: PandaLogger): + self._log_stream = log_stream + self.conn = None + self.cur = None + + # abstract method to commit + def _commit(self): + """ + Commit the transaction + """ + raise NotImplementedError("commit is not implemented") + + # abstract method to rollback + def _rollback(self): + """ + Rollback the transaction + """ + raise NotImplementedError("rollback is not implemented") + + # dump error message + def dump_error_message(self, tmp_log: LogWrapper): + """ + Dump error message to the log + + :param tmp_log: log wrapper + """ + # error + err_type, err_value = sys.exc_info()[:2] + err_str = f"{err_type.__name__} {err_value}" + err_str.strip() + err_str += " " + err_str += traceback.format_exc() + tmp_log.error(err_str) + + # create method name and logger + def create_method_name_logger(self, comment: str, tag: str = None) -> tuple[str, LogWrapper]: + """ + Create method name and logger from function comment + + param comment: comment of the function + param tag: tag to add to the method name + return: (method name, log wrapper) + """ + method_name = comment.split(" ")[-2].split(".")[-1] + if tag is not None: + method_name += f" < {tag} >" + tmp_log = LogWrapper(self._log_stream, method_name) + return method_name, tmp_log diff --git a/pandaserver/taskbuffer/db_proxy_mods/metrics_module.py b/pandaserver/taskbuffer/db_proxy_mods/metrics_module.py new file mode 100644 index 00000000..9b1e06a4 --- /dev/null +++ b/pandaserver/taskbuffer/db_proxy_mods/metrics_module.py @@ -0,0 +1,305 @@ +import datetime +import json +import sys + +from pandacommon.pandalogger.PandaLogger import PandaLogger + +from pandaserver.config import panda_config +from pandaserver.srvcore import CoreUtils +from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule +from pandaserver.taskbuffer.JobSpec import JobSpec, get_task_queued_time + + +# Module class to define metrics related methods +class MetricsModule(BaseModule): + # constructor + def __init__(self, log_stream: PandaLogger): + super().__init__(log_stream) + + # set job or task metrics + def set_workload_metrics(self, jedi_task_id: int, panda_id: int | None, metrics: dict, use_commit: bool = True) -> bool: + """ + Set job or task metrics + + :param jedi_task_id: jediTaskID + :param panda_id: PandaID. None to set task + :param metrics: metrics data + :param use_commit: use commit + :return: True if success + """ + comment = " /* DBProxy.set_workload_metrics */" + if panda_id is not None: + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}") + else: + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + try: + if panda_id is not None: + table_name = "Job_Metrics" + var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id} + else: + table_name = "Task_Metrics" + var_map = {":jediTaskID": jedi_task_id} + # check if data is already there + sql_check = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID " + if panda_id is not None: + sql_check += "AND PandaID=:PandaID " + # insert data + sql_insert = f"INSERT INTO {panda_config.schemaPANDA}.{table_name} " + if panda_id is not None: + sql_insert += "(jediTaskID,PandaID,creationTime,modificationTime,data) VALUES(:jediTaskID,:PandaID,CURRENT_DATE,CURRENT_DATE,:data) " + else: + sql_insert += "(jediTaskID,creationTime,modificationTime,data) VALUES(:jediTaskID,CURRENT_DATE,CURRENT_DATE,:data) " + # update data + sql_update = f"UPDATE {panda_config.schemaPANDA}.{table_name} SET modificationTime=CURRENT_DATE,data=:data WHERE jediTaskID=:jediTaskID " + if panda_id is not None: + sql_update += "AND PandaID=:PandaID " + # start transaction + if use_commit: + self.conn.begin() + # check if data is already there + self.cur.execute(sql_check + comment, var_map) + # read data + tmp_data = None + for (clob_data,) in self.cur: + try: + tmp_data = clob_data.read() + except AttributeError: + tmp_data = str(clob_data) + break + if not tmp_data: + # insert new data + var_map[":data"] = json.dumps(metrics, cls=CoreUtils.NonJsonObjectEncoder) + self.cur.execute(sql_insert + comment, var_map) + tmp_log.debug("inserted") + else: + # update existing data + tmp_data = json.loads(tmp_data, object_hook=CoreUtils.as_python_object) + tmp_data.update(metrics) + var_map[":data"] = json.dumps(tmp_data, cls=CoreUtils.NonJsonObjectEncoder) + self.cur.execute(sql_update + comment, var_map) + tmp_log.debug("updated") + if use_commit: + # commit + if not self._commit(): + raise RuntimeError("Commit error") + tmp_log.debug("done") + return True + except Exception: + # roll back + self._rollback() + # error + self.dump_error_message(tmp_log) + return False + + # get job or task metrics + def get_workload_metrics(self, jedi_task_id: int, panda_id: int = None) -> tuple[bool, dict | None]: + """ + Get job metrics or task metrics + + :param jedi_task_id: jediTaskID + :param panda_id: PandaID. None to get task metrics + :return: (False, None) if failed, otherwise (True, metrics) + """ + comment = " /* DBProxy.get_workload_metrics */" + if panda_id is not None: + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id} PandaID={panda_id}") + else: + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + try: + if panda_id is not None: + table_name = "Job_Metrics" + var_map = {":jediTaskID": jedi_task_id, ":PandaID": panda_id} + else: + table_name = "Task_Metrics" + var_map = {":jediTaskID": jedi_task_id} + # get data + sql_get = f"SELECT data FROM {panda_config.schemaPANDA}.{table_name} WHERE jediTaskID=:jediTaskID " + if panda_id is not None: + sql_get += "AND PandaID=:PandaID " + self.cur.execute(sql_get + comment, var_map) + # read data + metrics = None + for (clob_data,) in self.cur: + try: + metrics = clob_data.read() + except AttributeError: + metrics = str(clob_data) + break + if metrics is not None: + metrics = json.loads(metrics, object_hook=CoreUtils.as_python_object) + tmp_log.debug(f"got {sys.getsizeof(metrics)} bytes") + else: + tmp_log.debug("no data") + return True, metrics + except Exception: + # error + self.dump_error_message(tmp_log) + return False, None + + # get jobs' metrics in a task + def get_jobs_metrics_in_task(self, jedi_task_id: int) -> tuple[bool, list | None]: + """ + Get metrics of jobs in a task + + :param jedi_task_id: jediTaskID + :return: (False, None) if failed, otherwise (True, list of [PandaID, metrics]) + """ + comment = " /* DBProxy.get_jobs_metrics_in_task */" + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + try: + var_map = {":jediTaskID": jedi_task_id} + # get data + sql_get = f"SELECT PandaID,data FROM {panda_config.schemaPANDA}.Job_Metrics WHERE jediTaskID=:jediTaskID " + self.cur.execute(sql_get + comment, var_map) + # read data + metrics_list = [] + for panda_id, clob_data in self.cur: + try: + tmp_data = clob_data.read() + except AttributeError: + tmp_data = str(clob_data) + metrics_list.append([panda_id, json.loads(tmp_data, object_hook=CoreUtils.as_python_object)]) + tmp_log.debug(f"got metrics for {len(metrics_list)} jobs") + return True, metrics_list + except Exception: + # error + self.dump_error_message(tmp_log) + return False, None + + # update task queued time + def update_task_queued_time(self, jedi_task_id: int): + """ + Update task queued time depending on the number of inputs to be processed. Update it if it was None and inputs become available. + Set None if no input is available and record the queuing duration to task metrics. + + :param jedi_task_id: jediTaskID of the task + """ + comment = " /* DBProxy.update_task_queued_time */" + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + # check if the task is queued + sql_check = f"SELECT status,oldStatus,queuedTime,currentPriority,gshare FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID " + var_map = {":jediTaskID": jedi_task_id} + self.cur.execute(sql_check + comment, var_map) + res = self.cur.fetchone() + if not res: + # not found + tmp_log.debug("task not found") + return + (task_status, task_old_status, queued_time, current_priority, global_share) = res + has_input = False + active_status_list = ("ready", "running", "scouting", "scouted") + if task_status in active_status_list or (task_old_status in active_status_list and task_status == "pending"): + # check if the task has unprocessed inputs + sql_input = ( + f"SELECT 1 FROM {panda_config.schemaJEDI}.JEDI_Datasets WHERE jediTaskID=:jediTaskID AND type IN (:type1,:type2) AND masterID IS NULL " + "AND nFilesToBeUsed>0 AND nFilesToBeUSed>nFilesUsed " + ) + var_map = {":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"} + self.cur.execute(sql_input + comment, var_map) + res = self.cur.fetchone() + has_input = res is not None + # set queued time if it was None and inputs are available + if has_input: + if queued_time is None: + # update queued time + tmp_log.debug(f"set queued time when task is {task_status}") + sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=CURRENT_DATE WHERE jediTaskID=:jediTaskID AND queuedTime IS NULL " + var_map = {":jediTaskID": jedi_task_id} + self.cur.execute(sql_update + comment, var_map) + else: + tmp_log.debug(f"keep current queued time {queued_time.strftime('%Y-%m-%d %H:%M:%S')}") + # record queuing duration since the task has no more input to process + if queued_time is not None and not has_input: + # unset queued time + tmp_log.debug(f"unset queued time and record duration when task is {task_status}") + sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET queuedTime=NULL WHERE jediTaskID=:jediTaskID AND queuedTime IS NOT NULL " + var_map = {":jediTaskID": jedi_task_id} + self.cur.execute(sql_update + comment, var_map) + # get task metrics dict + tmp_success, task_metrics = self.get_workload_metrics(jedi_task_id, None) + if not tmp_success: + err_str = f"failed to get task metrics for jediTaskId={jedi_task_id}" + tmp_log.error(err_str) + return + # new + if not task_metrics: + task_metrics = {} + # add duration + task_metrics.setdefault("queuingPeriods", []) + if len(task_metrics["queuingPeriods"]) < 10000: + task_metrics["queuingPeriods"].append( + {"start": queued_time, "end": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), "status": task_status} + ) + tmp_success = self.set_workload_metrics(jedi_task_id, None, task_metrics, False) + if not tmp_success: + err_str = f"failed to update task metrics for jediTaskId={jedi_task_id}" + tmp_log.error(err_str) + return + else: + tmp_log.debug("skipped since queuing period list is too long") + tmp_log.debug("done") + + # record job queuing period + def record_job_queuing_period(self, panda_id: int, job_spec: JobSpec = None) -> bool | None: + """ + Record queuing period in job metrics. Skip if job.jobMetrics doesn't contain task queued time + + :param panda_id: Job's PandaID + :param job_spec: job spec. None to get it from the database + :return: True if success. False if failed. None if skipped + """ + comment = " /* DBProxy.record_job_queuing_period */" + method_name, tmp_log = self.create_method_name_logger(comment, f"PandaID={panda_id}") + tmp_log.debug(f"start with job spec: {job_spec is None}") + # get task queued time + if job_spec is None: + sql_check = f"SELECT jediTaskID,jobStatus,specialHandling FROM {panda_config.schemaPANDA}.jobsActive4 WHERE PandaID=:PandaID " + var_map = {":PandaID": panda_id} + self.cur.execute(sql_check + comment, var_map) + res = self.cur.fetchone() + if not res: + # not found + tmp_log.debug("job not found") + return None + jedi_task_id, job_status, tmp_str = res + else: + jedi_task_id = job_spec.jediTaskID + job_status = job_spec.jobStatus + tmp_str = job_spec.specialHandling + task_queued_time = get_task_queued_time(tmp_str) + # record queuing duration + if jedi_task_id and task_queued_time: + tmp_log.debug(f"to record queuing period") + # get job metrics dict + tmp_success, job_metrics = self.get_workload_metrics(jedi_task_id, panda_id) + if not tmp_success: + err_str = "Failed to get job metrics " + tmp_log.error(err_str) + return False + # new + if not job_metrics: + job_metrics = {} + # add duration + if "queuingPeriod" in job_metrics: + tmp_log.debug("skipped since queuing period already exists") + return None + else: + job_metrics["queuingPeriod"] = { + "start": task_queued_time, + "end": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), + "status": job_status, + } + tmp_success = self.set_workload_metrics(jedi_task_id, panda_id, job_metrics, False) + if not tmp_success: + err_str = "Failed to update job metrics" + tmp_log.error(err_str) + return False + tmp_log.debug("done") + return True + else: + tmp_log.debug(f"skipped as jediTaskID={jedi_task_id} taskQueuedTime={task_queued_time}") + return None diff --git a/pandaserver/taskbuffer/db_proxy_mods/task_module.py b/pandaserver/taskbuffer/db_proxy_mods/task_module.py new file mode 100644 index 00000000..3b627f60 --- /dev/null +++ b/pandaserver/taskbuffer/db_proxy_mods/task_module.py @@ -0,0 +1,133 @@ +import datetime +import json +import sys + +from pandacommon.pandalogger.PandaLogger import PandaLogger + +from pandaserver.config import panda_config +from pandaserver.srvcore import CoreUtils +from pandaserver.taskbuffer import EventServiceUtils, task_split_rules +from pandaserver.taskbuffer.db_proxy_mods.base_module import BaseModule + + +# Module class to define task related methods +class TaskModule(BaseModule): + # constructor + def __init__(self, log_stream: PandaLogger): + super().__init__(log_stream) + + # enable job cloning + def enable_job_cloning(self, jedi_task_id: int, mode: str = None, multiplicity: int = None, num_sites: int = None) -> tuple[bool, str]: + """ + Enable job cloning for a task + + :param jedi_task_id: jediTaskID + :param mode: mode of cloning, runonce or storeonce + :param multiplicity: number of jobs to be created for each target + :param num_sites: number of sites to be used for each target + :return: (True, None) if success otherwise (False, error message) + """ + comment = " /* DBProxy.enable_job_cloning */" + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + try: + ret_value = (True, None) + # start transaction + self.conn.begin() + # get current split rule + sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID " + var_map = {":jediTaskID": jedi_task_id} + self.cur.execute(sql_check + comment, var_map) + res = self.cur.fetchone() + if not res: + # not found + ret_value = (False, "task not found") + else: + (split_rule,) = res + # set default values + if mode is None: + mode = "runonce" + if multiplicity is None: + multiplicity = 2 + if num_sites is None: + num_sites = 2 + # ID of job cloning mode + mode_id = EventServiceUtils.getJobCloningValue(mode) + if mode_id == "": + ret_value = (False, f"invalid job cloning mode: {mode}") + else: + # set mode + split_rule = task_split_rules.replace_rule(split_rule, "useJobCloning", mode_id) + # set semaphore size + split_rule = task_split_rules.replace_rule(split_rule, "nEventsPerWorker", 1) + # set job multiplicity + split_rule = task_split_rules.replace_rule(split_rule, "nEsConsumers", multiplicity) + # set number of sites + split_rule = task_split_rules.replace_rule(split_rule, "nSitesPerJob", num_sites) + # update split rule and event service flag + sql_update = ( + f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID " + ) + var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_JOB_CLONING} + self.cur.execute(sql_update + comment, var_map) + if not self.cur.rowcount: + ret_value = (False, "failed to update task") + # commit + if not self._commit(): + raise RuntimeError("Commit error") + tmp_log.debug("done") + return ret_value + except Exception: + # roll back + self._rollback() + # error + self.dump_error_message(tmp_log) + return False, "failed to enable job cloning" + + # disable job cloning + def disable_job_cloning(self, jedi_task_id: int) -> tuple[bool, str]: + """ + Disable job cloning for a task + + :param jedi_task_id: jediTaskID + :return: (True, None) if success otherwise (False, error message) + """ + comment = " /* DBProxy.disable_job_cloning */" + method_name, tmp_log = self.create_method_name_logger(comment, f"jediTaskID={jedi_task_id}") + tmp_log.debug("start") + try: + ret_value = (True, None) + # start transaction + self.conn.begin() + # get current split rule + sql_check = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks WHERE jediTaskID=:jediTaskID " + var_map = {":jediTaskID": jedi_task_id} + self.cur.execute(sql_check + comment, var_map) + res = self.cur.fetchone() + if not res: + # not found + ret_value = (False, "task not found") + else: + (split_rule,) = res + # remove job cloning related rules + split_rule = task_split_rules.remove_rule_with_name(split_rule, "useJobCloning") + split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEventsPerWorker") + split_rule = task_split_rules.remove_rule_with_name(split_rule, "nEsConsumers") + split_rule = task_split_rules.remove_rule_with_name(split_rule, "nSitesPerJob") + # update split rule and event service flag + sql_update = f"UPDATE {panda_config.schemaJEDI}.JEDI_Tasks SET splitRule=:splitRule,eventService=:eventService WHERE jediTaskID=:jediTaskID " + var_map = {":jediTaskID": jedi_task_id, ":splitRule": split_rule, ":eventService": EventServiceUtils.TASK_NORMAL} + self.cur.execute(sql_update + comment, var_map) + if not self.cur.rowcount: + ret_value = (False, "failed to update task") + # commit + if not self._commit(): + raise RuntimeError("Commit error") + tmp_log.debug("done") + return ret_value + except Exception: + # roll back + self._rollback() + # error + self.dump_error_message(tmp_log) + return False, "failed to disable job cloning"