From 88443caaef638f7ddbf92559945e69670e7f5be4 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Thu, 22 Aug 2024 11:25:00 +0200 Subject: [PATCH] timeout for build or non-JEDI jobs stalled in defined state --- pandaserver/daemons/scripts/copyArchive.py | 24 +++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/pandaserver/daemons/scripts/copyArchive.py b/pandaserver/daemons/scripts/copyArchive.py index 7017173f0..3e615eb72 100644 --- a/pandaserver/daemons/scripts/copyArchive.py +++ b/pandaserver/daemons/scripts/copyArchive.py @@ -519,7 +519,7 @@ def _memoryCheck(str): statsPerShare.setdefault(gshare, {"nq": 0, "nr": 0}) statsPerPQ.setdefault(computingSite, {}) statsPerPQ[computingSite].setdefault(gshare, {"nq": 0, "nr": 0}) - if jobStatus in ["definied", "assigned", "activated", "starting"]: + if jobStatus in ["defined", "assigned", "activated", "starting"]: statsPerPQ[computingSite][gshare]["nq"] += nJobs statsPerShare[gshare]["nq"] += nJobs elif jobStatus == "running": @@ -729,6 +729,28 @@ def _memoryCheck(str): Client.killJobs(jediJobs[iJob : iJob + nJob], 51, keepUnmerged=True) iJob += nJob + # reassign stalled defined build and non-JEDI jobs + timeLimit = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(minutes=timeoutValue) + varMap = {} + varMap[":jobStatus"] = "defined" + varMap[":prodSourceLabel_p"] = "panda" + varMap[":prodSourceLabel_u"] = "user" + varMap[":timeLimit"] = timeLimit + varMap[":lockedBy"] = "jedi" + status, res = taskBuffer.querySQLS( + "SELECT PandaID FROM ATLAS_PANDA.jobsDefined4 WHERE ((prodSourceLabel=:prodSourceLabel_p AND transformation LIKE '%build%') OR " + "(prodSourceLabel=:prodSourceLabel_u AND lockedBy<>:lockedBy)) AND jobStatus=:jobStatus AND creationTime<:timeLimit ORDER BY PandaID", + varMap, + ) + jobs = [] + if res is not None: + for (id,) in res: + jobs.append(id) + # kill + if len(jobs): + Client.killJobs(jobs, 2) + _logger.debug(f"reassign stalled defined build and non-JEDI jobs with timeout {timeoutValue}min ({str(jobs)})") + # reassign long-waiting jobs in defined table timeLimit = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(hours=12) status, res = taskBuffer.lockJobsForReassign("ATLAS_PANDA.jobsDefined4", timeLimit, [], ["managed"], [], [], [], True)