Commit

Merge pull request #397 from tmaeno/master
to set taskid to datasets.moverid
tmaeno authored Aug 22, 2024
2 parents 104701d + 69905d6 · commit cf6d7d3
Showing 4 changed files with 55 additions and 25 deletions.
PandaPkgInfo.py (1 addition & 1 deletion)

@@ -1 +1 @@
-release_version = "0.3.17"
+release_version = "0.3.18"
pandaserver/dataservice/setupper_atlas_plugin.py (10 additions & 24 deletions)
@@ -230,13 +230,16 @@ def setup_source(self) -> None:
         disp_error = {}
         back_end_map = {}
         ds_task_map = dict()
+        jedi_task_id = None
         # special datasets in rucio where files are zipped into a file
         use_zip_to_pin_map = dict()
         # extract prodDBlock
         for job in self.jobs:
             # ignore failed jobs
             if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
                 continue
+            if jedi_task_id is None and job.jediTaskID not in ["NULL", None, 0]:
+                jedi_task_id = job.jediTaskID
             # production datablock
             if job.prodDBlock != "NULL" and job.prodDBlock and (job.prodSourceLabel not in ["user", "panda"]):
                 # get VUID and record prodDBlock into DB
@@ -469,6 +472,8 @@ def setup_source(self) -> None:
                     dataset.currentfiles = int(sum(filter(None, file_list[dispatch_data_block]["fsizes"])) / 1024 / 1024)
                 except Exception:
                     dataset.currentfiles = 0
+                if jedi_task_id is not None:
+                    dataset.MoverID = jedi_task_id
                 disp_list.append(dataset)
                 self.vuid_map[dataset.name] = dataset.vuid
             except Exception:
@@ -816,7 +821,6 @@ def subscribe_dispatch_data_block(self) -> None:
         """
         disp_error = {}
         failed_jobs = []
-        ddm_jobs = []
         ddm_user = "NULL"
         for job in self.jobs:
             # ignore failed jobs
@@ -1015,29 +1019,8 @@ def subscribe_dispatch_data_block(self) -> None:
                 job.ddmErrorDiag = disp_error[disp]
                 self.logger.debug(f"failed PandaID={job.PandaID} with {job.ddmErrorDiag}")
                 failed_jobs.append(job)
-        # update failed jobs only. succeeded jobs should be activate by DDM callback
+        # update failed jobs only. succeeded jobs should be activated by DDM callback
         self.update_failed_jobs(failed_jobs)
-        # submit ddm jobs
-        if ddm_jobs:
-            ddm_ret = self.task_buffer.storeJobs(ddm_jobs, ddm_user, joinThr=True)
-            # update datasets
-            ddm_index = 0
-            ddm_ds_list = []
-            for ddm_panda_id, _, _ in ddm_ret:
-                # invalid PandaID
-                if ddm_panda_id in ["NULL", None]:
-                    continue
-                # get dispatch dataset
-                ds_name = ddm_jobs[ddm_index].jobParameters.split()[-1]
-                ddm_index += 1
-                tmp_ds = self.task_buffer.queryDatasetWithMap({"name": ds_name})
-                if tmp_ds is not None:
-                    # set MoverID
-                    tmp_ds.MoverID = ddm_panda_id
-                    ddm_ds_list.append(tmp_ds)
-            # update
-            if ddm_ds_list:
-                self.task_buffer.updateDatasets(ddm_ds_list)
 
     # correct LFN for attemptNr
     def correct_lfn(self) -> None:
@@ -1763,13 +1746,15 @@ def create_dispatch_datasets(self, dataset_file_map):
                 dataset.status = "defined"
                 dataset.numberfiles = len(lfns)
                 dataset.currentfiles = 0
+                if tmp_val["taskID"] not in [None, "NULL"]:
+                    dataset.MoverID = tmp_val["taskID"]
                 disp_list.append(dataset)
             except Exception:
                 error_type, error_value = sys.exc_info()[:2]
                 self.logger.error(f"ext registerNewDataset : failed to decode VUID for {dis_dispatch_block} - {error_type} {error_value}")
                 continue
             # freezeDataset dispatch dataset
-            self.logger.debug("freezeDataset " + dis_dispatch_block)
+            self.logger.debug(f"freezeDataset {dis_dispatch_block}")
             for attempt in range(3):
                 status = False
                 try:
@@ -2040,6 +2025,7 @@ def setup_jumbo_jobs(self) -> None:
             dataset.status = "defined"
             dataset.numberfiles = len(lfns)
             dataset.currentfiles = 0
+            dataset.MoverID = jumbo_job_spec.jediTaskID
             self.task_buffer.insertDatasets([dataset])
             # set destination
             jumbo_job_spec.destinationSE = jumbo_job_spec.computingSite
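Net effect of the hunks above: a dispatch dataset is now stamped with the owning task's jediTaskID in its MoverID column at creation time, replacing the removed post-submission path that filled MoverID with a DDM mover PandaID. A minimal runnable sketch of the tagging rule follows; SimpleNamespace stands in for pandaserver's real dataset spec class, and the guard values mirror the checks in setup_source and create_dispatch_datasets.

# Sketch only: SimpleNamespace is a stand-in for the real dataset spec class.
from types import SimpleNamespace

def tag_dispatch_dataset(dataset, jedi_task_id):
    # record the owning task in MoverID so that
    # trigger_cleanup_internal_datasets (added below in OraDBProxy.py)
    # can later select the dataset by task
    if jedi_task_id not in [None, "NULL", 0]:
        dataset.MoverID = jedi_task_id
    return dataset

dataset = SimpleNamespace(name="panda.dis.0001", type="dispatch", MoverID=0)
tag_dispatch_dataset(dataset, 4567)
print(dataset.MoverID)  # -> 4567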
pandaserver/taskbuffer/OraDBProxy.py (37 additions & 0 deletions)
@@ -6696,6 +6696,43 @@ def deleteDataset(self, name):
             _logger.error(f"deleteDataset() : {type} {value}")
             return False
 
+    # trigger cleanup of internal datasets used by a task
+    def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
+        """
+        Set deleting flag to dispatch datasets used by a task, which triggers deletion in datasetManager
+        """
+        comment = " /* DBProxy.trigger_cleanup_internal_datasets */"
+        method_name = comment.split(" ")[-2].split(".")[-1]
+        tmp_log = LogWrapper(_logger, f"{method_name} < jediTaskID={task_id} >")
+        tmp_log.debug("start")
+        sql1 = (
+            f"UPDATE {panda_config.schemaPANDA}.Datasets SET status=:newStatus,modificationdate=CURRENT_DATE "
+            "WHERE type=:type AND MoverID=:taskID AND status IN (:status_d,:status_c) "
+        )
+        try:
+            # start transaction
+            self.conn.begin()
+            # update
+            var_map = {
+                ":type": "dispatch",
+                ":newStatus": "deleting",
+                ":taskID": task_id,
+                ":status_d": "defined",
+                ":status_c": "completed",
+            }
+            self.cur.execute(sql1 + comment, var_map)
+            ret_u = self.cur.rowcount
+            # commit
+            if not self._commit():
+                raise RuntimeError("Commit error")
+            tmp_log.debug(f"set flag to {ret_u} dispatch datasets")
+            return True
+        except Exception:
+            # roll back
+            self._rollback()
+            self.dumpErrorMessage(_logger, method_name)
+            return False
+
     # get serial number for dataset, insert dummy datasets to increment SN
     def getSerialNumber(self, datasetname, definedFreshFlag=None):
         comment = " /* DBProxy.getSerialNumber */"
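The new method relies on named bind variables passed through var_map. Python's sqlite3 module understands the same :name placeholder syntax (with dict keys given without the leading colon), so the status-flip semantics can be demonstrated standalone; the real statement additionally updates modificationdate and targets the {panda_config.schemaPANDA}.Datasets table in Oracle.

# Standalone sketch of the UPDATE semantics using an in-memory SQLite DB.
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE Datasets (name TEXT, type TEXT, MoverID INTEGER, status TEXT)")
cur.executemany(
    "INSERT INTO Datasets VALUES (?, ?, ?, ?)",
    [
        ("panda.dis.a", "dispatch", 123, "defined"),    # selected: flipped to deleting
        ("panda.dis.b", "dispatch", 123, "completed"),  # selected: flipped to deleting
        ("panda.dis.c", "dispatch", 123, "deleting"),   # already deleting: untouched
        ("user.out.a", "output", 123, "defined"),       # wrong type: untouched
        ("panda.dis.d", "dispatch", 456, "defined"),    # different task: untouched
    ],
)
var_map = {"type": "dispatch", "newStatus": "deleting", "taskID": 123,
           "status_d": "defined", "status_c": "completed"}
cur.execute(
    "UPDATE Datasets SET status=:newStatus "
    "WHERE type=:type AND MoverID=:taskID AND status IN (:status_d,:status_c)",
    var_map,
)
print(cur.rowcount)  # -> 2, the count reported by the "set flag to N" debug line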
pandaserver/taskbuffer/TaskBuffer.py (7 additions & 0 deletions)
@@ -1845,6 +1845,13 @@ def updateDatasets(self, datasets, withLock=False, withCriteria="", criteriaMap=
         # return
         return retList
 
+    # trigger cleanup of internal datasets used by a task
+    def trigger_cleanup_internal_datasets(self, task_id: int) -> bool:
+        proxy = self.proxyPool.getProxy()
+        ret = proxy.trigger_cleanup_internal_datasets(task_id)
+        self.proxyPool.putProxy(proxy)
+        return ret
+
     # delete dataset
     def deleteDatasets(self, datasets):
         # get DBproxy
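TaskBuffer follows its usual delegation pattern: check a proxy out of the pool, call the DBProxy method, return the proxy. A hedged sketch of a call site, which is not part of this commit; task_buffer is assumed to be an already initialized TaskBuffer instance and jedi_task_id the finished task's ID.

# Hypothetical call site (illustrative only): once a task is done, flag its
# dispatch datasets so that datasetManager deletes them.
def cleanup_task_datasets(task_buffer, jedi_task_id: int) -> None:
    if task_buffer.trigger_cleanup_internal_datasets(jedi_task_id):
        print(f"dispatch datasets of jediTaskID={jedi_task_id} flagged for deletion")
    else:
        # the DBProxy already logged the error and rolled back
        print(f"cleanup trigger failed for jediTaskID={jedi_task_id}")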
