diff --git a/navis/nbl/base.py b/navis/nbl/base.py
index c3ad4203..ef632ad5 100644
--- a/navis/nbl/base.py
+++ b/navis/nbl/base.py
@@ -13,10 +13,13 @@

 """Module containing base classes for BLASTING."""

+import atexit
+
 import numpy as np
 import pandas as pd

 from abc import ABC, abstractmethod
+from multiprocessing import shared_memory

 from .. import utils, config

@@ -25,7 +28,7 @@


 class Blaster(ABC):
-    """Base class for blasting."""
+    """Base class for Blasting."""

     def __init__(self, dtype=np.float64, progress=True):
         """Initialize class."""
@@ -58,15 +61,7 @@ def dtype(self):

     @dtype.setter
     def dtype(self, dtype):
-        try:
-            self._dtype = np.dtype(dtype)
-        except TypeError:
-            try:
-                self._dtype = FLOAT_DTYPES[dtype]
-            except KeyError:
-                raise ValueError(
-                    f'Unknown precision/dtype {dtype}. Expected on of the following: 16, 32 or 64 (default)'
-                )
+        self._dtype = parse_precision(dtype)

     def pair_query_target(self, pairs, scores='forward'):
         """BLAST multiple pairs.
@@ -101,7 +96,7 @@ def pair_query_target(self, pairs, scores='forward'):

         return scr

-    def multi_query_target(self, q_idx, t_idx, scores='forward'):
+    def multi_query_target(self, q_idx, t_idx, scores='forward', out=None):
         """BLAST multiple queries against multiple targets.

         Parameters
@@ -110,6 +105,8 @@ def multi_query_target(self, q_idx, t_idx, scores='forward'):
                         Iterable of query/target neuron indices to BLAST.
         scores :        "forward" | "mean" | "min" | "max"
                         Which scores to return.
+        out :           np.ndarray, optional
+                        Array to write results to.

         """
         if utils.is_jupyter() and config.tqdm == config.tqdm_notebook:
@@ -124,18 +121,24 @@
         else:
             position = getattr(self, 'pbar_position', 0)

-        res = np.zeros((len(q_idx), len(t_idx)),
-                       dtype=self.dtype)
+        shape = (len(q_idx), len(t_idx))
+        if not isinstance(out, np.ndarray):
+            out = np.zeros(shape, dtype=self.dtype)
+        elif out.shape != shape:
+            raise TypeError(f'Expected out array of shape {shape}, got {out.shape}')
+        elif out.dtype != self.dtype:
+            raise TypeError(f'Expected out array to be {self.dtype}, got {out.dtype}')
+
         for i, q in enumerate(config.tqdm(q_idx,
                                           desc=self.desc,
                                           leave=False,
                                           position=position,
                                           disable=not self.progress)):
             for k, t in enumerate(t_idx):
-                res[i, k] = self.single_query_target(q, t, scores=scores)
+                out[i, k] = self.single_query_target(q, t, scores=scores)

         # Generate results
-        res = pd.DataFrame(res)
+        res = pd.DataFrame(out)
         res.columns = [self.ids[t] for t in t_idx]
         res.index = [self.ids[q] for q in q_idx]

@@ -157,3 +160,109 @@ def all_by_all(self, scores='forward'):
             res.loc[:, :] = np.dstack((res, res.T)).max(axis=2)

         return res
+
+
+class SharedBlaster(Blaster, ABC):
+    """Version of the Blaster that works with shared memory buffers."""
+
+    def create_array(self, shm, shape, offset=None):
+        """Create array from shared memory buffer."""
+        # Generate the out array from shared memory buffer
+        arr = np.ndarray(shape, dtype=self.dtype, buffer=shm.buf)
+
+        if not isinstance(offset, type(None)):
+            arr = arr[offset]
+
+        return arr
+
+    def multi_query_target(self, q_idx, t_idx, shm, shape, offset=None, scores='forward'):
+        """BLAST multiple queries against multiple targets.
+
+        Parameters
+        ----------
+        q_idx,t_idx :   iterable
+                        Iterable of query/target neuron indices to BLAST.
+        shm :           multiprocessing.shared_memory.SharedMemory
+        shape :         tuple (N, M)
+                        Shape of the array in the memory buffer.
+        offset :        tuple (slice, slice)
+                        The view inside the full array to pass through as
+                        `out` to `multi_query_target`.
+        scores :        "forward" | "mean" | "min" | "max"
+                        Which scores to produce.
+
+        """
+        out = self.create_array(shm, shape, offset=offset)
+
+        _ = super().multi_query_target(q_idx, t_idx, scores=scores, out=out)
+
+    def all_by_all(self, shm, shape, offset=None, scores='forward'):
+        """BLAST all-by-all neurons."""
+        out = self.create_array(shm, shape, offset=offset)
+
+        _ = super().multi_query_target(range(len(self.neurons)),
+                                       range(len(self.neurons)),
+                                       scores='forward',
+                                       out=out)
+
+        # For all-by-all BLAST we can get the mean score by
+        # transposing the scores
+        if scores == 'mean':
+            out[:, :] = (out + out.T) / 2
+        elif scores == 'min':
+            out[:, :] = np.dstack((out, out.T)).min(axis=2)
+        elif scores == 'max':
+            out[:, :] = np.dstack((out, out.T)).max(axis=2)
+
+
+def create_shared_array(shape, dtype):
+    """Create shared array.
+
+    Parameters
+    ----------
+    shape :     tuple
+    dtype :     str | np.dtype
+
+    Returns
+    -------
+    multiprocessing.shared_memory.SharedMemory
+                The shared memory buffer for the array.
+    np.ndarray
+                A numpy array accessing the shared memory buffer.
+
+    """
+    # Get the number of items in the requested array
+    if utils.is_iterable(shape):
+        items = np.prod(shape)
+    else:
+        items = shape
+
+    # Force dtype to numpy dtype
+    dtype = np.dtype(dtype)
+    # Calculate required size for memory buffer
+    size = dtype.itemsize * items
+
+    # Create shared memory buffer
+    shm = shared_memory.SharedMemory(create=True, size=size)
+
+    # We need to make sure that the memory is released on exit of this process
+    # Note: order is reverse -> last registered is executed first
+    atexit.register(shm.unlink)
+    atexit.register(shm.close)
+
+    # Create array based on buffer
+    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
+
+    return shm, arr
+
+
+def parse_precision(dtype):
+    """Parse precision into numpy dtype."""
+    try:
+        return np.dtype(dtype)
+    except TypeError:
+        try:
+            return FLOAT_DTYPES[dtype]
+        except KeyError:
+            raise ValueError(f'Unknown precision/dtype {dtype}. Expected one '
+                             'of the following: 16, 32 or 64 (default)')
diff --git a/navis/nbl/nblast_funcs.py b/navis/nbl/nblast_funcs.py
index 32dae6fa..101886a6 100644
--- a/navis/nbl/nblast_funcs.py
+++ b/navis/nbl/nblast_funcs.py
@@ -27,7 +27,7 @@
 from .. import utils, config
 from ..core import NeuronList, Dotprops, make_dotprops

-from .base import Blaster
+from .base import Blaster, SharedBlaster, create_shared_array, parse_precision

 __all__ = ['nblast', 'nblast_smart', 'nblast_allbyall', 'sim_to_dist']

@@ -234,6 +234,11 @@ def single_query_target(self, q_idx, t_idx, scores='forward'):
         return scr


+class SharedNBlaster(NBlaster, SharedBlaster):
+    """An NBLASTER working with shared memory."""
+    pass
+
+
 def nblast_smart(query: Union[Dotprops, NeuronList],
                  target: Optional[str] = None,
                  t: Union[int, float] = 90,
@@ -316,7 +321,7 @@ def nblast_smart(query: Union[Dotprops, NeuronList],
     precision :     int [16, 32, 64] | str [e.g. "float64"] | np.dtype
                     Precision for scores. Defaults to 64 bit (double) floats.
                     This is useful to reduce the memory footprint for very large
-                    matrices. In real-world scenarios 32 bit (single)- and
+                    matrices. In real-world scenarios 32 bit (single) - and
                     depending on the purpose even 16 bit (half) - are typically
                     sufficient.
     n_cores :       int, optional
@@ -685,21 +690,30 @@
     else:
         n_rows, n_cols = find_batch_partition(batch_size, query_dps, target_dps)

+    # Force precision into numpy dtype
+    dtype = parse_precision(precision)
+
+    # Create scores array
+    shm, arr = create_shared_array(shape=(len(query_dps), len(target_dps)),
+                                   dtype=dtype)
+
     nblasters = []
     with config.tqdm(desc='Preparing',
                      total=n_rows * n_cols,
                      leave=False,
                      disable=not progress) as pbar:
+        offset_x = 0
         for q in np.array_split(query_dps, n_rows):
+            offset_y = 0
             for t in np.array_split(target_dps, n_cols):
                 # Initialize NBlaster
-                this = NBlaster(use_alpha=use_alpha,
-                                normalized=normalized,
-                                smat=smat,
-                                limit_dist=limit_dist,
-                                dtype=precision,
-                                approx_nn=approx_nn,
-                                progress=progress)
+                this = SharedNBlaster(use_alpha=use_alpha,
+                                      normalized=normalized,
+                                      smat=smat,
+                                      limit_dist=limit_dist,
+                                      dtype=dtype,
+                                      approx_nn=approx_nn,
+                                      progress=progress)

                 # Use better description if we process in batches
                 if batch_size:
@@ -713,32 +727,41 @@
                 # Keep track of indices of queries and targets
                 this.queries = np.arange(len(q))
                 this.targets = np.arange(len(t)) + len(q)
+
+                # Make sure progress bars don't overlap
                 this.pbar_position = len(nblasters) % n_cores

+                # This is the offset into the shared array
+                this.offset_x = slice(offset_x, offset_x + len(q))
+                this.offset_y = slice(offset_y, offset_y + len(t))
+
                 nblasters.append(this)
                 pbar.update(1)
+                offset_y += len(t)
+            offset_x += len(q)

     # If only one core, we don't need to break out the multiprocessing
     if n_cores == 1:
-        return this.multi_query_target(this.queries,
-                                       this.targets,
-                                       scores=scores)
-
-    with ProcessPoolExecutor(max_workers=n_cores) as pool:
-        # Each nblaster is passed to its own process
-        futures = [pool.submit(this.multi_query_target,
-                               q_idx=this.queries,
-                               t_idx=this.targets,
-                               scores=scores) for this in nblasters]
-
-        results = [f.result() for f in futures]
+        _ = this.multi_query_target(this.queries,
+                                    this.targets,
+                                    shm=shm,
+                                    shape=arr.shape,
+                                    offset=None,
+                                    scores=scores)
+    else:
+        with ProcessPoolExecutor(max_workers=n_cores) as pool:
+            # Each nblaster is passed to its own process
+            futures = [pool.submit(this.multi_query_target,
+                                   q_idx=this.queries,
+                                   t_idx=this.targets,
+                                   shm=shm,
+                                   shape=arr.shape,
+                                   offset=(this.offset_x, this.offset_y),
+                                   scores=scores) for this in nblasters]

-    scores = pd.DataFrame(np.zeros((len(query_dps), len(target_dps)),
-                                   dtype=this.dtype),
-                          index=query_dps.id, columns=target_dps.id)
+            _ = [f.result() for f in futures]

-    for res in results:
-        scores.loc[res.index, res.columns] = res.values
+    scores = pd.DataFrame(arr, index=query_dps.id, columns=target_dps.id)

     return scores
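
Note: the sketch below is not part of the patch; it illustrates, stripped of navis specifics, the shared-memory pattern the new nblast() code path relies on. The parent process allocates one scores matrix in a multiprocessing.shared_memory buffer, each worker attaches to that buffer and writes its block into a slice view, and the parent reads the fully assembled matrix without copying per-worker results. The helper name fill_block, the toy |i - j| "score" and the fixed 2 x 2 block split are illustrative assumptions only; the real code fills blocks via SharedBlaster.multi_query_target and passes the SharedMemory object plus offset slices through pool.submit.

    # Minimal, self-contained sketch; requires Python >= 3.8 and numpy.
    import numpy as np

    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import shared_memory


    def fill_block(shm_name, shape, dtype, row_slice, col_slice):
        """Attach to the shared buffer and write one block of 'scores'."""
        shm = shared_memory.SharedMemory(name=shm_name)
        full = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        rows = np.arange(row_slice.start, row_slice.stop)
        cols = np.arange(col_slice.start, col_slice.stop)
        # Toy score |i - j| stands in for a real query/target NBLAST score
        full[row_slice, col_slice] = np.abs(rows[:, None] - cols[None, :])
        del full     # drop the view so the buffer can be released cleanly
        shm.close()  # detach from this worker; the parent still owns the buffer


    if __name__ == '__main__':
        n_q, n_t = 6, 8
        dtype = np.dtype('float32')

        # One shared buffer holding the full (n_q, n_t) scores matrix
        shm = shared_memory.SharedMemory(create=True, size=n_q * n_t * dtype.itemsize)
        scores = np.ndarray((n_q, n_t), dtype=dtype, buffer=shm.buf)
        scores[:] = 0

        # Split the matrix into 2 x 2 blocks -> one job per block
        jobs = [(slice(r, r + 3), slice(c, c + 4)) for r in (0, 3) for c in (0, 4)]

        with ProcessPoolExecutor(max_workers=2) as pool:
            futures = [pool.submit(fill_block, shm.name, scores.shape, dtype.str,
                                   row_slice, col_slice)
                       for row_slice, col_slice in jobs]
            _ = [f.result() for f in futures]   # re-raise any worker exception

        print(scores)   # assembled in place, nothing to merge afterwards

        del scores      # release the last view before freeing the buffer
        shm.close()
        shm.unlink()

Because every worker writes into a disjoint view of the same buffer, the parent can build the final DataFrame straight from the shared array; that is what lets the patch replace the old per-result scores.loc[...] merge loop with a single pd.DataFrame(arr, ...) call.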