Skip to content

Commit

Permalink
Pack according to domains
Browse files Browse the repository at this point in the history
Resolves #1
  • Loading branch information
ma3ke committed Apr 2, 2024
1 parent 60571d4 commit bac62fe
Showing 1 changed file with 182 additions and 65 deletions.
247 changes: 182 additions & 65 deletions src/bentopy/pack/pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import math
import time
import itertools
import pathlib

import numpy as np
Expand All @@ -23,6 +24,15 @@ def log(*args, **kwargs):
print(*args, **kwargs)


def trail_off(v: float) -> float:
"""
Takes a value `v` in the range `0.0 <= v <= 1.0` and returns a probability
according to a sigmoid density distribution, such that this probability
is 1.0 for `v == 0.0` and goes to 0.0 as `v` approaches 1.0.
"""
# See https://www.desmos.com/calculator/cdyqdxvjmy.
return 1 - 1 / (1 + np.exp(-12 * (v - 0.5)))

def placement_location(valid, selection, segment):
"""
Returns the valid voxel placement position for the segment.
Expand All @@ -46,77 +56,184 @@ def place(

start = time.time()

# First, we convolve the background to reveal the points where we can
# safely place a segment without overlapping them.
# TODO: Make sure this axis is either user-definable or just depends on the
# longest axis in `Space.dimensions`.
# FIXME(domains): Revisit and reformat this comment. Really needs to be correct ;)
# We break up the background into a set of layers over the z-axis. We call
# these layers 'domains'. Each of these domains will be packed
# sequentially, such that we place the segment over the entire space,
# eventually. This breaking up into domains serves to reduce the peak
# memory footprint during the convolution. For large spaces, the memory
# required for a convolution of the entire space may exceed the available
# amount. Hence, we want the ability to break up the space over which
# the convolution is applied.
# To make sure that the resulting packing remains well-stirred despite the
# divided procedure, two passes are performed. The first we call the
# 'white' pass, and it leaves clear division planes between the domains.
# The second pass, which we call the 'black' pass, goes over the space with
# an offset of half a domain size.
# To ensure the placement densities are uniform overall, a placement
# probability density function is used within a domain. It is shaped such
# that the first half of the domain is a constant, full density. In this
# first half, every valid placement that is considered will be accepted.
# Within the second half of a domain, the placement probability density
# will trail off to zero according to a sigmoid distribution. According to
# this distribution, the closer a valid placement is to the end of the
# domain, the less likely it becomes the placement is accepted.
n_domains = 5
# The size of the background in the direction of the domain layers.
background_size = background.shape[2]
assert (
background.shape[2] % n_domains == 0
), "For now, make sure we can neatly divide the background into domains"
domain_size = background.shape[2] // n_domains
assert (
domain_size % 2 == 0
), "For now, make sure we can neatly divide a domain into two halves"
half_domain_size = domain_size // 2
# # We first pack the domains at the even indices, followed by the odd ones.
# domain_indices = (*range(0, n_domains, 2), *range(1, n_domains, 2))
white_domains = [
((2 * i) * half_domain_size, (2 * i + 2) * half_domain_size)
for i in range(n_domains * 2)
]
# The black domains should also cover the first and last 'half' domains.
black_domains = [
(
max(
(2 * i - 1) * half_domain_size,
0,
),
min(
(2 * i + 1) * half_domain_size,
background_size,
),
)
for i in range((n_domains + 1) * 2)
]

query = np.flip(segment_voxels)
collisions = fftconvolve(background, query, mode="valid")
valid_offset = np.array(query.shape) // 2
convolution_duration = time.time() - start
log(f" (convolution took {convolution_duration:.3f} s)")

# The valid placement points will have a value of 0. Since the floating
# point operations leave some small errors laying around, we use a quite
# generous cutoff.
valid = np.array(np.where(collisions < 1e-4)) + valid_offset[:, None]
if valid.size == 0:
return None

placements = []
hits = 0
tries = 0 # Number of times segment placement was unsuccesful.
start = time.time()
previously_selected = set()
while hits < max_at_once:
if tries >= max_tries:
# Give up.
log(" ! tries is exceeding max_tries")
break
if len(previously_selected) == len(valid[0]):
return None
while True:
selection = RNG.integers(0, len(valid[0]))
if selection not in previously_selected:
previously_selected.add(selection)
for domain_start, domain_end in itertools.chain(white_domains, black_domains):
print("doing one domain")
domain = background[:, :, domain_start:domain_end]
domain_offset = np.array([0, 0, domain_start])
# First, we convolve the domain to reveal the points where we can
# safely place a segment without overlapping them.
collisions = fftconvolve(domain, query, mode="valid")
valid_offset = np.array(query.shape) // 2
convolution_duration = time.time() - start
log(f" (convolution took {convolution_duration:.3f} s)")

# The valid placement points will have a value of 0. Since the floating
# point operations leave some small errors laying around, we use a quite
# generous cutoff.
valid = np.array(np.where(collisions < 1e-4)) + valid_offset[:, None]
if valid.size == 0:
# If there are no valid placements, move on the next domain early.
placements.append(None)
continue

domain_placements = []
hits = 0
tries = 0 # Number of times segment placement was unsuccesful.
start = time.time()
previously_selected = set()
while hits < max_at_once:
if tries >= max_tries:
# Give up.
log(" ! tries is exceeding max_tries")
break

# Make sure that this placement does not overlap with another
# previously selected location.
location = placement_location(valid, selection, segment_voxels)
prospect = np.where(segment_voxels) + location[:, None]
# Check for collisions at the prospective site.
free = not np.any(background[prospect[0, :], prospect[1, :], prospect[2, :]])

if free:
start = time.time()

temp_selected_indices = prospect
background[
temp_selected_indices[0, :],
temp_selected_indices[1, :],
temp_selected_indices[2, :],
] = 1.0

placements.append(tuple(int(a) for a in location * resolution))
hits += 1
else:
tries += 1
log(
f" {tries = }/{max_tries},\thits = {hits}/{max_at_once}",
end="\r",
)
placement_duration = time.time() - start
if placement_duration * threshold_coefficient > convolution_duration:
# The placement is taking half as long as a convolution
# takes. At that point, it is probably cheaper to just run
# another convolution.
if len(previously_selected) == len(valid[0]):
domain_placements = None
break
while True:
selection = RNG.integers(0, len(valid[0]))
if selection not in previously_selected:
previously_selected.add(selection)
break

# Make sure that this placement does not overlap with another
# previously selected location.
location = placement_location(valid, selection, segment_voxels)
prospect = np.where(segment_voxels) + location[:, None]
# Check for collisions at the prospective site.
free = not np.any(domain[prospect[0, :], prospect[1, :], prospect[2, :]])

if free:
start = time.time()

z_location = location[2]
# FIXME(domains): This is truly weird and can probably all be integrated in one well-formed if statement.
is_half_domain = domain_end - domain_start == half_domain_size
if is_half_domain:
# We need to deal with a half-sized domain at the start or end of the black pass.
if domain_start == 0:
acceptance_probability = trail_off(
z_location / half_domain_size
)
elif domain_end == background_size:
acceptance_probability = 1.0
else:
assert False, "Unreachable! Encountered a half domain that is not at the start or end of the background. This should be impossible."
else:
# The common case.
if z_location < half_domain_size:
acceptance_probability = 1.0
if z_location >= half_domain_size:
acceptance_probability = trail_off(
(z_location - half_domain_size) / half_domain_size
)

# Only actually place it if we accept it according to the
# placement densitity distribution.
if RNG.random() < acceptance_probability:
temp_selected_indices = prospect
domain[
temp_selected_indices[0, :],
temp_selected_indices[1, :],
temp_selected_indices[2, :],
] = 1.0

domain_placements.append(
tuple(int(a) for a in (location + domain_offset) * resolution)
)
hits += 1
else:
tries += 1
log(
f" & placement_duration ({placement_duration:.6f}) * tc ({threshold_coefficient:.2f}) > convolution_duration"
f" {tries = }/{max_tries},\thits = {hits}/{max_at_once}",
end="\r",
)
break
log("\x1b[K", end="") # Clear the line since we used \r before.
log(f" . placed {hits} segments with {tries}/{max_tries} misses")

return placements
placement_duration = time.time() - start
if placement_duration * threshold_coefficient > convolution_duration:
# The placement is taking half as long as a convolution
# takes. At that point, it is probably cheaper to just run
# another convolution.
log(
f" & placement_duration ({placement_duration:.6f}) * tc ({threshold_coefficient:.2f}) > convolution_duration"
)
break
log("\x1b[K", end="") # Clear the line since we used \r before.
log(f" . placed {hits} segments with {tries}/{max_tries} misses")
placements.append(domain_placements)

print(f"placed {len(placements)} domains")

# If the placements for each domain are all None, we need to let the caller
# know. But if only some or none of them are None, we can just sum to
# communicate the number of segments that were placed.
if all(map(lambda domain_placements: domain_placements is None, placements)):
print("None")
return None
else:
r = []
for domain_placements in placements:
if domain_placements is not None:
r.extend(domain_placements)
print("return r")
return r


def pack(
Expand Down

0 comments on commit bac62fe

Please sign in to comment.