style: 🎨 Black formatted.
rhoadesScholar committed Feb 23, 2024
1 parent af675f9 commit 32a2715
Showing 40 changed files with 793 additions and 777 deletions.
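
This commit is a pure formatting pass. As a rough sketch of how such a pass is produced (the exact Black version and any project-specific configuration are assumptions, since neither is shown here), Black's public API applies the same rules as the black command line:

    import black

    # A multi-line signature of the kind this commit collapses
    # (compare the daisy/block.py hunk below).
    src = (
        "def __init__(\n"
        "        self,\n"
        "        total_roi,\n"
        "        read_roi):\n"
        "    pass\n"
    )

    # FileMode() selects Black's defaults: an 88-column target, double
    # quotes for strings, and two spaces before inline comments.
    print(black.format_str(src, mode=black.FileMode()))
    # def __init__(self, total_roi, read_roi):
    #     pass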
30 changes: 15 additions & 15 deletions daisy/__init__.py
@@ -1,16 +1,16 @@
 from __future__ import absolute_import
-from .block import Block, BlockStatus # noqa
-from .blocks import expand_roi_to_grid # noqa
-from .blocks import expand_write_roi_to_grid # noqa
-from .client import Client # noqa
-from .context import Context # noqa
-from .convenience import run_blockwise # noqa
-from .coordinate import Coordinate # noqa
-from .dependency_graph import DependencyGraph, BlockwiseDependencyGraph # noqa
-from .logging import get_worker_log_basename, redirect_stdouterr # noqa
-from .roi import Roi # noqa
-from .scheduler import Scheduler # noqa
-from .server import Server # noqa
-from .task import Task # noqa
-from .worker import Worker # noqa
-from .worker_pool import WorkerPool # noqa
+from .block import Block, BlockStatus  # noqa
+from .blocks import expand_roi_to_grid  # noqa
+from .blocks import expand_write_roi_to_grid  # noqa
+from .client import Client  # noqa
+from .context import Context  # noqa
+from .convenience import run_blockwise  # noqa
+from .coordinate import Coordinate  # noqa
+from .dependency_graph import DependencyGraph, BlockwiseDependencyGraph  # noqa
+from .logging import get_worker_log_basename, redirect_stdouterr  # noqa
+from .roi import Roi  # noqa
+from .scheduler import Scheduler  # noqa
+from .server import Server  # noqa
+from .task import Task  # noqa
+from .worker import Worker  # noqa
+from .worker_pool import WorkerPool  # noqa
13 changes: 3 additions & 10 deletions daisy/block.py
@@ -66,13 +66,8 @@ class Block(Freezable):
         The id of the Task that this block belongs to. Defaults to None.
     """
 
-    def __init__(
-            self,
-            total_roi,
-            read_roi,
-            write_roi,
-            block_id=None,
-            task_id=None):
+    def __init__(self, total_roi, read_roi, write_roi, block_id=None, task_id=None):
 
         self.read_roi = read_roi
         self.write_roi = write_roi
@@ -92,9 +87,7 @@ def copy(self):
         return copy.deepcopy(self)
 
     def __compute_block_id(self, total_roi, write_roi, shift=None):
-        block_index = (
-            write_roi.offset - total_roi.offset
-        ) / write_roi.shape
+        block_index = (write_roi.offset - total_roi.offset) / write_roi.shape
 
         # block_id will be the cantor number for this block index
        block_id = int(cantor_number(block_index))
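
The __compute_block_id hunk above keys each block by the Cantor number of its grid index. daisy's cantor_number helper is not part of this diff; as a hedged sketch, the classic Cantor pairing function folded over the index dimensions gives the same kind of mapping from index tuples to unique integer ids:

    def cantor_pair(k1: int, k2: int) -> int:
        # Classic Cantor pairing: a bijection from pairs of
        # non-negative integers to non-negative integers.
        return (k1 + k2) * (k1 + k2 + 1) // 2 + k2

    def cantor_number(index) -> int:
        # Fold the pairing over an n-dimensional block index so that
        # every index tuple maps to a distinct integer block id.
        result = index[0]
        for k in index[1:]:
            result = cantor_pair(result, k)
        return result

    assert cantor_number((3, 4)) == 32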
30 changes: 16 additions & 14 deletions daisy/block_bookkeeper.py
@@ -16,37 +16,39 @@ def __init__(self, processing_timeout=None):
         self.sent_blocks = {}
 
     def notify_block_sent(self, block, stream):
-        '''Notify the bookkeeper that a block has been sent to a client (i.e.,
-        a stream to the client).'''
+        """Notify the bookkeeper that a block has been sent to a client (i.e.,
+        a stream to the client)."""
 
-        assert block.block_id not in self.sent_blocks, \
-            f"Attempted to send block {block}, although it is already being " \
+        assert block.block_id not in self.sent_blocks, (
+            f"Attempted to send block {block}, although it is already being "
             f"processed by {self.sent_blocks[block.block_id].stream}"
+        )
 
         self.sent_blocks[block.block_id] = BlockLog(block, stream)
 
     def notify_block_returned(self, block, stream):
-        '''Notify the bookkeeper that a block was returned.'''
+        """Notify the bookkeeper that a block was returned."""
 
-        assert block.block_id in self.sent_blocks, \
-            f"Block {block} was returned by {stream}, but is not in list " \
+        assert block.block_id in self.sent_blocks, (
+            f"Block {block} was returned by {stream}, but is not in list "
             "of sent blocks"
+        )
 
         log = self.sent_blocks[block.block_id]
         block.started_processing = log.time_sent
         block.stopped_processing = time.time()
 
-        assert stream == log.stream, \
-            f"Block {block} was returned by {stream}, but was sent to" \
-            f"{log.stream}"
+        assert stream == log.stream, (
+            f"Block {block} was returned by {stream}, but was sent to" f"{log.stream}"
+        )
 
         del self.sent_blocks[block.block_id]
 
     def is_valid_return(self, block, stream):
-        '''Check whether the block from the given client (i.e., stream) is
+        """Check whether the block from the given client (i.e., stream) is
         expected to be returned from this client. This is to avoid double
         returning blocks that have already been returned as lost blocks, but
-        still come back from the client due to race conditions.'''
+        still come back from the client due to race conditions."""
 
         # block was never sent or already returned
         if block.block_id not in self.sent_blocks:
@@ -59,10 +61,10 @@ def is_valid_return(self, block, stream):
             return True
 
     def get_lost_blocks(self):
-        '''Return a list of blocks that were sent and are lost, either because
+        """Return a list of blocks that were sent and are lost, either because
         the stream to the client closed or the processing timed out. Those
         blocks are removed from the sent-list with the call of this
-        function.'''
+        function."""
 
         lost_block_ids = []
         for block_id, log in self.sent_blocks.items():
73 changes: 20 additions & 53 deletions daisy/blocks.py
@@ -138,16 +138,11 @@ def create_dependency_graph(self):
         ]
         # TODO: can we do this part lazily? This might be a lot of
         # Coordinates
-        block_offsets = [
-            Coordinate(o)
-            for o in product(*block_dim_offsets)]
+        block_offsets = [Coordinate(o) for o in product(*block_dim_offsets)]
 
         # convert to global coordinates
         block_offsets = [
-            o + (
-                self.total_roi.get_begin() -
-                self.block_read_roi.get_begin()
-            )
+            o + (self.total_roi.get_begin() - self.block_read_roi.get_begin())
             for o in block_offsets
         ]
 
@@ -173,12 +168,8 @@ def compute_level_stride(self):
             self.block_write_roi
         ), "Read ROI must contain write ROI."
 
-        context_ul = (
-            self.block_write_roi.get_begin() -
-            self.block_read_roi.get_begin())
-        context_lr = (
-            self.block_read_roi.get_end() -
-            self.block_write_roi.get_end())
+        context_ul = self.block_write_roi.get_begin() - self.block_read_roi.get_begin()
+        context_lr = self.block_read_roi.get_end() - self.block_write_roi.get_end()
 
         max_context = Coordinate(
             (max(ul, lr) for ul, lr in zip(context_ul, context_lr))
@@ -195,14 +186,14 @@ def compute_level_stride(self):
         # to avoid overlapping write ROIs, increase the stride to the next
         # multiple of write shape
         write_shape = self.block_write_roi.get_shape()
-        level_stride = Coordinate((
-            ((level - 1) // w + 1) * w
-            for level, w in zip(min_level_stride, write_shape)
-        ))
+        level_stride = Coordinate(
+            (
+                ((level - 1) // w + 1) * w
+                for level, w in zip(min_level_stride, write_shape)
+            )
+        )
 
-        logger.debug(
-            "final level stride (multiples of write size) is %s",
-            level_stride)
+        logger.debug("final level stride (multiples of write size) is %s", level_stride)
 
         return level_stride
 
@@ -219,26 +210,16 @@ def compute_level_offsets(self):
         )
 
         dim_offsets = [
-            range(0, e, step)
-            for e, step in zip(self.level_stride, write_stride)
+            range(0, e, step) for e, step in zip(self.level_stride, write_stride)
         ]
 
-        level_offsets = list(
-            reversed([
-                Coordinate(o)
-                for o in product(*dim_offsets)
-            ])
-        )
+        level_offsets = list(reversed([Coordinate(o) for o in product(*dim_offsets)]))
 
         logger.debug("level offsets: %s", level_offsets)
 
         return level_offsets
 
-    def get_conflict_offsets(
-            self,
-            level_offset,
-            prev_level_offset,
-            level_stride):
+    def get_conflict_offsets(self, level_offset, prev_level_offset, level_stride):
         """Get the offsets to all previous level blocks that are in conflict
         with the current level blocks."""
 
@@ -250,22 +231,16 @@ def get_conflict_offsets(
             for op, ls in zip(offset_to_prev, level_stride)
         ]
 
-        conflict_offsets = [
-            Coordinate(o)
-            for o in product(*conflict_dim_offsets)
-        ]
-        logger.debug(
-            "conflict offsets to previous level: %s",
-            conflict_offsets)
+        conflict_offsets = [Coordinate(o) for o in product(*conflict_dim_offsets)]
+        logger.debug("conflict offsets to previous level: %s", conflict_offsets)
 
         return conflict_offsets
 
     def enumerate_dependencies(self, conflict_offsets, block_offsets):
 
         inclusion_criteria = {
             "valid": lambda b: self.total_roi.contains(b.read_roi),
-            "overhang": lambda b: self.total_roi.contains(
-                b.write_roi.get_begin()),
+            "overhang": lambda b: self.total_roi.contains(b.write_roi.get_begin()),
             "shrink": lambda b: self.shrink_possible(b),
         }[self.fit]
 
@@ -388,10 +363,7 @@ def get_subgraph_blocks(self, sub_roi):
 
 def expand_roi_to_grid(sub_roi, total_roi, read_roi, write_roi):
     """Expands given roi so that its write region is aligned to write_roi"""
-    offset = (
-        write_roi.get_begin() +
-        total_roi.get_begin() -
-        read_roi.get_begin())
+    offset = write_roi.get_begin() + total_roi.get_begin() - read_roi.get_begin()
 
     begin = sub_roi.get_begin() - offset
     end = sub_roi.get_end() - offset
@@ -405,10 +377,7 @@ def expand_roi_to_grid(sub_roi, total_roi, read_roi, write_roi):
 
 def expand_request_roi_to_grid(req_roi, total_roi, read_roi, write_roi):
     """Expands given roi so that its write region is aligned to write_roi"""
-    offset = (
-        write_roi.get_begin() +
-        total_roi.get_begin() -
-        read_roi.get_begin())
+    offset = write_roi.get_begin() + total_roi.get_begin() - read_roi.get_begin()
 
     begin = req_roi.get_begin() - offset
     end = req_roi.get_end() - offset
@@ -430,7 +399,5 @@ def expand_write_roi_to_grid(roi, write_roi):
         -(-roi.get_end() // write_roi.get_shape()),
     )  # `ceildiv`
 
-    roi = Roi(
-        roi[0] * write_roi.get_shape(),
-        (roi[1] - roi[0]) * write_roi.get_shape())
+    roi = Roi(roi[0] * write_roi.get_shape(), (roi[1] - roi[0]) * write_roi.get_shape())
     return roi
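
The `ceildiv` comment in expand_write_roi_to_grid above refers to the standard negate-floor-negate trick for integer ceiling division; a minimal illustration:

    # For positive b, -(-a // b) == ceil(a / b), because Python's floor
    # division rounds the negated numerator toward negative infinity.
    assert -(-7 // 2) == 4  # ceil(3.5)
    assert -(-6 // 2) == 3  # exact division is unaffected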
49 changes: 27 additions & 22 deletions daisy/cl_monitor.py
@@ -6,12 +6,12 @@
 
 
 class TqdmLoggingHandler:
-    '''A logging handler that uses ``tqdm.tqdm.write`` in ``emit()``, such that
+    """A logging handler that uses ``tqdm.tqdm.write`` in ``emit()``, such that
     logging doesn't interfere with tqdm's progress bar.
 
     Heavily inspired by the fantastic
     https://github.com/EpicWink/tqdm-logging-wrapper/
-    '''
+    """
 
     def __init__(self, handler):
         self.handler = handler
@@ -42,8 +42,10 @@ def __init__(self, block, exception, worker_id):
         self.worker_id = worker_id
 
     def __repr__(self):
-        return f"block {self.block.block_id[1]} in worker " \
+        return (
+            f"block {self.block.block_id[1]} in worker "
             f"{self.worker_id} with exception {repr(self.exception)}"
+        )
 
 
 class CLMonitor(ServerObserver):
@@ -56,13 +58,13 @@ def __init__(self, server):
         self._wrap_logging_handlers()
 
     def _wrap_logging_handlers(self):
-        '''This adds a TqdmLoggingHandler around each logging handler that has
+        """This adds a TqdmLoggingHandler around each logging handler that has
         a TTY stream attached to it, so that logging doesn't interfere with the
         progress bar.
 
         Heavily inspired by the fantastic
         https://github.com/EpicWink/tqdm-logging-wrapper/
-        '''
+        """
 
         logger = logging.root
         for i in range(len(logger.handlers)):
@@ -87,7 +89,8 @@ def on_block_failure(self, block, exception, context):
 
         task_id = block.block_id[0]
         self.summaries[task_id].block_failures.append(
-            BlockFailure(block, exception, context['worker_id']))
+            BlockFailure(block, exception, context["worker_id"])
+        )
 
     def on_task_start(self, task_id, task_state):
 
@@ -130,8 +133,10 @@ def on_server_exit(self):
             print()
             state = summary.state
             print(f" num blocks : {state.total_block_count}")
-            print(f" completed ✔: {state.completed_count} "
-                  f"(skipped {state.skipped_count})")
+            print(
+                f" completed ✔: {state.completed_count} "
+                f"(skipped {state.skipped_count})"
+            )
             print(f" failed ✗: {state.failed_count}")
             print(f" orphaned ∅: {state.orphaned_count}")
             print()
@@ -151,8 +156,8 @@ def on_server_exit(self):
                 print()
                 for block_failure in summary.block_failures[:10]:
                     log_basename = daisy_logging.get_worker_log_basename(
-                        block_failure.worker_id,
-                        block_failure.block.block_id[0])
+                        block_failure.worker_id, block_failure.block.block_id[0]
+                    )
                     print(f" {log_basename}.err / .out")
                 if num_block_failures > 10:
                     print(" ...")
@@ -165,18 +170,18 @@ def _update_state(self, task_id, task_state):
         if task_id not in self.progresses:
             total = task_state.total_block_count
             self.progresses[task_id] = tqdm_auto(
-                total=total,
-                desc=task_id + " ▶",
-                unit='blocks',
-                leave=True)
+                total=total, desc=task_id + " ▶", unit="blocks", leave=True
+            )
 
-        self.progresses[task_id].set_postfix({
-            '⧗': task_state.pending_count,
-            '▶': task_state.processing_count,
-            '✔': task_state.completed_count,
-            '✗': task_state.failed_count,
-            '∅': task_state.orphaned_count
-        })
+        self.progresses[task_id].set_postfix(
+            {
+                "⧗": task_state.pending_count,
+                "▶": task_state.processing_count,
+                "✔": task_state.completed_count,
+                "✗": task_state.failed_count,
+                "∅": task_state.orphaned_count,
+            }
+        )
 
         completed = task_state.completed_count
         delta = completed - self.progresses[task_id].n
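
The TqdmLoggingHandler reformatted above routes log records through tqdm so that logging does not corrupt an active progress bar. As a minimal stand-in (daisy's version wraps existing handlers rather than replacing them, and this sketch is not its actual implementation):

    import logging

    import tqdm

    class TqdmWriteHandler(logging.Handler):
        # Emit records via tqdm.tqdm.write(), which prints above any
        # active progress bar instead of tearing through it.
        def emit(self, record):
            try:
                tqdm.tqdm.write(self.format(record))
            except Exception:
                self.handleError(record)

    logging.getLogger().addHandler(TqdmWriteHandler())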