Skip to content

Commit

Permalink
fix issue of max shell argument size when merging many hits file
Browse files Browse the repository at this point in the history
  • Loading branch information
esseivaju committed Jan 17, 2024
1 parent e74901d commit af87819
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions src/raythena/drivers/esdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import json
import shutil
import stat
import tempfile
import time
import traceback
Expand Down Expand Up @@ -586,9 +587,14 @@ def run(self) -> None:
self.bookKeeper.stop_saver_thread()

if not total_events:
max_iter = 5
n_iter = 0
# didn't have any events to process, we only need to do merging so keep doing it
while self.handle_merge_transforms(True):
pass
n_iter += 1
if n_iter == max_iter:
break

else:
# we might still simulate more events, just finish the current merge tasks
self.handle_merge_transforms(True)
Expand Down Expand Up @@ -616,7 +622,10 @@ def rename_output_files(self, output_map: Dict[str, str]):
new_filename = output_map[file]
except KeyError:
# read the commit log to recover the correct name. If we get another KeyError, we can't recover
new_filename = output_map[self.bookKeeper.recover_outputfile_name(file)]
new_filename = output_map.get(self.bookKeeper.recover_outputfile_name(file))
if not new_filename:
self._logger.warning(f"Couldn't find new name for {file}, will not be staged out correctly")
continue
os.rename(os.path.join(self.merged_files_dir, file), os.path.join(self.merged_files_dir, new_filename))

def produce_final_report(self, output_map: Dict[str, str]):
Expand Down Expand Up @@ -783,24 +792,34 @@ def hits_merge_transform(self, input_files: Iterable[str], output_file: str) ->
if not input_files:
return
tmp_dir = tempfile.mkdtemp()
file_list = ",".join(input_files)
file_list = "\n".join(input_files)
job_report_name = os.path.join(self.job_reports_dir, output_file) + ".json"
output_file = os.path.join(self.merged_files_dir, output_file)

transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", file_list, self.merge_transform_params)
file_list_path = os.path.join(tmp_dir, "file_list.txt")
with open(file_list_path, 'w') as f:
f.write(file_list)

transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", f"@/srv/{os.path.basename(file_list_path)}", self.merge_transform_params)
transform_params = re.sub(r"--inputHitsFile=", "--inputHitsFile ", transform_params)
transform_params = re.sub(r"--inputHITSFile=", "--inputHITSFile ", transform_params)
transform_params = re.sub(r"\$\{OUTPUT0\}", output_file, transform_params, count=1)
transform_params = re.sub(r"--autoConfiguration=everything", "", transform_params)
transform_params = re.sub(r"--DBRelease=current", "", transform_params)

endtoken = "" if self.config.payload['containerextrasetup'].strip().endswith(";") else ";"
container_script = f"{self.config.payload['containerextrasetup']}{endtoken}{self.merge_transform} {transform_params}"
merge_script_path = os.path.join(tmp_dir, "merge_transform.sh")
with open(merge_script_path, 'w') as f:
f.write(container_script)
os.chmod(merge_script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
cmd = str()
cmd += "export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;"
cmd += f"export thePlatform=\"{self.container_name}\";"
endtoken = "" if self.config.payload['containerextraargs'].strip().endswith(";") else ";"
cmd += f"{self.config.payload['containerextraargs']}{endtoken}"
cmd += f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype {self.config.payload['containerengine']} -c $thePlatform -d -s none"
cmd += f" -r \"{container_script}\" -e \"{self.container_options}\";RETURN_VAL=$?;cp jobReport.json {job_report_name} ;exit $RETURN_VAL;"
cmd += f" -r /srv/merge_transform.sh -e \"{self.container_options}\";RETURN_VAL=$?;cp jobReport.json {job_report_name} ;exit $RETURN_VAL;"
return (Popen(cmd,
stdin=DEVNULL,
stdout=DEVNULL,
Expand Down

0 comments on commit af87819

Please sign in to comment.