Commit c2d84b7
style: 🎨 Black formatted.
rhoadesScholar committed Jan 26, 2024
1 parent d45d741 commit c2d84b7
Showing 40 changed files with 721 additions and 831 deletions.
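Every hunk below is mechanical output from rerunning the Black formatter over the package: short multi-line signatures and calls are collapsed onto one line, backslash-continued assert messages are wrapped in parentheses, `'''` docstrings become `"""`, stray blank lines after `def`/`for` are dropped, and inline comments get Black's two leading spaces. An illustrative before/after, not taken from this diff:

    # before Black
    def add(
            a,
            b):

        return a + b # noqa

    # after Black
    def add(a, b):
        return a + b  # noqa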
30 changes: 15 additions & 15 deletions daisy/__init__.py
@@ -1,16 +1,16 @@
 from __future__ import absolute_import
-from .block import Block, BlockStatus # noqa
-from .blocks import expand_roi_to_grid # noqa
-from .blocks import expand_write_roi_to_grid # noqa
-from .client import Client # noqa
-from .context import Context # noqa
-from .convenience import run_blockwise # noqa
-from .coordinate import Coordinate # noqa
-from .dependency_graph import DependencyGraph, BlockwiseDependencyGraph # noqa
-from .logging import get_worker_log_basename, redirect_stdouterr # noqa
-from .roi import Roi # noqa
-from .scheduler import Scheduler # noqa
-from .server import Server # noqa
-from .task import Task # noqa
-from .worker import Worker # noqa
-from .worker_pool import WorkerPool # noqa
+from .block import Block, BlockStatus  # noqa
+from .blocks import expand_roi_to_grid  # noqa
+from .blocks import expand_write_roi_to_grid  # noqa
+from .client import Client  # noqa
+from .context import Context  # noqa
+from .convenience import run_blockwise  # noqa
+from .coordinate import Coordinate  # noqa
+from .dependency_graph import DependencyGraph, BlockwiseDependencyGraph  # noqa
+from .logging import get_worker_log_basename, redirect_stdouterr  # noqa
+from .roi import Roi  # noqa
+from .scheduler import Scheduler  # noqa
+from .server import Server  # noqa
+from .task import Task  # noqa
+from .worker import Worker  # noqa
+from .worker_pool import WorkerPool  # noqa
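For orientation, these re-exports are daisy's public surface; a typical blockwise job touches only a handful of them. A minimal sketch under assumed values (the ROI geometry and task name are illustrative, not from this commit):

    import daisy

    # total region to process, plus per-block read/write geometry (illustrative)
    total_roi = daisy.Roi((0, 0, 0), (100, 100, 100))
    read_roi = daisy.Roi((0, 0, 0), (20, 20, 20))
    write_roi = daisy.Roi((5, 5, 5), (10, 10, 10))

    def process_block(block):
        # each worker receives Block objects saying what to read and write
        print("processing", block.block_id)

    task = daisy.Task(
        "demo_task",
        total_roi=total_roi,
        read_roi=read_roi,
        write_roi=write_roi,
        process_function=process_block,
    )
    daisy.run_blockwise([task])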
14 changes: 2 additions & 12 deletions daisy/block.py
@@ -66,14 +66,8 @@ class Block(Freezable):
         The id of the Task that this block belongs to. Defaults to None.
     """
 
-    def __init__(
-            self,
-            total_roi,
-            read_roi,
-            write_roi,
-            block_id=None,
-            task_id=None):
+    def __init__(self, total_roi, read_roi, write_roi, block_id=None, task_id=None):
         self.read_roi = read_roi
         self.write_roi = write_roi
         self.requested_write_roi = write_roi  # save original write_roi
@@ -88,21 +82,17 @@ def __init__(
         self.freeze()
 
     def copy(self):
-
         return copy.deepcopy(self)
 
     def __compute_block_id(self, total_roi, write_roi, shift=None):
-        block_index = (
-            write_roi.offset - total_roi.offset
-        ) / write_roi.shape
+        block_index = (write_roi.offset - total_roi.offset) / write_roi.shape
 
         # block_id will be the cantor number for this block index
         block_id = int(cantor_number(block_index))
 
         return block_id
 
     def __repr__(self):
-
         return "%s/%d with read ROI %s and write ROI %s" % (
             self.block_id[0],
             self.block_id[1],
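The `__compute_block_id` change above is behavior-preserving: a block's grid index is mapped to a single integer with a Cantor pairing, so ids are unique and deterministic. A rough sketch of the idea (daisy's actual `cantor_number` helper lives in its own module and may differ in detail):

    def cantor_pair(k1, k2):
        # classic Cantor pairing: a bijection from pairs of non-negative ints to ints
        return (k1 + k2) * (k1 + k2 + 1) // 2 + k2

    def cantor_number(index):
        # fold an n-dimensional block index into one integer by repeated pairing
        result = index[0]
        for k in index[1:]:
            result = cantor_pair(result, k)
        return result

    print(cantor_number((1, 2, 3)))  # unique id for the block at grid index (1, 2, 3)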
34 changes: 16 additions & 18 deletions daisy/block_bookkeeper.py
@@ -2,51 +2,51 @@
 
 
 class BlockLog:
-
     def __init__(self, block, stream):
         self.block = block
         self.stream = stream
         self.time_sent = time.time()
 
 
 class BlockBookkeeper:
-
     def __init__(self, processing_timeout=None):
         self.processing_timeout = processing_timeout
         self.sent_blocks = {}
 
     def notify_block_sent(self, block, stream):
-        '''Notify the bookkeeper that a block has been sent to a client (i.e.,
-        a stream to the client).'''
+        """Notify the bookkeeper that a block has been sent to a client (i.e.,
+        a stream to the client)."""
 
-        assert block.block_id not in self.sent_blocks, \
-            f"Attempted to send block {block}, although it is already being " \
+        assert block.block_id not in self.sent_blocks, (
+            f"Attempted to send block {block}, although it is already being "
             f"processed by {self.sent_blocks[block.block_id].stream}"
+        )
 
         self.sent_blocks[block.block_id] = BlockLog(block, stream)
 
     def notify_block_returned(self, block, stream):
-        '''Notify the bookkeeper that a block was returned.'''
+        """Notify the bookkeeper that a block was returned."""
 
-        assert block.block_id in self.sent_blocks, \
-            f"Block {block} was returned by {stream}, but is not in list " \
+        assert block.block_id in self.sent_blocks, (
+            f"Block {block} was returned by {stream}, but is not in list "
             "of sent blocks"
+        )
 
         log = self.sent_blocks[block.block_id]
         block.started_processing = log.time_sent
         block.stopped_processing = time.time()
 
-        assert stream == log.stream, \
-            f"Block {block} was returned by {stream}, but was sent to" \
-            f"{log.stream}"
+        assert stream == log.stream, (
+            f"Block {block} was returned by {stream}, but was sent to" f"{log.stream}"
+        )
 
         del self.sent_blocks[block.block_id]
 
     def is_valid_return(self, block, stream):
-        '''Check whether the block from the given client (i.e., stream) is
+        """Check whether the block from the given client (i.e., stream) is
         expected to be returned from this client. This is to avoid double
         returning blocks that have already been returned as lost blocks, but
-        still come back from the client due to race conditions.'''
+        still come back from the client due to race conditions."""
 
         # block was never sent or already returned
         if block.block_id not in self.sent_blocks:
@@ -59,14 +59,13 @@ def is_valid_return(self, block, stream):
         return True
 
     def get_lost_blocks(self):
-        '''Return a list of blocks that were sent and are lost, either because
+        """Return a list of blocks that were sent and are lost, either because
        the stream to the client closed or the processing timed out. Those
         blocks are removed from the sent-list with the call of this
-        function.'''
+        function."""
 
         lost_block_ids = []
         for block_id, log in self.sent_blocks.items():
-
             # is the stream to the client still alive?
             if log.stream.closed():
                 lost_block_ids.append(block_id)
@@ -78,7 +77,6 @@
 
         lost_blocks = []
         for block_id in lost_block_ids:
-
             lost_block = self.sent_blocks[block_id].block
             lost_blocks.append(lost_block)
             del self.sent_blocks[block_id]
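For a sense of the contract those asserts enforce, a hedged sketch using only methods defined in this file; `FakeStream` is invented for illustration (real streams come from daisy's TCP server), and the ROI values are arbitrary:

    from daisy import Block, Roi
    from daisy.block_bookkeeper import BlockBookkeeper

    class FakeStream:
        # stand-in exposing the one method the bookkeeper calls on a stream
        def __init__(self):
            self.is_closed = False

        def closed(self):
            return self.is_closed

    total = Roi((0, 0), (100, 100))
    block = Block(total, Roi((0, 0), (20, 20)), Roi((5, 5), (10, 10)))

    bookkeeper = BlockBookkeeper(processing_timeout=60)
    stream = FakeStream()

    bookkeeper.notify_block_sent(block, stream)
    assert bookkeeper.is_valid_return(block, stream)  # right stream, still in flight

    stream.is_closed = True                # simulate the client dropping
    lost = bookkeeper.get_lost_blocks()    # reclaims the block exactly once
    assert [b.block_id for b in lost] == [block.block_id]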
80 changes: 20 additions & 60 deletions daisy/blocks.py
@@ -106,7 +106,6 @@ def compute_level_conflicts(self):
         prev_level_offset = None
 
         for level, level_offset in enumerate(self.level_offsets):
-
             # get conflicts to previous level
             if prev_level_offset is not None and self.read_write_conflict:
                 conflict_offsets = self.get_conflict_offsets(
@@ -121,13 +120,11 @@
         return level_conflict_offsets
 
     def create_dependency_graph(self):
-
         blocks = []
 
         for level_offset, level_conflicts in zip(
             self.level_offsets, self.level_conflicts
         ):
-
             # all block offsets of the current level (relative to total ROI
             # start)
             block_dim_offsets = [
@@ -138,16 +135,11 @@
             ]
             # TODO: can we do this part lazily? This might be a lot of
             # Coordinates
-            block_offsets = [
-                Coordinate(o)
-                for o in product(*block_dim_offsets)]
+            block_offsets = [Coordinate(o) for o in product(*block_dim_offsets)]
 
             # convert to global coordinates
             block_offsets = [
-                o + (
-                    self.total_roi.get_begin() -
-                    self.block_read_roi.get_begin()
-                )
+                o + (self.total_roi.get_begin() - self.block_read_roi.get_begin())
                 for o in block_offsets
             ]
 
@@ -173,12 +165,8 @@ def compute_level_stride(self):
             self.block_write_roi
         ), "Read ROI must contain write ROI."
 
-        context_ul = (
-            self.block_write_roi.get_begin() -
-            self.block_read_roi.get_begin())
-        context_lr = (
-            self.block_read_roi.get_end() -
-            self.block_write_roi.get_end())
+        context_ul = self.block_write_roi.get_begin() - self.block_read_roi.get_begin()
+        context_lr = self.block_read_roi.get_end() - self.block_write_roi.get_end()
 
         max_context = Coordinate(
             (max(ul, lr) for ul, lr in zip(context_ul, context_lr))
@@ -195,14 +183,14 @@ def compute_level_stride(self):
         # to avoid overlapping write ROIs, increase the stride to the next
         # multiple of write shape
         write_shape = self.block_write_roi.get_shape()
-        level_stride = Coordinate((
-            ((level - 1) // w + 1) * w
-            for level, w in zip(min_level_stride, write_shape)
-        ))
+        level_stride = Coordinate(
+            (
+                ((level - 1) // w + 1) * w
+                for level, w in zip(min_level_stride, write_shape)
+            )
+        )
 
-        logger.debug(
-            "final level stride (multiples of write size) is %s",
-            level_stride)
+        logger.debug("final level stride (multiples of write size) is %s", level_stride)
 
         return level_stride

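The reformatted generator above keeps the same arithmetic: each minimum stride is rounded up to the next multiple of the write shape, so write ROIs within one level can never overlap. A quick check of that rounding idiom:

    def next_multiple(n, w):
        # smallest multiple of w that is >= n, i.e. ((n - 1) // w + 1) * w
        return ((n - 1) // w + 1) * w

    assert next_multiple(7, 5) == 10   # minimum stride 7, write size 5
    assert next_multiple(10, 5) == 10  # already aligned, unchanged
    assert next_multiple(11, 5) == 15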
@@ -219,26 +207,16 @@ def compute_level_offsets(self):
         )
 
         dim_offsets = [
-            range(0, e, step)
-            for e, step in zip(self.level_stride, write_stride)
+            range(0, e, step) for e, step in zip(self.level_stride, write_stride)
         ]
 
-        level_offsets = list(
-            reversed([
-                Coordinate(o)
-                for o in product(*dim_offsets)
-            ])
-        )
+        level_offsets = list(reversed([Coordinate(o) for o in product(*dim_offsets)]))
 
         logger.debug("level offsets: %s", level_offsets)
 
         return level_offsets
 
-    def get_conflict_offsets(
-            self,
-            level_offset,
-            prev_level_offset,
-            level_stride):
+    def get_conflict_offsets(self, level_offset, prev_level_offset, level_stride):
         """Get the offsets to all previous level blocks that are in conflict
         with the current level blocks."""
 
@@ -250,22 +228,15 @@ def get_conflict_offsets(
             for op, ls in zip(offset_to_prev, level_stride)
         ]
 
-        conflict_offsets = [
-            Coordinate(o)
-            for o in product(*conflict_dim_offsets)
-        ]
-        logger.debug(
-            "conflict offsets to previous level: %s",
-            conflict_offsets)
+        conflict_offsets = [Coordinate(o) for o in product(*conflict_dim_offsets)]
+        logger.debug("conflict offsets to previous level: %s", conflict_offsets)
 
         return conflict_offsets
 
     def enumerate_dependencies(self, conflict_offsets, block_offsets):
-
         inclusion_criteria = {
             "valid": lambda b: self.total_roi.contains(b.read_roi),
-            "overhang": lambda b: self.total_roi.contains(
-                b.write_roi.get_begin()),
+            "overhang": lambda b: self.total_roi.contains(b.write_roi.get_begin()),
             "shrink": lambda b: self.shrink_possible(b),
         }[self.fit]
 
@@ -278,7 +249,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
         blocks = []
 
         for block_offset in block_offsets:
-
             # create a block shifted by the current offset
             block = Block(
                 self.total_roi,
@@ -294,7 +264,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
             # get all blocks in conflict with the current block
             conflicts = []
             for conflict_offset in conflict_offsets:
-
                 conflict = Block(
                     self.total_roi,
                     block.read_roi + conflict_offset,
@@ -314,7 +283,6 @@ def enumerate_dependencies(self, conflict_offsets, block_offsets):
         return blocks
 
     def shrink_possible(self, block):
-
         if not self.total_roi.contains(block.write_roi.get_begin()):
             return False
 
@@ -388,10 +356,7 @@ def get_subgraph_blocks(self, sub_roi):
 
 def expand_roi_to_grid(sub_roi, total_roi, read_roi, write_roi):
     """Expands given roi so that its write region is aligned to write_roi"""
-    offset = (
-        write_roi.get_begin() +
-        total_roi.get_begin() -
-        read_roi.get_begin())
+    offset = write_roi.get_begin() + total_roi.get_begin() - read_roi.get_begin()
 
     begin = sub_roi.get_begin() - offset
     end = sub_roi.get_end() - offset
@@ -405,10 +370,7 @@ def expand_roi_to_grid(sub_roi, total_roi, read_roi, write_roi):
 
 def expand_request_roi_to_grid(req_roi, total_roi, read_roi, write_roi):
     """Expands given roi so that its write region is aligned to write_roi"""
-    offset = (
-        write_roi.get_begin() +
-        total_roi.get_begin() -
-        read_roi.get_begin())
+    offset = write_roi.get_begin() + total_roi.get_begin() - read_roi.get_begin()
 
     begin = req_roi.get_begin() - offset
     end = req_roi.get_end() - offset
@@ -430,7 +392,5 @@ def expand_write_roi_to_grid(roi, write_roi):
         -(-roi.get_end() // write_roi.get_shape()),
     )  # `ceildiv`
 
-    roi = Roi(
-        roi[0] * write_roi.get_shape(),
-        (roi[1] - roi[0]) * write_roi.get_shape())
+    roi = Roi(roi[0] * write_roi.get_shape(), (roi[1] - roi[0]) * write_roi.get_shape())
     return roi
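The `ceildiv` comment above refers to the `-(-x // y)` idiom: floor-dividing the negation and negating back gives ceiling division in pure integer arithmetic:

    assert -(-7 // 3) == 3   # ceil(7 / 3)
    assert -(-6 // 3) == 2   # exact division stays exact
    assert -(-1 // 4) == 1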