Skip to content

Commit

Permalink
Merge pull request #457 from PanDAWMS/error_classification
Browse files Browse the repository at this point in the history
Error classification
  • Loading branch information
fbarreir authored Nov 21, 2024
2 parents 4b56c42 + 5432f2e commit 5c8f044
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 100 deletions.
16 changes: 8 additions & 8 deletions pandaserver/dataservice/adder_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,16 @@ def handle_failed_job(self) -> None:

if source and error_code:
try:
self.logger.debug("AdderGen.run will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("AdderGen.run will call job_failure_postprocessing")
retryModule.job_failure_postprocessing(
self.taskBuffer,
self.job.PandaID,
errors,
self.job.attemptNr,
)
self.logger.debug("apply_retrial_rules is back")
self.logger.debug("job_failure_postprocessing is back")
except Exception as e:
self.logger.error(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")

self.job.jobStatus = "failed"
for file in self.job.Files:
Expand Down Expand Up @@ -488,18 +488,18 @@ def check_job_status(self) -> None:
"error_diag": error_diag,
}
]
self.logger.debug("AdderGen.run 2 will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("AdderGen.run 2 will call job_failure_postprocessing")
retryModule.job_failure_postprocessing(
self.taskBuffer,
job_tmp.PandaID,
errors,
job_tmp.attemptNr,
)
self.logger.debug("apply_retrial_rules 2 is back")
self.logger.debug("job_failure_postprocessing 2 is back")
except IndexError:
pass
except Exception as e:
self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

self.setup_closer()

Expand Down
17 changes: 9 additions & 8 deletions pandaserver/jobdispatcher/Watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandaserver.dataservice.closer import Closer
from pandaserver.jobdispatcher import ErrorCode
from pandaserver.taskbuffer import EventServiceUtils, retryModule
Expand Down Expand Up @@ -136,11 +137,11 @@ def run(self):
]

try:
self.logger.debug("Watcher will call apply_retrial_rules")
retryModule.apply_retrial_rules(self.taskBuffer, job.PandaID, errors, job.attemptNr)
self.logger.debug("apply_retrial_rules is back")
self.logger.debug("Watcher will call job_failure_postprocessing")
retryModule.job_failure_postprocessing(self.taskBuffer, job.PandaID, errors, job.attemptNr)
self.logger.debug("job_failure_postprocessing is back")
except Exception as e:
self.logger.debug(f"apply_retrial_rules excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.debug(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")

# updateJobs was successful and it failed a job with taskBufferErrorCode
try:
Expand All @@ -155,20 +156,20 @@ def run(self):
source = "taskBufferErrorCode"
error_code = job_tmp.taskBufferErrorCode
error_diag = job_tmp.taskBufferErrorDiag
self.logger.debug("Watcher.run 2 will call apply_retrial_rules")
retryModule.apply_retrial_rules(
self.logger.debug("Watcher.run 2 will call job_failure_postprocessing")
retryModule.job_failure_postprocessing(
self.taskBuffer,
job_tmp.PandaID,
source,
error_code,
error_diag,
job_tmp.attemptNr,
)
self.logger.debug("apply_retrial_rules 2 is back")
self.logger.debug("job_failure_postprocessing 2 is back")
except IndexError:
pass
except Exception as e:
self.logger.error(f"apply_retrial_rules 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")
self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")

cThr = Closer(self.taskBuffer, destDBList, job)
cThr.run()
Expand Down
65 changes: 65 additions & 0 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14402,6 +14402,71 @@ def setMaxAttempt(self, jobID, taskID, files, maxAttempt):
tmpLog.debug("done")
return True

def increase_max_attempt(self, job_id, task_id, files):
    """Increase the max attempt number by one for specific files."""
    comment = " /* DBProxy.increase_max_attempt */"
    method_name = comment.split(" ")[-2].split(".")[-1]
    tmp_log = LogWrapper(_logger, method_name)
    tmp_log.debug("start")

    # Consider only real input-type files, excluding DBRelease files
    input_types = ("input", "pseudo_input", "pp_input", "trn_log", "trn_output")
    selected_files = [
        panda_file for panda_file in files
        if panda_file.type in input_types and re.search("DBRelease", panda_file.lfn) is None
    ]
    file_ids = [panda_file.fileID for panda_file in selected_files]
    dataset_ids = [panda_file.datasetID for panda_file in selected_files]

    if file_ids:
        try:
            # Start transaction
            self.conn.begin()

            var_map = {":taskID": task_id, ":pandaID": job_id}

            # Bind the files
            file_placeholders = []
            for idx, file_id in enumerate(file_ids):
                placeholder = f":file{idx}"
                var_map[placeholder] = file_id
                file_placeholders.append(placeholder)
            file_bindings_str = ",".join(file_placeholders)

            # Bind the datasets
            dataset_placeholders = []
            for idx, dataset_id in enumerate(dataset_ids):
                placeholder = f":dataset{idx}"
                var_map[placeholder] = dataset_id
                dataset_placeholders.append(placeholder)
            dataset_bindings_str = ",".join(dataset_placeholders)

            sql_update = f"""
            UPDATE ATLAS_PANDA.JEDI_Dataset_Contents
            SET maxAttempt = maxAttempt + 1
            WHERE JEDITaskID = :taskID
            AND datasetID IN ({dataset_bindings_str})
            AND fileID IN ({file_bindings_str})
            AND pandaID = :pandaID
            """

            self.cur.execute(sql_update + comment, var_map)

            # Commit updates
            if not self._commit():
                raise RuntimeError("Commit error")

        except Exception:
            # Roll back and log the error, then report failure to the caller
            self._rollback()
            self.dumpErrorMessage(_logger, method_name)
            return False

    tmp_log.debug("done")
    return True

def setNoRetry(self, jobID, taskID, files):
# Logging
comment = " /* DBProxy.setNoRetry */"
Expand Down
2 changes: 1 addition & 1 deletion pandaserver/taskbuffer/PandaDBSchemaInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class PandaDBSchemaInfo:
schema_version = None

def method(self):
schema_version = "0.0.20"
schema_version = "0.0.22"
_logger.debug(f"PanDA schema version required for Server is : {schema_version}")
return schema_version
11 changes: 11 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,17 @@ def setMaxAttempt(self, jobID, jediTaskID, files, attemptNr):

return ret

# error classification action: increase by one the max number of retries
def increase_max_attempt(self, job_id, task_id, files):
    """Increase maxAttempt by one for the given files of a job/task.

    Delegates to DBProxy.increase_max_attempt and returns its result
    (True on success, False on failure).
    """
    # get proxy
    proxy = self.proxyPool.getProxy()
    try:
        # exec
        ret = proxy.increase_max_attempt(job_id, task_id, files)
    finally:
        # release proxy even if the DB call raises, so the pool does not leak connections
        self.proxyPool.putProxy(proxy)

    return ret

# retry module action: set maxAttempt to the current attemptNr to avoid further retries
def setNoRetry(self, jobID, jediTaskID, files):
# get proxy
Expand Down
Loading

0 comments on commit 5c8f044

Please sign in to comment.