Skip to content

Commit

Permalink
Merge branch 'setupper' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
EdwardKaravakis authored Aug 9, 2024
2 parents edd27c9 + 8489be7 commit 9a456ba
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions pandaserver/dataservice/setupper_atlas_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1561,13 +1561,14 @@ def dynamic_data_placement(self) -> None:
return
return

# make dis datasets for existing files to avoid deletion when jobs are queued
def make_dis_datasets_for_existing_files(self) -> None:
def collect_existing_files(self):
"""
Make dis datasets for existing files method for running the setup process.
Collects existing files to avoid deletion when jobs are queued.
This method iterates over all jobs and collects files that should not be deleted,
organizing them by destination DDM endpoint and log dataset name.
:return: A dictionary mapping (destination DDM endpoint, log dataset name) to a list of files.
"""
self.logger.debug("make dis datasets for existing files")
# collect existing files
dataset_file_map = {}
n_max_jobs = 20
n_jobs_map = {}
Expand Down Expand Up @@ -1595,7 +1596,10 @@ def make_dis_datasets_for_existing_files(self) -> None:
scope_dest_input, _ = select_scope(dest_site, tmp_job.prodSourceLabel, tmp_job.job_label)
dest_ddm_id = dest_site.ddm_input[scope_dest_input]
# Nucleus used as Satellite
if tmp_job.getCloud() != self.site_mapper.getSite(tmp_job.computingSite).cloud and self.site_mapper.getSite(tmp_job.computingSite).cloud in ["US"]:
if (
tmp_job.getCloud() != self.site_mapper.getSite(tmp_job.computingSite).cloud
and self.site_mapper.getSite(tmp_job.computingSite).cloud in ["US"]
):
tmp_site_spec = self.site_mapper.getSite(tmp_job.computingSite)
scope_tmp_site_input, _ = select_scope(tmp_site_spec, tmp_job.prodSourceLabel, tmp_job.job_label)
tmp_se_tokens = tmp_site_spec.setokens_input[scope_tmp_site_input]
Expand Down Expand Up @@ -1625,12 +1629,17 @@ def make_dis_datasets_for_existing_files(self) -> None:
# if available at Satellite
real_dest_ddm_id = (dest_ddm_id,)
if (
tmp_job.getCloud() in self.available_lfns_in_satellites
and tmp_file.dataset in self.available_lfns_in_satellites[tmp_job.getCloud()]
and tmp_job.computingSite in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"]
and tmp_file.lfn in self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"][tmp_job.computingSite]
tmp_job.getCloud() in self.available_lfns_in_satellites
and tmp_file.dataset in self.available_lfns_in_satellites[tmp_job.getCloud()]
and tmp_job.computingSite in
self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"]
and tmp_file.lfn in
self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["sites"][
tmp_job.computingSite]
):
real_dest_ddm_id = self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["siteDQ2IDs"][tmp_job.computingSite]
real_dest_ddm_id = \
self.available_lfns_in_satellites[tmp_job.getCloud()][tmp_file.dataset]["siteDQ2IDs"][
tmp_job.computingSite]
real_dest_ddm_id = tuple(real_dest_ddm_id)
# append
if real_dest_ddm_id not in dataset_file_map[map_key]:
Expand All @@ -1652,6 +1661,13 @@ def make_dis_datasets_for_existing_files(self) -> None:
}
# add file spec
dataset_file_map[map_key][real_dest_ddm_id]["files"][tmp_file.lfn]["fileSpecs"].append(tmp_file)
return dataset_file_map

def create_dispatch_datasets(self, dataset_file_map):
"""
Creates dispatch datasets for the collected files.
Returns a list of datasets to be inserted into the database.
"""
# loop over all locations
disp_list = []
for _, tmp_dum_val in dataset_file_map.items():
Expand Down Expand Up @@ -1802,6 +1818,19 @@ def make_dis_datasets_for_existing_files(self) -> None:
# failure
if not is_ok:
continue
return disp_list

# make dis datasets for existing files to avoid deletion when jobs are queued
def make_dis_datasets_for_existing_files(self) -> None:
    """
    Make dis datasets for existing files method for running the setup process.

    Orchestrates the two-step process: gather the files that must be kept,
    then build the dispatch datasets for them and persist the result.
    """
    self.logger.debug("make dis datasets for existing files")
    # step 1: map (destination DDM endpoint, log dataset) -> files to protect
    collected = self.collect_existing_files()
    # step 2: turn the collected map into dispatch dataset specs
    disp_list = self.create_dispatch_datasets(collected)

    # persist the dispatch datasets
    self.task_buffer.insertDatasets(disp_list)
    self.logger.debug("finished to make dis datasets for existing files")
Expand Down

0 comments on commit 9a456ba

Please sign in to comment.