Skip to content

Commit

Permalink
implement more ruff fix
Browse files Browse the repository at this point in the history
  • Loading branch information
esseivaju committed Oct 4, 2024
1 parent ec1d818 commit ab6e2da
Show file tree
Hide file tree
Showing 15 changed files with 46 additions and 61 deletions.
8 changes: 4 additions & 4 deletions example/standalone_ray_test_hello_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ def main(redis_ip: str, redis_port: str, redis_password: str):
redis_address = f"{redis_ip}:{redis_port}"
ray.init(
ignore_reinit_error=True,
address="%s" % redis_address,
_redis_password="%s" % redis_password,
address=f"{redis_address}",
_redis_password=f"{redis_password}",
)

# show the ray cluster
Expand Down Expand Up @@ -83,10 +83,10 @@ def main(redis_ip: str, redis_port: str, redis_password: str):
description="Wait on ray head node or workers to connect"
)
parser.add_argument(
"--redis-ip", default="%s" % (os.environ["RAYTHENA_RAY_HEAD_IP"])
"--redis-ip", default="{}".format(os.environ["RAYTHENA_RAY_HEAD_IP"])
)
parser.add_argument(
"--redis-port", default="%s" % (os.environ["RAYTHENA_RAY_REDIS_PORT"])
"--redis-port", default="{}".format(os.environ["RAYTHENA_RAY_REDIS_PORT"])
)
parser.add_argument(
"--redis-password", default=os.environ["RAYTHENA_RAY_REDIS_PASSWORD"]
Expand Down
2 changes: 1 addition & 1 deletion src/raythena/actors/esworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections.abc import Mapping, Sequence
from socket import gethostname
from time import sleep
from typing import Any, Optional, Union, tuple
from typing import Any, Optional, Union

import ray

Expand Down
2 changes: 1 addition & 1 deletion src/raythena/actors/payloads/basePayload.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, dict
from typing import Any, Optional

from raythena.utils.config import Config
from raythena.utils.eventservice import PandaJob
Expand Down
2 changes: 1 addition & 1 deletion src/raythena/actors/payloads/eventservice/esPayload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod
from collections.abc import Sequence
from typing import Optional, dict
from typing import Optional

from raythena.actors.payloads.basePayload import BasePayload
from raythena.utils.config import Config
Expand Down
2 changes: 1 addition & 1 deletion src/raythena/actors/payloads/eventservice/pilothttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from asyncio import Event, Queue, QueueEmpty
from collections.abc import Iterable, Mapping
from subprocess import DEVNULL, Popen
from typing import Callable, Optional, dict, list
from typing import Callable, Optional
from urllib.parse import parse_qs

import uvloop
Expand Down
13 changes: 4 additions & 9 deletions src/raythena/drivers/communicators/harvesterFileMessenger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import configparser
import contextlib
import json
import os
import shutil
Expand Down Expand Up @@ -133,14 +134,10 @@ def request_job(self, request: PandaJobRequest) -> None:
with open(self.jobspecfile) as f:
job = json.load(f)

try:
with contextlib.suppress(FileNotFoundError):
os.remove(self.jobrequestfile)
except FileNotFoundError:
pass
try:
with contextlib.suppress(FileNotFoundError):
os.rename(self.jobspecfile, f"{self.jobspecfile}.read")
except FileNotFoundError:
pass
if job:
self.job_queue.put(job)

Expand Down Expand Up @@ -189,10 +186,8 @@ def request_event_ranges(self, request: EventRangeRequest) -> None:
):
self.ranges_requests_count += 1

try:
with contextlib.suppress(FileNotFoundError):
os.remove(self.eventrequestfile)
except FileNotFoundError:
pass

self.ranges_requests_count += 1
self.event_ranges_queue.put(ranges)
Expand Down
3 changes: 0 additions & 3 deletions src/raythena/drivers/esdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
from typing import (
Any,
Optional,
dict,
list,
tuple,
)

import ray
Expand Down
13 changes: 3 additions & 10 deletions src/raythena/utils/bookkeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
Any,
Optional,
Union,
dict,
list,
set,
tuple,
)

from raythena.utils.config import Config
Expand Down Expand Up @@ -280,7 +276,7 @@ def _set_file_merged(
self._status[TaskStatus.MERGED][inputfile] = merged_dict
for merged_outputfile in self._status[TaskStatus.MERGING][
inputfile
].keys():
]:
merged_dict[merged_outputfile] = {
"path": os.path.join(
self.merged_files_dir, merged_outputfile
Expand Down Expand Up @@ -462,7 +458,7 @@ def _cleaner_thead_run(self):
for task_status in self.taskstatus.values():
for merged_file in task_status._status[
TaskStatus.MERGED
].keys():
]:
if self.stop_cleaner.is_set():
break
for temp_file in files:
Expand Down Expand Up @@ -709,10 +705,7 @@ def _generate_event_ranges(self, job: PandaJob, task_status: TaskStatus):
guids = job["GUID"].split(",")
for file, guid in zip(files, guids):
self.files_guids[file] = guid
if "scopeIn" in job:
scope = job["scopeIn"]
else:
scope = ""
scope = job.get("scopeIn", "")
event_ranges = []
merged_files = task_status._status[TaskStatus.MERGED]
merging_files = task_status._status[TaskStatus.MERGING]
Expand Down
20 changes: 11 additions & 9 deletions src/raythena/utils/eventservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
Any,
Optional,
Union,
dict,
list,
set,
)

# Types aliases
Expand Down Expand Up @@ -112,7 +109,7 @@ def __setitem__(self, k: str, v: "PandaJob") -> None:
if isinstance(v, PandaJob):
self.jobs[k] = v
else:
raise Exception(f"{v} is not of type {PandaJob}")
raise ValueError(f"{v} is not of type {PandaJob}")

def __iter__(self) -> Iterable[str]:
return iter(self.jobs)
Expand Down Expand Up @@ -368,9 +365,9 @@ def __getitem__(self, k: str) -> "EventRange":

def __setitem__(self, k: str, v: "EventRange") -> None:
if not isinstance(v, EventRange):
raise Exception(f"{v} should be of type {EventRange}")
raise ValueError(f"{v} should be of type {EventRange}")
if k != v.eventRangeID:
raise Exception(
raise KeyError(
f"Specified key '{k}' should be equals to the event range id '{v.eventRangeID}' "
)
if k in self.event_ranges_by_id:
Expand Down Expand Up @@ -416,7 +413,7 @@ def update_range_state(self, range_id: str, new_state: str) -> "EventRange":
the updated event range
"""
if range_id not in self.event_ranges_by_id:
raise Exception(
raise KeyError(
f"Trying to update non-existing eventrange {range_id}"
)

Expand Down Expand Up @@ -718,7 +715,7 @@ def __init__(
else:
for v in range_update.values():
if not isinstance(v, list):
raise Exception(f"Expecting type list for element {v}")
raise ValueError(f"Expecting type list for element {v}")
self.range_update: dict[str, HarvesterEventRangeUpdateDef] = (
range_update
)
Expand All @@ -737,7 +734,7 @@ def __getitem__(self, k: str) -> HarvesterEventRangeUpdateDef:

def __setitem__(self, k: str, v: HarvesterEventRangeUpdateDef) -> None:
if not isinstance(v, list):
raise Exception(f"Expecting type list for element {v}")
raise ValueError(f"Expecting type list for element {v}")
self.range_update[k] = v

def merge_update(self, other: "EventRangeUpdate") -> None:
Expand Down Expand Up @@ -1062,6 +1059,11 @@ def get_id(self) -> str:
"""
return self["PandaID"]

def get(self, k: str, default: Any = "") -> Builtin:
if k in self.job:
return self.job[k]
return default

def __str__(self) -> str:
return json.dumps(self.job)

Expand Down
2 changes: 1 addition & 1 deletion src/raythena/utils/ray.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Mapping
from typing import Any, list
from typing import Any

import ray

Expand Down
2 changes: 1 addition & 1 deletion src/raythena/utils/timing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import time
from threading import Event
from typing import Any, Union, dict, list
from typing import Any, Union

import psutil

Expand Down
16 changes: 7 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def is_eventservice(request):
@pytest.fixture
def pandaids(njobs):
res = []
for i in range(njobs):
for _ in range(njobs):
hash = hashlib.md5()
hash.update(str(time.time()).encode("utf-8"))
res.append(hash.hexdigest())
Expand Down Expand Up @@ -223,14 +223,13 @@ def sample_multijobs(
"destinationDblock": job_name,
"dispatchDBlockToken": "NULL",
"jobPars": (
'--eventService=%s --skipEvents=0 --firstEvent=1 --preExec "from AthenaCommon.DetFlags '
f'--eventService={str(is_eventservice)} --skipEvents=0 --firstEvent=1 --preExec "from AthenaCommon.DetFlags '
"import DetFlags;DetFlags.ID_setOn();DetFlags.Calo_setOff();"
'DetFlags.Muon_setOff();DetFlags.Lucid_setOff();DetFlags.Truth_setOff() "'
"--athenaopts=--preloadlib=${ATLASMKLLIBDIR_PRELOAD}/libimf.so "
"--preInclude sim:SimulationJobOptions/preInclude.FrozenShowersFCalOnly.py,SimulationJobOptions/preInclude.BeamPipeKill.py "
"--geometryVersion ATLAS-R2-2016-01-00-00_VALIDATION --physicsList QGSP_BERT --randomSeed 1234 --conditionsTag OFLCOND-MC12-SIM-00 "
"--maxEvents=-1 --inputEvgenFile %s --outputHitsFile HITS_%s.pool.root)"
% (str(is_eventservice), inFiles, outFilesShort)
f"--maxEvents=-1 --inputEvgenFile {inFiles} --outputHitsFile HITS_{outFilesShort}.pool.root)"
),
"attemptNr": 0,
"swRelease": "Atlas-21.0.15",
Expand All @@ -247,7 +246,7 @@ def sample_multijobs(
"jobName": job_name,
"ddmEndPointIn": "UTA_SWT2_DATADISK",
"taskID": taskId,
"logFile": "%s.job.log.tgz" % job_name,
"logFile": f"{job_name}.job.log.tgz",
}
return res

Expand Down Expand Up @@ -315,14 +314,13 @@ def sample_job(
"destinationDblock": job_name,
"dispatchDBlockToken": "NULL",
"jobPars": (
'--eventService=%s --skipEvents=0 --firstEvent=1 --preExec "from AthenaCommon.DetFlags '
f'--eventService={str(is_eventservice)} --skipEvents=0 --firstEvent=1 --preExec "from AthenaCommon.DetFlags '
"import DetFlags;DetFlags.ID_setOn();DetFlags.Calo_setOff();"
'DetFlags.Muon_setOff();DetFlags.Lucid_setOff();DetFlags.Truth_setOff() "'
"--athenaopts=--preloadlib=${ATLASMKLLIBDIR_PRELOAD}/libimf.so "
"--preInclude sim:SimulationJobOptions/preInclude.FrozenShowersFCalOnly.py,SimulationJobOptions/preInclude.BeamPipeKill.py "
"--geometryVersion ATLAS-R2-2016-01-00-00_VALIDATION --physicsList QGSP_BERT --randomSeed 1234 --conditionsTag OFLCOND-MC12-SIM-00 "
"--maxEvents=-1 --inputEvgenFile %s --outputHitsFile HITS_%s.pool.root)"
% (str(is_eventservice), inFiles, outFilesShort)
f"--maxEvents=-1 --inputEvgenFile {inFiles} --outputHitsFile HITS_{outFilesShort}.pool.root)"
),
"attemptNr": 0,
"swRelease": "Atlas-21.0.15",
Expand All @@ -339,6 +337,6 @@ def sample_job(
"jobName": job_name,
"ddmEndPointIn": "UTA_SWT2_DATADISK",
"taskID": taskId,
"logFile": "%s.job.log.tgz" % job_name,
"logFile": f"{job_name}.job.log.tgz",
}
}
4 changes: 2 additions & 2 deletions tests/harvester/test_harvesterMock.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ def test_get_ranges(
ranges = ranges_queue.get(timeout=5)
assert ranges is not None
assert isinstance(ranges, dict)
for pandaID, job_ranges in ranges.items():
for _pandaID, job_ranges in ranges.items():
assert len(job_ranges) == n_events

# should return 0 ranges per job
request_queue.put(evnt_request)
ranges = ranges_queue.get(timeout=5)
assert ranges is not None
assert isinstance(ranges, dict)
for pandaID, job_ranges in ranges.items():
for _pandaID, job_ranges in ranges.items():
assert len(job_ranges) == 0
6 changes: 3 additions & 3 deletions tests/test_bookkeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_assign_job_to_actor(
actor_id = "a1"
if not is_eventservice:
job = None
for i in range(njobs):
for _ in range(njobs):
job_tmp = bookKeeper.assign_job_to_actor(actor_id)
if job:
assert job["PandaID"] != job_tmp["PandaID"]
Expand All @@ -39,7 +39,7 @@ def test_assign_job_to_actor(
else:
bookKeeper.add_event_ranges(sample_ranges)
job = None
for i in range(njobs):
for _ in range(njobs):
job_tmp = bookKeeper.assign_job_to_actor(actor_id)
if job:
assert job["PandaID"] == job_tmp["PandaID"]
Expand Down Expand Up @@ -129,7 +129,7 @@ def __inner__(range_update, failed=False):
bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)

for i in range(njobs):
for _ in range(njobs):
job = bookKeeper.assign_job_to_actor(actor_id)
_ = bookKeeper.fetch_event_ranges(actor_id, nevents)
print(job.event_ranges_queue.rangesID_by_state)
Expand Down
12 changes: 6 additions & 6 deletions tests/test_eventservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ def test_build_range_update(
and "fsize" not in r
)

with pytest.raises(Exception):
with pytest.raises(ValueError):
ranges_update.range_update[pandaID] = None
EventRangeUpdate(ranges_update.range_update)

with pytest.raises(Exception):
with pytest.raises(ValueError):
ranges_update[pandaID] = None
ranges_update[pandaID] = []
assert not ranges_update[pandaID]
Expand All @@ -115,7 +115,7 @@ def test_new(self, nevents, sample_job, sample_ranges):
== 0
)

with pytest.raises(Exception):
with pytest.raises(ValueError):
ranges_queue["key"] = None

ranges_queue_2 = EventRangeQueue()
Expand Down Expand Up @@ -179,7 +179,7 @@ def test_update(
assert ranges_queue.nranges_assigned() == 0
assert ranges_queue.nranges_remaining() == 0

with pytest.raises(Exception):
with pytest.raises(KeyError):
ranges_queue.update_range_state("unknown", EventRange.ASSIGNED)

def test_get_next(self, sample_job, sample_ranges):
Expand Down Expand Up @@ -269,7 +269,7 @@ def test_build_pandajob_queue(
if is_eventservice:
assert job
else:
for i in range(1, njobs):
for _ in range(1, njobs):
next_job = pandajob_queue.next_job_to_process()
assert job["PandaID"] != next_job["PandaID"]
job = next_job
Expand All @@ -279,7 +279,7 @@ def test_build_pandajob_queue(
assert isinstance(event_ranges, EventRangeQueue)
assert len(event_ranges) == 0
assert pandajob_queue.has_job(pandaID)
with pytest.raises(Exception):
with pytest.raises(ValueError):
pandajob_queue[pandaID] = None

pandajob_queue_2 = PandaJobQueue()
Expand Down

0 comments on commit ab6e2da

Please sign in to comment.