From ec1d81823c72d8b9f079ae7b674db60c8becea5d Mon Sep 17 00:00:00 2001 From: Julien Esseiva Date: Fri, 4 Oct 2024 14:05:42 -0700 Subject: [PATCH] implement ruff fix --- src/raythena/actors/esworker.py | 46 +++++++++---------- .../actors/payloads/eventservice/pilothttp.py | 2 +- .../drivers/communicators/harvesterMock.py | 8 ++-- .../communicators/harvesterMock2205.py | 8 ++-- 4 files changed, 29 insertions(+), 35 deletions(-) diff --git a/src/raythena/actors/esworker.py b/src/raythena/actors/esworker.py index ef3b70a..ca94ff3 100644 --- a/src/raythena/actors/esworker.py +++ b/src/raythena/actors/esworker.py @@ -194,8 +194,8 @@ def check_time(self) -> None: f"Failed to copy ray logs to actor directory: {e}" ) if time_elapsed > self.time_limit - self.pilot_kill_time: - killsignal = open(self.pilot_kill_file, "w") - killsignal.close() + with open(self.pilot_kill_file, "w") as f: + f.write("KILL") self._logger.info("killsignal sent to payload") break else: @@ -297,23 +297,21 @@ def stagein(self) -> None: self.payload_actor_process_dir, "ray_logs" ) try: - time_limit_monitor = open( - os.path.join(self.workdir, self.time_monitor_file) - ) - start_time = time_limit_monitor.readline().split(":") - self.start_time = ( - int(start_time[0]) * 3600 - + int(start_time[1]) * 60 - + int(start_time[2]) - ) - time_limit = time_limit_monitor.readline().split(":") - if len(time_limit) < 3: - time_limit = ["0"] + time_limit - self.time_limit = ( - int(time_limit[0]) * 3600 - + int(time_limit[1]) * 60 - + int(time_limit[2]) - ) + with open(os.path.join(self.workdir, self.time_monitor_file)) as time_limit_monitor: + start_time = time_limit_monitor.readline().split(":") + self.start_time = ( + int(start_time[0]) * 3600 + + int(start_time[1]) * 60 + + int(start_time[2]) + ) + time_limit = time_limit_monitor.readline().split(":") + if len(time_limit) < 3: + time_limit = ["0"] + time_limit + self.time_limit = ( + int(time_limit[0]) * 3600 + + int(time_limit[1]) * 60 + + int(time_limit[2]) + ) timer_thread = threading.Thread( name="timer", target=self.check_time, daemon=True ) @@ -341,7 +339,7 @@ def stagein(self) -> None: os.mkdir(self.payload_actor_output_dir) except Exception as e: self._logger.warning(f"Exception when creating dir: {e}") - raise StageInFailed(self.id) + raise StageInFailed(self.id) from e # self.cpu_monitor = CPUMonitor(os.path.join(self.payload_actor_process_dir, "cpu_monitor.json")) # self.cpu_monitor.start() try: @@ -349,7 +347,7 @@ def stagein(self) -> None: self.payload.start(self.modify_job(self.job)) except Exception as e: self._logger.warning(f"Failed to stagein payload: {e}") - raise StageInFailed(self.id) + raise StageInFailed(self.id) from e self.transition_state( ESWorker.READY_FOR_EVENTS if self.is_event_service_job() @@ -426,7 +424,7 @@ def receive_job(self, reply: int, job: PandaJob) -> WorkerResponse: except BaseRaythenaException: raise except Exception as e: - raise WrappedException(self.id, e) + raise WrappedException(self.id, e) from e else: self.transition_state(ESWorker.DONE) self._logger.error("Could not fetch job. Set state to done.") @@ -581,7 +579,7 @@ def stageout_event_service_files( self._logger.error( f"Failed to move file {cfile} to {dst}: errno {e.errno}: {e.strerror}" ) - raise StageOutFailed(self.id) + raise StageOutFailed(self.id) from e range_update[cfile_key] = dst else: self._logger.warning( @@ -669,4 +667,4 @@ def get_message(self) -> WorkerResponse: except BaseRaythenaException: raise except Exception as e: - raise WrappedException(self.id, e) + raise WrappedException(self.id, e) from e diff --git a/src/raythena/actors/payloads/eventservice/pilothttp.py b/src/raythena/actors/payloads/eventservice/pilothttp.py index 0eebc09..0b4789f 100644 --- a/src/raythena/actors/payloads/eventservice/pilothttp.py +++ b/src/raythena/actors/payloads/eventservice/pilothttp.py @@ -482,7 +482,7 @@ async def handle_get_event_ranges( else: n_ranges = int(body["nRanges"][0]) if not self.no_more_ranges: - for i in range(n_ranges): + for _ in range(n_ranges): crange = await self.ranges_queue.get() if crange is None: self.no_more_ranges = True diff --git a/src/raythena/drivers/communicators/harvesterMock.py b/src/raythena/drivers/communicators/harvesterMock.py index e37d0ee..dedf8a3 100644 --- a/src/raythena/drivers/communicators/harvesterMock.py +++ b/src/raythena/drivers/communicators/harvesterMock.py @@ -242,15 +242,13 @@ def request_job(self, job_request: PandaJobRequest) -> None: "SimulationJobOptions/preInclude.BeamPipeKill.py " "--geometryVersion ATLAS-R2-2016-01-00-00_VALIDATION --physicsList QGSP_BERT " "--randomSeed 1234 --conditionsTag OFLCOND-MC12-SIM-00 " - "--maxEvents=-1 --inputEvgenFile %s --outputHitsFile HITS_%s.pool.root" - % (self.inFiles, job_name) + f"--maxEvents=-1 --inputEvgenFile {self.inFiles} --outputHitsFile HITS_{job_name}.pool.root" ), "attemptNr": 0, "swRelease": "Atlas-21.0.15", "nucleus": "NULL", "maxCpuCount": 0, - "outFiles": "HITS_%s.pool.root,%s.job.log.tgz" - % (job_name, job_name), + "outFiles": f"HITS_{job_name}.pool.root,{job_name}.job.log.tgz", "currentPriority": 1000, "scopeIn": self.scope, "PandaID": self.pandaID, @@ -261,7 +259,7 @@ def request_job(self, job_request: PandaJobRequest) -> None: "jobName": job_name, "ddmEndPointIn": "UTA_SWT2_DATADISK", "taskID": self.taskId, - "logFile": "%s.job.log.tgz" % job_name, + "logFile": f"{job_name}.job.log.tgz", } } ) diff --git a/src/raythena/drivers/communicators/harvesterMock2205.py b/src/raythena/drivers/communicators/harvesterMock2205.py index 58e51f3..4f7cdc1 100644 --- a/src/raythena/drivers/communicators/harvesterMock2205.py +++ b/src/raythena/drivers/communicators/harvesterMock2205.py @@ -116,15 +116,13 @@ def request_job(self, job_request: PandaJobRequest) -> None: "--geometryVersion default:ATLAS-R2-2016-01-00-01_VALIDATION " "--physicsList FTFP_BERT_ATL_VALIDATION --randomSeed 1234 " "--conditionsTag default:OFLCOND-MC16-SDR-14 " - "--maxEvents=-1 --inputEvgenFile %s --outputHitsFile HITS_%s.pool.root" - % (self.inFiles, job_name) + f"--maxEvents=-1 --inputEvgenFile {self.inFiles} --outputHitsFile HITS_{job_name}.pool.root" ), "attemptNr": 0, "swRelease": "Atlas-22.0.5", "nucleus": "NULL", "maxCpuCount": 0, - "outFiles": "HITS_%s.pool.root,%s.job.log.tgz" - % (job_name, job_name), + "outFiles": f"HITS_{job_name}.pool.root,{job_name}.job.log.tgz", "currentPriority": 1000, "scopeIn": self.scope, "PandaID": self.pandaID, @@ -135,7 +133,7 @@ def request_job(self, job_request: PandaJobRequest) -> None: "jobName": job_name, "ddmEndPointIn": "UTA_SWT2_DATADISK", "taskID": self.taskId, - "logFile": "%s.job.log.tgz" % job_name, + "logFile": f"{job_name}.job.log.tgz", } } )