Skip to content

Commit

Permalink
Merge pull request #453 from tmaeno/master
Browse files (browse the repository at this point in the history)
Fixed setupper_atlas_plugin so that it no longer ignores DDM errors
  • Loading branch information
tmaeno authored Nov 18, 2024
2 parents 7e73040 + b6ab3ac commit 6a72ffd
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions pandaserver/dataservice/setupper_atlas_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,7 @@ def setup_source(self) -> None:
self.replica_map[job.dispatchDBlock][file.dataset] = self.all_replica_map[file.dataset]

# register dispatch dataset
disp_list = self.register_dispatch_datasets(file_list, use_zip_to_pin_map, ds_task_map, tmp_logger, disp_error,
jedi_task_id)
disp_list = self.register_dispatch_datasets(file_list, use_zip_to_pin_map, ds_task_map, tmp_logger, disp_error, jedi_task_id)
# insert datasets to DB
self.task_buffer.insertDatasets(prod_list + disp_list)
# job status
Expand All @@ -356,13 +355,13 @@ def setup_source(self) -> None:
del prod_error

def register_dispatch_datasets(
self,
file_list: Dict[str, Dict[str, List[str]]],
use_zip_to_pin_map: Dict[str, bool],
ds_task_map: Dict[str, int],
tmp_logger: LogWrapper,
disp_error: Dict[str, str],
jedi_task_id: Optional[int]
self,
file_list: Dict[str, Dict[str, List[str]]],
use_zip_to_pin_map: Dict[str, bool],
ds_task_map: Dict[str, int],
tmp_logger: LogWrapper,
disp_error: Dict[str, str],
jedi_task_id: Optional[int],
) -> List[DatasetSpec]:
"""
Register dispatch datasets in Rucio.
Expand Down Expand Up @@ -396,10 +395,10 @@ def register_dispatch_datasets(
tmp_zip_out = {}
dis_files = {"lfns": [], "guids": [], "fsizes": [], "chksums": []}
for tmp_lfn, tmp_guid, tmp_file_size, tmp_checksum in zip(
file_list[dispatch_data_block]["lfns"],
file_list[dispatch_data_block]["guids"],
file_list[dispatch_data_block]["fsizes"],
file_list[dispatch_data_block]["chksums"],
file_list[dispatch_data_block]["lfns"],
file_list[dispatch_data_block]["guids"],
file_list[dispatch_data_block]["fsizes"],
file_list[dispatch_data_block]["chksums"],
):
if tmp_lfn in tmp_zip_out:
tmp_zip_file_name = f"{tmp_zip_out[tmp_lfn]['scope']}:{tmp_zip_out[tmp_lfn]['name']}"
Expand Down Expand Up @@ -444,9 +443,7 @@ def register_dispatch_datasets(
self.logger.debug(f"sleep {attempt}/{max_attempt}")
time.sleep(10)
if not is_ok:
disp_error[
dispatch_data_block] = "setupper.setup_source() could not register dispatch_data_block with {0}".format(
err_str.split("\n")[-1])
disp_error[dispatch_data_block] = "setupper.setup_source() could not register dispatch_data_block with {0}".format(err_str.split("\n")[-1])
continue
tmp_logger.debug(out)
new_out = out
Expand All @@ -464,8 +461,7 @@ def register_dispatch_datasets(
time.sleep(10)
if not status:
tmp_logger.error(out)
disp_error[
dispatch_data_block] = f"setupper.setup_source() could not freeze dispatch_data_block with {out}"
disp_error[dispatch_data_block] = f"setupper.setup_source() could not freeze dispatch_data_block with {out}"
continue

# get VUID
Expand All @@ -479,8 +475,7 @@ def register_dispatch_datasets(
dataset.status = "defined"
dataset.numberfiles = len(file_list[dispatch_data_block]["lfns"])
try:
dataset.currentfiles = int(
sum(filter(None, file_list[dispatch_data_block]["fsizes"])) / 1024 / 1024)
dataset.currentfiles = int(sum(filter(None, file_list[dispatch_data_block]["fsizes"])) / 1024 / 1024)
except Exception:
dataset.currentfiles = 0
if jedi_task_id is not None:
Expand Down Expand Up @@ -774,6 +769,11 @@ def setup_destination(self, start_idx: int = -1, n_jobs_in_loop: int = 50) -> No
# set new destDBlock
if dest in newname_list:
file.destinationDBlock = newname_list[dest]
# update job status if failed and increment number of files
for job in jobs_list:
# ignore failed jobs
if job.jobStatus in ["failed", "cancelled"] or job.isCancelled():
continue
for file in job.Files:
dest = (
file.destinationDBlock,
Expand Down

0 comments on commit 6a72ffd

Please sign in to comment.