pre-commit update and style fixes #41

Merged · 3 commits · Dec 3, 2024
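The bulk of the diff mechanically converts printf-style `%` formatting to f-strings carrying the same format specs. A minimal sketch of the pattern, using hypothetical values:

```python
rank, task_id = 7, 42

# Before: printf-style formatting
label = 'context%05d' % rank
zero_padded = '%06d' % task_id

# After: f-strings reusing the identical format specs
assert f'context{rank:05d}' == label == 'context00007'
assert f'{task_id:06d}' == zero_padded == '000042'
```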
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -3,7 +3,7 @@ ci:
 
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.7.3
+    rev: v0.8.1
     hooks:
       - id: ruff
         args: [ --fix ]
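For reference: a `rev` bump like this is what `pre-commit autoupdate` produces, and a follow-up `pre-commit run --all-files` applies the updated ruff hook (with `--fix`) across the repo, which is presumably where the style changes below come from.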
6 changes: 3 additions & 3 deletions disbatch/dbMon.py
@@ -147,7 +147,7 @@ def popYNC(msg, parent, inq, title='Confirm'):
     if len(msgw) > h:
         missing = 1 + len(msgw) - h
         msgw = msgw[: h - 1]
-        msgw.append('%d lines elided.' % missing)
+        msgw.append(f'{missing:d} lines elided.')
 
     nw = curses.newwin(h + 2, w + 2, ro, co)
     nw.border()
@@ -198,7 +198,7 @@ def display(S, kvsc, inq):
     S.clear()
 
     if tooSmall:
-        S.addstr(0, 0, 'Screen must be at least %dX%d' % (MinLines, MinCols), CPRB)
+        S.addstr(0, 0, f'Screen must be at least {MinLines:d}X{MinCols:d}', CPRB)
     else:
         # Header
         for r, (L, cp) in enumerate(header):
@@ -308,7 +308,7 @@ def display(S, kvsc, inq):
                 except OSError:
                     pass
             else:
-                msg = 'Got unrecognized key: %d' % k
+                msg = f'Got unrecognized key: {k:d}'
         elif tag == 'status':
             engines, contexts, header, content = o
             # Adjust cursor location if needed.
53 changes: 23 additions & 30 deletions disbatch/disBatch.py
@@ -103,7 +103,7 @@ def killPatiently(sub, name, timeout=15):
 def register(kvs, which):
     # Contact the controller to be assigned an identifier via a random
     # key.
-    key = '%d' % (10e7 * random.random())
+    key = str(int(10e7 * random.random()))
     kvs.put('.controller', ('register', (which, key)))
     return kvs.get(key)
 
@@ -183,7 +183,7 @@ class BatchContext:
 
     def __init__(self, sysid, dbInfo, rank, nodes, cylinders, cores_per_cylinder, args, contextLabel=None):
         if contextLabel is None:
-            contextLabel = 'context%05d' % rank
+            contextLabel = f'context{rank:05d}'
         self.sysid = sysid
         self.dbInfo = dbInfo
         self.rank = rank
@@ -194,7 +194,7 @@ def __init__(self, sysid, dbInfo, rank, nodes, cylinders, cores_per_cylinder, args, contextLabel=None):
         self.label = contextLabel
 
         self.error = False  # engine errors (non-zero return values)
-        self.kvsKey = '.context_%d' % rank
+        self.kvsKey = f'.context_{rank:d}'
         self.retireCmd = None
 
     def __str__(self):
@@ -651,7 +651,7 @@ def __init__(self, dbInfo, rank, args):
         self.for_log = []
 
         nodelist = args.ssh_node if args.ssh_node else os.getenv('DISBATCH_SSH_NODELIST')
-        contextLabel = args.label if args.label else 'SSH%d' % rank
+        contextLabel = args.label if args.label else f'SSH{rank:d}'
 
         core_count, node_set, nodes = [], set(), []
         if type(nodelist) is not str:
@@ -1059,7 +1059,7 @@ def _task_generator(self):
         def peEndListTasks():
             for when in ['START', 'STOP']:
                 yield TaskInfo(
-                    peCounters[when], tsx, -1, b'#ENDLIST', '.per engine %s %d' % (when, peCounters[when]), kind='P'
+                    peCounters[when], tsx, -1, b'#ENDLIST', f'.per engine {when:s} {peCounters[when]:d}', kind='P'
                 )
 
         OK = True
@@ -1098,7 +1098,7 @@ def peEndListTasks():
                     when = when.decode('ascii')
                     cmd = prefix + cmd + suffix
                     yield TaskInfo(
-                        peCounters[when], tsx, -1, cmd, '.per engine %s %d' % (when, peCounters[when]), kind='P'
+                        peCounters[when], tsx, -1, cmd, f'.per engine {when:s} {peCounters[when]:d}', kind='P'
                     )
                     peCounters[when] += 1
                     continue
@@ -1281,10 +1281,10 @@ def sendNotification(self):
 
         self.statusFile.seek(self.statusLastOffset)
         mailTo = self.db_info.args.mailTo
-        msg = MIMEText('Last %d:\n\n' % self.db_info.args.mailFreq + self.statusFile.read())
-        msg['Subject'] = '%s has completed %d tasks' % (self.db_info.uniqueId, self.finished)
+        msg = MIMEText(f'Last {self.db_info.args.mailFreq:d}:\n\n' + self.statusFile.read())
+        msg['Subject'] = f'{self.db_info.uniqueId:s} has completed {self.finished:d} tasks'
         if self.failed:
-            msg['Subject'] += ' (%d failed)' % self.failed
+            msg['Subject'] += f' ({self.failed:d} failed)'
         msg['From'] = mailTo
         msg['To'] = mailTo
         s = smtplib.SMTP()
@@ -1331,19 +1331,10 @@ def __init__(self, rank, cRank, hostname, pid, start, kvs):
 
     def __str__(self):
         return (
-            'Engine %d: Context %d, Host %s, PID %d, Started at %.2f, Last heard from %.2f, Cylinders %d, Assigned %d, Finished %d, Failed %d'
-            % (
-                self.rank,
-                self.cRank,
-                self.hostname,
-                self.pid,
-                self.start,
-                time.time() - self.last,
-                len(self.cylinders),
-                self.assigned,
-                self.finished,
-                self.failed,
-            )
+            f'Engine {self.rank:d}: Context {self.cRank:d}, Host {self.hostname:s}, PID {self.pid:d}, '
+            f'Started at {self.start:.2f}, Last heard from {time.time() - self.last:.2f}, '
+            f'Cylinders {len(self.cylinders):d}, Assigned {self.assigned:d}, Finished {self.finished:d}, '
+            f'Failed {self.failed:d}'
         )
 
     def addCylinder(self, pid, pgid, ckey):
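The rewritten `__str__` relies on implicit concatenation of adjacent string literals, which lets one long message span several source lines. A quick sketch with made-up values:

```python
# Adjacent (f-)string literals are joined at compile time.
rank, assigned = 3, 12
msg = (
    f'Engine {rank:d}: '
    f'Assigned {assigned:d}'
)
assert msg == 'Engine 3: Assigned 12'
```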
@@ -1794,7 +1785,7 @@ def run(self):
             logger.exception('Cylinder %d exception: ', self.cylinderRank)
         finally:
             logger.info('Cylinder %d stopping.', self.cylinderRank)
-            killPatiently(self.taskProc, 'cylinder %d subproc' % self.cylinderRank, 2)
+            killPatiently(self.taskProc, f'cylinder {self.cylinderRank:d} subproc', 2)
 
     def main(self):
         self.pid = os.getpid()  # TODO: Remove
@@ -1824,9 +1815,9 @@ def main(self):
                 self.localEnv['DISBATCH_STREAM_INDEX'] = str(ti.taskStreamIndex)
                 self.localEnv['DISBATCH_REPEAT_INDEX'] = str(ti.taskRepIndex)
                 self.localEnv['DISBATCH_TASKID'] = str(ti.taskId)
-                self.localEnv['DISBATCH_STREAM_INDEX_ZP'] = '%06d' % ti.taskStreamIndex
-                self.localEnv['DISBATCH_REPEAT_INDEX_ZP'] = '%06d' % ti.taskRepIndex
-                self.localEnv['DISBATCH_TASKID_ZP'] = '%06d' % ti.taskId
+                self.localEnv['DISBATCH_STREAM_INDEX_ZP'] = f'{ti.taskStreamIndex:06d}'
+                self.localEnv['DISBATCH_REPEAT_INDEX_ZP'] = f'{ti.taskRepIndex:06d}'
+                self.localEnv['DISBATCH_TASKID_ZP'] = f'{ti.taskId:06d}'
 
                 logger.info('Cylinder %d executing %s.', self.cylinderRank, ti)
                 t0 = time.time()
@@ -1945,7 +1936,7 @@ def constantKeyGen(template):
                 self.hbQueue,
                 self.rank,
                 x,
-                self.FetchTask('.cylinder %d %d' % (self.rank, x), constantKeyGen, kvsstcp.KVSClient.get),
+                self.FetchTask(f'.cylinder {self.rank:d} {x:d}', constantKeyGen, kvsstcp.KVSClient.get),
             )
             for x in range(cylinders)
         ]
@@ -2111,7 +2102,7 @@ def main(kvsq=None):
     context.setNode(args.node)
     logger = logging.getLogger('DisBatch Engine')
    lconf = {'format': '%(asctime)s %(levelname)-8s %(name)-15s: %(message)s', 'level': dbInfo.args.loglevel}
-    lconf['filename'] = '%s_%s_%s_engine_%d.log' % (dbInfo.uniqueId, context.label, args.node, rank)
+    lconf['filename'] = f'{dbInfo.uniqueId:s}_{context.label:s}_{args.node:s}_engine_{rank:d}.log'
     logging.basicConfig(**lconf)
     logger.info('Starting engine %s (%d) on %s (%d) in %s.', context.node, rank, myHostname, myPid, os.getcwd())
     logger.info('argv: %r', sys.argv)
@@ -2350,7 +2341,9 @@ def shutdown(s=None, f=None):
     forceDir = args.prefix[-1] == '/'
     rp = os.path.realpath(args.prefix)
     if os.path.isdir(rp):
-        uniqueId = rp + '/%s_disBatch_%s_%03d' % (tfn, time.strftime('%y%m%d%H%M%S'), int(random.random() * 1000))
+        uniqueId = rp + '/{:s}_disBatch_{:s}_{:03d}'.format(
+            tfn, time.strftime('%y%m%d%H%M%S'), int(random.random() * 1000)
+        )
     else:
         if not forceDir:
             rpp, name = os.path.split(rp)
@@ -2382,7 +2375,7 @@ def shutdown(s=None, f=None):
     else:
         host, port = socket.gethostname(), 0
     kvsst = kvsstcp.KVSServerThread(host, port)
-    kvsserver = '%s:%d' % kvsst.cinfo
+    kvsserver = '{:s}:{:d}'.format(*kvsst.cinfo)
     kvsinfotxt = uniqueId + '_kvsinfo.txt'
     with open(kvsinfotxt, 'w') as kvsi:
         kvsi.write(kvsserver)
2 changes: 1 addition & 1 deletion disbatch/kvsstcp/kvsstcp.py
@@ -673,7 +673,7 @@ def env(self, env=os.environ.copy()):
     logging.basicConfig(**lconf)
 
     t = KVSServer(args.host, args.port)
-    addr = '%s:%d' % t.cinfo
+    addr = '{:s}:{:d}'.format(*t.cinfo)
     logger.info('Server running at %s.', addr)
     if args.addrfile:
         args.addrfile.write(addr)
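Both `cinfo` call sites keep `str.format` with tuple unpacking rather than switching to an f-string, since the old `'%s:%d' % t.cinfo` applied a whole tuple at once. A small sketch with a hypothetical address:

```python
# Hypothetical (host, port) tuple standing in for kvsst.cinfo / t.cinfo.
cinfo = ('worker01', 51000)

# The tuple-unpacking form used in this PR: no temporaries needed.
addr = '{:s}:{:d}'.format(*cinfo)

# An f-string equivalent would need unpacking (or indexing) first.
host, port = cinfo
assert addr == f'{host}:{port}' == 'worker01:51000'
```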
10 changes: 7 additions & 3 deletions exampleTaskFiles/dberTest.py
@@ -45,8 +45,12 @@
 tid2status = db.syncTasks(tasks)
 for tid in tasks:
     print(
-        'task %d: %s returned %d, matched: %s'
-        % (tid, repr(tasks[tid]), tid2status[tid]['ReturnCode'], repr(tasks[tid]) == tid2status[tid]['TaskCmd'])
+        'task {:d}: {:s} returned {:d}, matched: {}'.format(
+            tid,
+            repr(tasks[tid]),
+            tid2status[tid]['ReturnCode'],
+            repr(tasks[tid]) == tid2status[tid]['TaskCmd'],
+        )
     )
 
 # Now try a repeat construct. Force an error for the index 112.
@@ -60,7 +64,7 @@
 # Wait for one task and return its status info.
 s = db.wait_one_task()
 assert s['TaskId'] in target_tids
-print('task %d: returned %d, "%s"' % (s['TaskId'], s['ReturnCode'], s['TaskCmd']))
+print(f'task {s["TaskId"]:d}: returned {s["ReturnCode"]:d}, "{s["TaskCmd"]:s}"')
 
 # Tell DisBatcher no more tasks are coming.
 db.done()
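A subtlety in the `matched:` placeholder above: the comparison yields a bool, and the `:s` presentation type accepts only real strings (bools fall back to `int.__format__`, which rejects the `s` code), so the bare `{}` form is the safe choice. A quick illustration:

```python
ok = ('a' == 'a')                  # a bool
assert '%s' % ok == 'True'         # old-style %s stringifies anything
assert '{}'.format(ok) == 'True'   # an empty format spec does too
# '{:s}'.format(ok) would raise ValueError: Unknown format code 's'
# for object of type 'bool'.
```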
3 changes: 2 additions & 1 deletion tests/test_slurm/run.sh
@@ -39,4 +39,5 @@ cd - > /dev/null
 
 trap - ERR
 echo "Slurm test passed."
-rm -rf $workdir
+# NFS sometimes leaves stale file handles, but don't fail the test
+rm -rf $workdir || true
3 changes: 2 additions & 1 deletion tests/test_ssh/run.sh
@@ -39,4 +39,5 @@ cd - > /dev/null
 
 trap - ERR
 echo "SSH test passed."
-rm -rf $workdir
+# NFS sometimes leaves stale file handles, but don't fail the test
+rm -rf $workdir || true