add method to downsample single Mag (#295)
rschwanhold authored Apr 22, 2021
1 parent be6deed commit 7a24d71
Showing 1 changed file with 88 additions and 49 deletions.
137 changes: 88 additions & 49 deletions wkcuber/api/Layer.py
@@ -154,16 +154,25 @@ def _initialize_mag_from_other_mag(
         raise NotImplemented

     def _pad_existing_mags_for_downsampling(
-        self, from_mag: Mag, max_mag: Mag, scale: Optional[Tuple[float, float, float]]
+        self,
+        from_mag: Mag,
+        max_mag: Mag,
+        scale: Optional[Tuple[float, float, float]],
+        only_max_mag: bool = False,
     ) -> None:
         # pad all existing mags if necessary
         # during each downsampling step, the data shape or offset of the new mag should never need to be rounded
         existing_mags = sorted([Mag(mag) for mag in self.mags.keys()])
         all_mags_after_downsampling = existing_mags.copy()
-        cur_mag = get_next_mag(from_mag, scale)
-        while cur_mag <= max_mag:
-            all_mags_after_downsampling += [cur_mag]
-            cur_mag = get_next_mag(cur_mag, scale)
+        if only_max_mag:
+            all_mags_after_downsampling += [max_mag]
+        else:
+            cur_mag = get_next_mag(from_mag, scale)
+            while cur_mag <= max_mag:
+                all_mags_after_downsampling += [cur_mag]
+                cur_mag = get_next_mag(cur_mag, scale)

         all_mags_after_downsampling = sorted(all_mags_after_downsampling)

         bb_mag1 = BoundingBox(
             topleft=self.dataset.properties.data_layers[
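For intuition, here is a toy sketch of the padding change above. This is not wkcuber's real Mag or get_next_mag (mags are modeled as plain (x, y, z) tuples and the next mag simply doubles every axis, ignoring the dataset scale); it only illustrates which mags the new only_max_mag flag selects for padding:

# Toy model for illustration only -- not wkcuber's real Mag/get_next_mag.
def get_next_mag(mag):
    return tuple(2 * m for m in mag)  # the real version also honors the scale

def mags_after_downsampling(existing_mags, from_mag, max_mag, only_max_mag=False):
    result = list(existing_mags)
    if only_max_mag:
        result.append(max_mag)  # single-step downsampling: only the target mag
    else:
        cur_mag = get_next_mag(from_mag)  # full pyramid: every intermediate mag
        while cur_mag <= max_mag:
            result.append(cur_mag)
            cur_mag = get_next_mag(cur_mag)
    return sorted(result)

print(mags_after_downsampling([(1, 1, 1)], (1, 1, 1), (8, 8, 8)))
# [(1, 1, 1), (2, 2, 2), (4, 4, 4), (8, 8, 8)]
print(mags_after_downsampling([(1, 1, 1)], (1, 1, 1), (8, 8, 8), only_max_mag=True))
# [(1, 1, 1), (8, 8, 8)]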
@@ -216,63 +225,93 @@ def downsample(

         self._pad_existing_mags_for_downsampling(from_mag, max_mag, scale)

-        parsed_interpolation_mode = parse_interpolation_mode(
-            interpolation_mode, self.dataset.properties.data_layers[self.name].category
-        )
         prev_mag = from_mag
         target_mag = get_next_mag(prev_mag, scale)

         while target_mag <= max_mag:
-            assert prev_mag < target_mag
-            assert target_mag.to_layer_name() not in self.mags
+            self.downsample_mag(
+                from_mag=prev_mag,
+                target_mag=target_mag,
+                interpolation_mode=interpolation_mode,
+                compress=compress,
+                buffer_edge_len=buffer_edge_len,
+                pad_data=False,
+                args=args,
+            )

-            prev_mag_ds = self.mags[prev_mag.to_layer_name()]
+            prev_mag = target_mag
+            target_mag = get_next_mag(target_mag, scale)

-            mag_factors = [
-                t // s for (t, s) in zip(target_mag.to_array(), prev_mag.to_array())
-            ]
+    def downsample_mag(
+        self,
+        from_mag: Mag,
+        target_mag: Mag,
+        interpolation_mode: str,
+        compress: bool,
+        buffer_edge_len: Optional[int] = None,
+        pad_data: bool = True,
+        args: Optional[Namespace] = None,
+    ) -> None:
+        assert (
+            from_mag.to_layer_name() in self.mags.keys()
+        ), f"Failed to downsample data. The from_mag ({from_mag}) does not exist."

-            # initialize the new mag
-            target_mag_ds = self._initialize_mag_from_other_mag(
-                target_mag, prev_mag_ds, compress
+        if pad_data:
+            self._pad_existing_mags_for_downsampling(
+                from_mag, target_mag, None, only_max_mag=True
             )

-            # Get target view
-            target_mag_view = target_mag_ds.get_view(is_bounded=not compress)
+        parsed_interpolation_mode = parse_interpolation_mode(
+            interpolation_mode, self.dataset.properties.data_layers[self.name].category
+        )

-            # perform downsampling
-            with get_executor_for_args(args) as executor:
-                voxel_count_per_cube = np.prod(prev_mag_ds._get_file_dimensions())
-                job_count_per_log = math.ceil(
-                    1024 ** 3 / voxel_count_per_cube
-                )  # log every gigavoxel of processed data
+        assert from_mag < target_mag
+        assert target_mag.to_layer_name() not in self.mags

-                if buffer_edge_len is None:
-                    buffer_edge_len = determine_buffer_edge_len(
-                        prev_mag_ds.view
-                    )  # DEFAULT_EDGE_LEN
-                func = named_partial(
-                    downsample_cube_job,
-                    mag_factors=mag_factors,
-                    interpolation_mode=parsed_interpolation_mode,
-                    buffer_edge_len=buffer_edge_len,
-                    compress=compress,
-                    job_count_per_log=job_count_per_log,
-                )
-                prev_mag_ds.get_view().for_zipped_chunks(
-                    # this view is restricted to the bounding box specified in the properties
-                    func,
-                    target_view=target_mag_view,
-                    source_chunk_size=np.array(target_mag_ds._get_file_dimensions())
-                    * mag_factors,
-                    target_chunk_size=target_mag_ds._get_file_dimensions(),
-                    executor=executor,
-                )
+        prev_mag_ds = self.mags[from_mag.to_layer_name()]

-                logging.info("Mag {0} successfully cubed".format(target_mag))
+        mag_factors = [
+            t // s for (t, s) in zip(target_mag.to_array(), from_mag.to_array())
+        ]

-            prev_mag = target_mag
-            target_mag = get_next_mag(target_mag, scale)
+        # initialize the new mag
+        target_mag_ds = self._initialize_mag_from_other_mag(
+            target_mag, prev_mag_ds, compress
+        )
+
+        # Get target view
+        target_mag_view = target_mag_ds.get_view(is_bounded=not compress)
+
+        # perform downsampling
+        with get_executor_for_args(args) as executor:
+            voxel_count_per_cube = np.prod(prev_mag_ds._get_file_dimensions())
+            job_count_per_log = math.ceil(
+                1024 ** 3 / voxel_count_per_cube
+            )  # log every gigavoxel of processed data
+
+            if buffer_edge_len is None:
+                buffer_edge_len = determine_buffer_edge_len(
+                    prev_mag_ds.view
+                )  # DEFAULT_EDGE_LEN
+            func = named_partial(
+                downsample_cube_job,
+                mag_factors=mag_factors,
+                interpolation_mode=parsed_interpolation_mode,
+                buffer_edge_len=buffer_edge_len,
+                compress=compress,
+                job_count_per_log=job_count_per_log,
+            )
+            prev_mag_ds.get_view().for_zipped_chunks(
+                # this view is restricted to the bounding box specified in the properties
+                func,
+                target_view=target_mag_view,
+                source_chunk_size=np.array(target_mag_ds._get_file_dimensions())
+                * mag_factors,
+                target_chunk_size=target_mag_ds._get_file_dimensions(),
+                executor=executor,
+            )
+
+            logging.info("Mag {0} successfully cubed".format(target_mag))

     def setup_mag(self, mag: Union[str, Mag]) -> None:
         pass
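To make the chunk-size arithmetic in for_zipped_chunks concrete, here is a small worked example; the 1024-cubed file dimensions are an assumption standing in for _get_file_dimensions():

import numpy as np

# One downsampling step from Mag 2 to Mag 4: every axis shrinks by 2.
from_mag = (2, 2, 2)
target_mag = (4, 4, 4)
mag_factors = [t // s for (t, s) in zip(target_mag, from_mag)]  # [2, 2, 2]

# Assumed file dimensions; the real value comes from _get_file_dimensions().
target_chunk_size = np.array([1024, 1024, 1024])
source_chunk_size = target_chunk_size * mag_factors  # [2048 2048 2048]

# Each 2048^3-voxel source chunk shrinks to exactly one 1024^3 target file,
# so no target file is ever written by two parallel jobs.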
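Finally, a minimal usage sketch of the new method, assuming the WKDataset API of this wkcuber version; the dataset path and layer name are hypothetical, and only downsample_mag and its parameters come from this commit. Because pad_data defaults to True, a standalone call pads the existing mags before downsampling:

from wkcuber.api.Dataset import WKDataset
from wkcuber.mag import Mag

# Hypothetical dataset path and layer name, for illustration only.
dataset = WKDataset("testdata/sample_wkw_dataset")
layer = dataset.get_layer("color")

# New in this commit: compute a single downsampling step, Mag 2 -> Mag 4,
# without generating the rest of the pyramid.
layer.downsample_mag(
    from_mag=Mag(2),
    target_mag=Mag(4),
    interpolation_mode="default",
    compress=True,
)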
