diff --git a/pandaserver/daemons/scripts/task_evaluator.py b/pandaserver/daemons/scripts/task_evaluator.py index 4bf2c516..0d1cb99e 100644 --- a/pandaserver/daemons/scripts/task_evaluator.py +++ b/pandaserver/daemons/scripts/task_evaluator.py @@ -10,6 +10,7 @@ from pandacommon.pandalogger import logger_utils from pandacommon.pandalogger.PandaLogger import PandaLogger from pandacommon.pandautils.thread_utils import GenericThread + from pandaserver.config import panda_config from pandaserver.daemons.scripts.metric_collector import MetricsDB @@ -204,6 +205,15 @@ def analy_task_eval(self): "AND ds.masterID IS NULL " "GROUP BY ds.jediTaskID " ) + sql_get_task_n_jobs = ( + "SELECT COUNT(*) FROM ( " + "SELECT DISTINCT c.PandaID " + "FROM ATLAS_PANDA.JEDI_Datasets ds, ATLAS_PANDA.JEDI_Dataset_Contents c " + "WHERE c.jediTaskID=ds.jediTaskID AND c.datasetID=ds.datasetID " + "AND ds.jediTaskID=:taskID AND ds.masterID IS NULL " + "AND ds.type IN ('input', 'pseudo_input') " + ") " + ) try: # initialize # tmp_site_dict = dict() @@ -235,8 +245,11 @@ def analy_task_eval(self): n_files_total = 0 n_files_finished = 0 n_files_failed = 0 + n_files_remaining = 0 pct_finished = 0 pct_failed = 0 + n_jobs = 0 + n_jobs_remaining = 0 # get dataset info of each task varMap = {":taskID": taskID} dsinfo_list = self.tbuf.querySQL(sql_get_task_dsinfo, varMap) @@ -248,27 +261,34 @@ def analy_task_eval(self): } for tup in dsinfo_list } + # get n jobs of each task + tmp_res = self.tbuf.querySQL(sql_get_task_n_jobs, varMap) + for obj in tmp_res: + (n_jobs,) = obj + dsinfo_dict[taskID]["nJobs"] = n_jobs + break # get task proceeding progress ds_info = dsinfo_dict.get(taskID) if ds_info is not None: n_files_total = ds_info.get("nFiles", 0) n_files_finished = ds_info.get("nFilesFinished", 0) n_files_failed = ds_info.get("nFilesFailed", 0) + n_files_remaining = max(n_files_total - n_files_finished - n_files_failed, 0) + n_jobs = ds_info.get("nJobs", 0) if n_files_total > 0: pct_finished = n_files_finished * 100 / n_files_total pct_failed = n_files_failed * 100 / n_files_total + n_jobs_remaining = int(n_jobs * n_files_remaining / n_files_total) # classify if gshare == "Express Analysis": # Express Analysis tasks always in class S task_class = 2 else: # parameters - progress_to_boost_A = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A") - if progress_to_boost_A is None: - progress_to_boost_A = 90 - progress_to_boost_B = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B") - if progress_to_boost_B is None: - progress_to_boost_B = 95 + progress_to_boost_A = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A", default=90) + progress_to_boost_B = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B", default=95) + max_rem_jobs_to_boost_A = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_A", default=500) + max_rem_jobs_to_boost_B = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_B", default=200) # check usage of the user usage_dict = ue_dict.get(user) if usage_dict is None: @@ -279,10 +299,10 @@ def analy_task_eval(self): else: task_class = 0 # boost for nearly done tasks - if task_class == 1 and pct_finished >= progress_to_boost_A: + if task_class == 1 and pct_finished >= progress_to_boost_A and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_A: # almost done A-tasks, to boost task_class = 2 - elif task_class == 0 and pct_finished >= progress_to_boost_B: + elif task_class == 0 and pct_finished >= progress_to_boost_B and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_B: # almost done B-tasks, to boost task_class = 2 # fill in task class diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index c4737fce..aed02d9f 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -610,11 +610,13 @@ def lockJobsForReassign( return res # get a DB configuration value - def getConfigValue(self, component, key, app="pandaserver", vo=None): + def getConfigValue(self, component, key, app="pandaserver", vo=None, default=None): # get DB proxy proxy = self.proxyPool.getProxy() # exec res = proxy.getConfigValue(component, key, app, vo) + if res is None and default is not None: + res = default # release DB proxy self.proxyPool.putProxy(proxy)