Merge pull request #454 from PanDAWMS/flin
task evaluator: add max_rem_jobs_to_boost for boosting tasks to Express Analysis
mightqxc authored Nov 20, 2024
2 parents 8277059 + c8b3ed0 commit 4b56c42
Showing 2 changed files with 31 additions and 9 deletions.
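
In short, the commit makes the boost of nearly-finished analysis tasks conditional not only on file progress but also on a small estimated number of remaining jobs. A minimal, self-contained sketch of the new decision in Python (function and variable names are illustrative, not from the repository; the thresholds mirror the defaults introduced in this commit):

def classify_with_boost(
    task_class: int,
    pct_finished: float,
    n_jobs_remaining: int,
    progress_to_boost_a: int = 90,
    progress_to_boost_b: int = 95,
    max_rem_jobs_to_boost_a: int = 500,
    max_rem_jobs_to_boost_b: int = 200,
) -> int:
    """Return the possibly boosted class: 2 = class S (Express Analysis), 1 = class A, 0 = class B."""
    if task_class == 1 and pct_finished >= progress_to_boost_a and 0 < n_jobs_remaining <= max_rem_jobs_to_boost_a:
        return 2  # almost-done A-task with few remaining jobs: boost to class S
    if task_class == 0 and pct_finished >= progress_to_boost_b and 0 < n_jobs_remaining <= max_rem_jobs_to_boost_b:
        return 2  # almost-done B-task with few remaining jobs: boost to class S
    return task_class  # otherwise keep the original class

# Example: an A-task at 93% file progress with an estimated 120 remaining jobs is boosted;
# the same task with 1500 remaining jobs is not.
assert classify_with_boost(1, 93, 120) == 2
assert classify_with_boost(1, 93, 1500) == 1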
pandaserver/daemons/scripts/task_evaluator.py (36 changes: 28 additions & 8 deletions)
@@ -10,6 +10,7 @@
from pandacommon.pandalogger import logger_utils
from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.thread_utils import GenericThread

from pandaserver.config import panda_config
from pandaserver.daemons.scripts.metric_collector import MetricsDB

@@ -204,6 +205,15 @@ def analy_task_eval(self):
"AND ds.masterID IS NULL "
"GROUP BY ds.jediTaskID "
)
+ sql_get_task_n_jobs = (
+ "SELECT COUNT(*) FROM ( "
+ "SELECT DISTINCT c.PandaID "
+ "FROM ATLAS_PANDA.JEDI_Datasets ds, ATLAS_PANDA.JEDI_Dataset_Contents c "
+ "WHERE c.jediTaskID=ds.jediTaskID AND c.datasetID=ds.datasetID "
+ "AND ds.jediTaskID=:taskID AND ds.masterID IS NULL "
+ "AND ds.type IN ('input', 'pseudo_input') "
+ ") "
+ )
try:
# initialize
# tmp_site_dict = dict()
@@ -235,8 +245,11 @@
n_files_total = 0
n_files_finished = 0
n_files_failed = 0
+ n_files_remaining = 0
pct_finished = 0
pct_failed = 0
+ n_jobs = 0
+ n_jobs_remaining = 0
# get dataset info of each task
varMap = {":taskID": taskID}
dsinfo_list = self.tbuf.querySQL(sql_get_task_dsinfo, varMap)
@@ -248,27 +261,34 @@
}
for tup in dsinfo_list
}
+ # get n jobs of each task
+ tmp_res = self.tbuf.querySQL(sql_get_task_n_jobs, varMap)
+ for obj in tmp_res:
+ (n_jobs,) = obj
+ dsinfo_dict[taskID]["nJobs"] = n_jobs
+ break
# get task proceeding progress
ds_info = dsinfo_dict.get(taskID)
if ds_info is not None:
n_files_total = ds_info.get("nFiles", 0)
n_files_finished = ds_info.get("nFilesFinished", 0)
n_files_failed = ds_info.get("nFilesFailed", 0)
+ n_files_remaining = max(n_files_total - n_files_finished - n_files_failed, 0)
+ n_jobs = ds_info.get("nJobs", 0)
if n_files_total > 0:
pct_finished = n_files_finished * 100 / n_files_total
pct_failed = n_files_failed * 100 / n_files_total
+ n_jobs_remaining = int(n_jobs * n_files_remaining / n_files_total)
# classify
if gshare == "Express Analysis":
# Express Analysis tasks always in class S
task_class = 2
else:
# parameters
- progress_to_boost_A = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A")
- if progress_to_boost_A is None:
- progress_to_boost_A = 90
- progress_to_boost_B = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B")
- if progress_to_boost_B is None:
- progress_to_boost_B = 95
+ progress_to_boost_A = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A", default=90)
+ progress_to_boost_B = self.tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_B", default=95)
+ max_rem_jobs_to_boost_A = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_A", default=500)
+ max_rem_jobs_to_boost_B = self.tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_B", default=200)
# check usage of the user
usage_dict = ue_dict.get(user)
if usage_dict is None:
Expand All @@ -279,10 +299,10 @@ def analy_task_eval(self):
else:
task_class = 0
# boost for nearly done tasks
- if task_class == 1 and pct_finished >= progress_to_boost_A:
+ if task_class == 1 and pct_finished >= progress_to_boost_A and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_A:
# almost done A-tasks, to boost
task_class = 2
- elif task_class == 0 and pct_finished >= progress_to_boost_B:
+ elif task_class == 0 and pct_finished >= progress_to_boost_B and n_jobs_remaining > 0 and n_jobs_remaining <= max_rem_jobs_to_boost_B:
# almost done B-tasks, to boost
task_class = 2
# fill in task class
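
The new SQL query above counts the distinct PandaIDs attached to a task's master input datasets, and the remaining-job estimate then scales that job count by the fraction of files still to be processed. A rough, standalone illustration of the arithmetic (names are illustrative; this is not code from the commit):

def estimate_remaining_jobs(n_jobs: int, n_files_total: int, n_files_finished: int, n_files_failed: int) -> int:
    """Scale the task's job count by the fraction of files neither finished nor failed."""
    if n_files_total <= 0:
        return 0
    n_files_remaining = max(n_files_total - n_files_finished - n_files_failed, 0)
    return int(n_jobs * n_files_remaining / n_files_total)

# Example: 400 jobs over 1000 files, of which 930 finished and 20 failed,
# leaves 50 files and an estimate of 400 * 50 / 1000 = 20 remaining jobs.
print(estimate_remaining_jobs(400, 1000, 930, 20))  # 20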
pandaserver/taskbuffer/TaskBuffer.py (4 changes: 3 additions & 1 deletion)
@@ -610,11 +610,13 @@ def lockJobsForReassign(
return res

# get a DB configuration value
- def getConfigValue(self, component, key, app="pandaserver", vo=None):
+ def getConfigValue(self, component, key, app="pandaserver", vo=None, default=None):
# get DB proxy
proxy = self.proxyPool.getProxy()
# exec
res = proxy.getConfigValue(component, key, app, vo)
+ if res is None and default is not None:
+ res = default
# release DB proxy
self.proxyPool.putProxy(proxy)

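With the new default argument, callers can drop the None-check that previously followed every getConfigValue call. A usage sketch, assuming tbuf is an initialized TaskBuffer instance (the config keys are the ones used in this commit):

# before this commit: fall back manually when the key is not in the DB
progress_to_boost_a = tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A")
if progress_to_boost_a is None:
    progress_to_boost_a = 90

# after this commit: pass the fallback directly
progress_to_boost_a = tbuf.getConfigValue("analy_eval", "PROGRESS_TO_BOOST_A", default=90)
max_rem_jobs_to_boost_a = tbuf.getConfigValue("analy_eval", "MAX_REM_JOBS_TO_BOOST_A", default=500)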
