From ae991cb526f72f971825f9b653fa0dd05fd74e44 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 1 Aug 2024 22:44:36 +0200 Subject: [PATCH 1/6] Add P2P shuffle for arrays --- distributed/shuffle/_core.py | 6 +- distributed/shuffle/_rechunk.py | 2 +- distributed/shuffle/_shuffle_array.py | 405 ++++++++++++++++++ .../shuffle/tests/test_shuffle_array.py | 55 +++ 4 files changed, 464 insertions(+), 4 deletions(-) create mode 100644 distributed/shuffle/_shuffle_array.py create mode 100644 distributed/shuffle/tests/test_shuffle_array.py diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index b0c4fc17e1..ddc7233e72 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -340,7 +340,7 @@ async def _receive(self, data: list[tuple[_T_partition_id, Any]]) -> None: """Receive shards belonging to output partitions of this shuffle run""" def add_partition( - self, data: _T_partition_type, partition_id: _T_partition_id + self, data: _T_partition_type, partition_id: _T_partition_id, **kwargs: Any ) -> int: self.raise_if_closed() if self.transferred: @@ -351,13 +351,13 @@ def add_partition( context_meter.meter("p2p-shard-partition-noncpu"), context_meter.meter("p2p-shard-partition-cpu", func=thread_time), ): - shards = self._shard_partition(data, partition_id) + shards = self._shard_partition(data, partition_id, **kwargs) sync(self._loop, self._write_to_comm, shards) return self.run_id @abc.abstractmethod def _shard_partition( - self, data: _T_partition_type, partition_id: _T_partition_id + self, data: _T_partition_type, partition_id: _T_partition_id, **kwargs: Any ) -> dict[str, tuple[_T_partition_id, Any]]: """Shard an input partition by the assigned output workers""" diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 1ff0daa97c..f541a5b1ca 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -907,7 +907,7 @@ async def _receive( raise def _shard_partition( - self, data: np.ndarray, partition_id: NDIndex + self, data: np.ndarray, partition_id: NDIndex, **kwargs: Any ) -> dict[str, tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]]: out: dict[str, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]] = defaultdict( list diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py new file mode 100644 index 0000000000..6bb3b4807b --- /dev/null +++ b/distributed/shuffle/_shuffle_array.py @@ -0,0 +1,405 @@ +from __future__ import annotations + +import mmap +import os +from collections import defaultdict +from collections.abc import Generator, Hashable, Sequence +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +from itertools import count, product +from pathlib import Path +from typing import Any, Callable + +import numpy as np +from tornado.ioloop import IOLoop + +from dask.sizeof import sizeof +from dask.typing import Key + +from distributed.core import PooledRPCCall +from distributed.metrics import context_meter +from distributed.shuffle import ShuffleWorkerPlugin +from distributed.shuffle._core import ( + NDIndex, + ShuffleId, + ShuffleRun, + ShuffleSpec, + barrier_key, + get_worker_plugin, + handle_transfer_errors, +) +from distributed.shuffle._limiter import ResourceLimiter +from distributed.shuffle._pickle import unpickle_bytestream +from distributed.shuffle._rechunk import rechunk_unpack +from distributed.shuffle._shuffle import shuffle_barrier +from 
distributed.utils_comm import DoNotUnpack + + +def shuffle_name(token: str) -> str: + return f"shuffle-p2p-{token}" + + +def _p2p_shuffle( # type: ignore[no-untyped-def] + chunks, new_chunks, axis, in_name: str, out_name: str, disk: bool = True +) -> dict[Key, Any]: + from dask.array._shuffle import convert_key + + arrays = [] + for i, new_chunk in enumerate(new_chunks): + arrays.append(np.array([new_chunk, [i] * len(new_chunk)])) + + result = np.concatenate(arrays, axis=1) + sorter = np.argsort(result[0, :]) + sorted_indexer = result[:, sorter] + chunk_boundaries = np.cumsum((0,) + chunks[axis]) + + dsk: dict[Key, Any] = {} + + # Use `token` to generate a canonical group for the entire rechunk + token = out_name.split("-")[-1] + transfer_group = f"shuffle-transfer-{token}" + unpack_group = out_name + _barrier_key = barrier_key(ShuffleId(token)) + + # Get existing chunk tuple locations + chunk_tuples = list( + product(*(range(len(c)) for i, c in enumerate(chunks) if i != axis)) + ) + chunk_lengths = [len(c) for c in chunks] + chunk_lengths[axis] = len(np.unique(result[1, :])) + chunk_lengths_not_unpack = DoNotUnpack(chunk_lengths) + + transfer_keys = [] + suffixes = count() + + for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])): + chunk_indexer = sorted_indexer[:, start:stop].copy() + if len(chunk_indexer) == 0: + # skip output chunks that don't get any data + continue + + chunk_indexer[0, :] -= chunk_boundaries[i] + chunk_indexer_not_unpack = DoNotUnpack(chunk_indexer) + + for chunk_tuple in chunk_tuples: + key = (transfer_group,) + (next(suffixes),) + transfer_keys.append(key) + dsk[key] = ( + shuffle_transfer, + (in_name,) + convert_key(chunk_tuple, i, axis), + token, + chunk_indexer_not_unpack, + DoNotUnpack(chunk_tuple), + chunk_lengths_not_unpack, + axis, + convert_key(chunk_tuple, i, axis), + disk, + ) + + dsk[_barrier_key] = (shuffle_barrier, token, transfer_keys) + + for axis_chunk in np.unique(result[1, :]): + sorter = np.argsort(result[0, result[1, :] == axis_chunk]) + + for chunk_tuple in chunk_tuples: + chunk_key = convert_key(chunk_tuple, int(axis_chunk), axis) + + dsk[(unpack_group,) + chunk_key] = ( + _shuffle_unpack, + token, + chunk_key, + _barrier_key, + sorter, + axis, + ) + return dsk + + +def _shuffle_unpack( + id: ShuffleId, + output_chunk: NDIndex, + barrier_run_id: int, + sorter: np.ndarray, + axis: int, +) -> np.ndarray: + result = rechunk_unpack(id, output_chunk, barrier_run_id) + slicer = [ + slice(None), + ] * len(result.shape) + slicer[axis] = np.argsort(sorter) # type: ignore[call-overload] + return result[*slicer] + + +def shuffle_transfer( + input: np.ndarray, + id: ShuffleId, + chunk_indexer: tuple[np.ndarray, np.ndarray], + chunk_tuple: tuple[int, ...], + chunk_lengths: tuple[int, ...], + axis: int, + input_chunk: Any, + disk: bool, +) -> int: + with handle_transfer_errors(id): + return get_worker_plugin().add_partition( + input, + partition_id=input_chunk, + spec=ArrayShuffleSpec( + id=id, chunk_lengths=chunk_lengths, axis=axis, disk=disk + ), + chunk_indexer=chunk_indexer, + chunk_tuple=chunk_tuple, + ) + + +@dataclass(frozen=True) +class ArrayShuffleSpec(ShuffleSpec[NDIndex]): + chunk_lengths: tuple[int, ...] 
+    axis: int
+
+    @property
+    def output_partitions(self) -> Generator[NDIndex, None, None]:
+        yield from product(*(range(c) for c in self.chunk_lengths))
+
+    def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str:
+        npartitions = 1
+        for c in self.chunk_lengths:
+            npartitions *= c
+        ix = 0
+        for dim, pos in enumerate(partition):
+            if dim > 0:
+                ix += self.chunk_lengths[dim - 1] * pos
+            else:
+                ix += pos
+        i = len(workers) * ix // npartitions
+        return workers[i]
+
+    def create_run_on_worker(
+        self,
+        run_id: int,
+        span_id: str | None,
+        worker_for: dict[NDIndex, str],
+        plugin: ShuffleWorkerPlugin,
+    ) -> ShuffleRun:
+        return ArrayShuffleRun(
+            worker_for=worker_for,
+            axis=self.axis,
+            id=self.id,
+            run_id=run_id,
+            span_id=span_id,
+            directory=os.path.join(
+                plugin.worker.local_directory,
+                f"shuffle-{self.id}-{run_id}",
+            ),
+            executor=plugin._executor,
+            local_address=plugin.worker.address,
+            rpc=plugin.worker.rpc,
+            digest_metric=plugin.worker.digest_metric,
+            scheduler=plugin.worker.scheduler,
+            memory_limiter_disk=plugin.memory_limiter_disk,
+            memory_limiter_comms=plugin.memory_limiter_comms,
+            disk=self.disk,
+            loop=plugin.worker.loop,
+        )
+
+
+class ArrayShuffleRun(ShuffleRun[NDIndex, "np.ndarray"]):
+    """State for a single active array shuffle execution
+
+    This object is responsible for splitting, sending, receiving and combining
+    data shards.
+
+    It is entirely agnostic to the distributed system and can perform a shuffle
+    with other run instances using ``rpc``.
+
+    The user of this needs to guarantee that only `ArrayShuffleRun`s of the same unique
+    `ShuffleID` and `run_id` interact.
+
+    Parameters
+    ----------
+    worker_for:
+        A mapping partition_id -> worker_address.
+    axis: int
+        Axis to shuffle along.
+    id:
+        A unique `ShuffleID` this belongs to.
+    run_id:
+        A unique identifier of the specific execution of the shuffle this belongs to.
+    span_id:
+        Span identifier; see :doc:`spans`
+    local_address:
+        The local address this Shuffle can be contacted by using `rpc`.
+    directory:
+        The scratch directory to buffer data in.
+    executor:
+        Thread pool to use for offloading compute.
+    rpc:
+        A callable returning a PooledRPCCall to contact other Shuffle instances.
+        Typically a ConnectionPool.
+    digest_metric:
+        A callable to ingest a performance metric.
+        Typically Server.digest_metric.
+    scheduler:
+        A PooledRPCCall to contact the scheduler.
+    memory_limiter_disk:
+    memory_limiter_comms:
+        A ``ResourceLimiter`` limiting the total amount of memory used in either
+        buffer.
+ """ + + def __init__( + self, + worker_for: dict[NDIndex, str], + axis: int, + id: ShuffleId, + run_id: int, + span_id: str | None, + local_address: str, + directory: str, + executor: ThreadPoolExecutor, + rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], + scheduler: PooledRPCCall, + memory_limiter_disk: ResourceLimiter, + memory_limiter_comms: ResourceLimiter, + disk: bool, + loop: IOLoop, + ): + super().__init__( + id=id, + run_id=run_id, + span_id=span_id, + local_address=local_address, + directory=directory, + executor=executor, + rpc=rpc, + digest_metric=digest_metric, + scheduler=scheduler, + memory_limiter_comms=memory_limiter_comms, + memory_limiter_disk=memory_limiter_disk, + disk=disk, + loop=loop, + ) + self.axis = axis + partitions_of = defaultdict(list) + for part, addr in worker_for.items(): + partitions_of[addr].append(part) + self.partitions_of = dict(partitions_of) + self.worker_for = worker_for + + def _shard_partition( # type: ignore[override] + self, + data: np.ndarray, + partition_id: NDIndex, + chunk_indexer: tuple[np.ndarray, np.ndarray], + chunk_tuple: tuple[int, ...], + ) -> dict[str, tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]]: + from dask.array._shuffle import convert_key + + out: dict[str, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]] = defaultdict( + list + ) + shards_size = 0 + shards_count = 0 + + target_chunk_nrs, taker_boundary = np.unique( + chunk_indexer[1], return_index=True + ) + + for target_chunk in target_chunk_nrs: + ndslice = [ + slice(None), + ] * len(data.shape) + ndslice[self.axis] = chunk_indexer[0][chunk_indexer[1] == target_chunk] + shard = data[*ndslice] + # Don't wait until all shards have been transferred over the network + # before data can be released + if shard.base is not None: + shard = shard.copy() + + shards_size += shard.nbytes + shards_count += 1 + chunk_index = convert_key(chunk_tuple, target_chunk, self.axis) + + out[self.worker_for[chunk_index]].append( + (chunk_index, (partition_id, shard)) + ) + + context_meter.digest_metric("p2p-shards", shards_size, "bytes") + context_meter.digest_metric("p2p-shards", shards_count, "count") + return {k: (partition_id, v) for k, v in out.items()} + + async def _receive( + self, + data: list[tuple[NDIndex, list[tuple[NDIndex, tuple[NDIndex, np.ndarray]]]]], + ) -> None: + self.raise_if_closed() + + # Repartition shards and filter out already received ones + shards = defaultdict(list) + for d in data: + id1, payload = d + if id1 in self.received: + continue + self.received.add(id1) + for id2, shard in payload: + shards[id2].append(shard) + self.total_recvd += sizeof(d) + del data + if not shards: + return + + try: + await self._write_to_disk(shards) + except Exception as e: + self._exception = e + raise + + def _get_output_partition( + self, partition_id: NDIndex, key: Key, **kwargs: Any + ) -> np.ndarray: + # Quickly read metadata from disk. + # This is a bunch of seek()'s interleaved with short reads. + data = self._read_from_disk(partition_id) + # Copy the memory-mapped buffers from disk into memory. + # This is where we'll spend most time. + return _convert_chunk(data, self.axis) + + def deserialize(self, buffer: Any) -> Any: + return buffer + + def read(self, path: Path) -> tuple[list[list[tuple[NDIndex, np.ndarray]]], int]: + """Open a memory-mapped file descriptor to disk, read all metadata, and unpickle + all arrays. This is a fast sequence of short reads interleaved with seeks. 
+ Do not read in memory the actual data; the arrays' buffers will point to the + memory-mapped area. + + The file descriptor will be automatically closed by the kernel when all the + returned arrays are dereferenced, which will happen after the call to + concatenate3. + """ + with path.open(mode="r+b") as fh: + buffer = memoryview(mmap.mmap(fh.fileno(), 0)) + + # The file descriptor has *not* been closed! + shards = list(unpickle_bytestream(buffer)) + return shards, buffer.nbytes + + def _get_assigned_worker(self, id: NDIndex) -> str: + return self.worker_for[id] + + +def _convert_chunk( + shards: list[list[tuple[NDIndex, np.ndarray]]], axis: int +) -> np.ndarray: + import numpy as np + + indexed: dict[NDIndex, np.ndarray] = {} + for sublist in shards: + for index, shard in sublist: + indexed[index] = shard + + arrs = [indexed[i] for i in sorted(indexed.keys())] + # This may block for several seconds, as it physically reads the memory-mapped + # buffers from disk + return np.concatenate(arrs, axis=axis) diff --git a/distributed/shuffle/tests/test_shuffle_array.py b/distributed/shuffle/tests/test_shuffle_array.py new file mode 100644 index 0000000000..12ef9f4657 --- /dev/null +++ b/distributed/shuffle/tests/test_shuffle_array.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import numpy as np +import pytest + +import dask.array as da +from dask.core import flatten + +from distributed.utils_test import gen_cluster + + +@pytest.fixture() +def arr(): + return np.arange(0, 24).reshape(8, 3).T.copy() + + +@pytest.fixture() +def darr(arr): + return da.from_array(arr, chunks=((2, 1), (4, 4))) + + +@pytest.mark.parametrize( + "indexer, chunks", + [ + ([[1, 5, 6], [0, 2, 3, 4, 7]], (3, 5)), + ([[1, 5, 6], [0, 3], [4, 2, 7]], (5, 3)), + ([[1], [0, 6, 5, 3, 2, 4], [7]], (1, 6, 1)), + ([[1, 5, 1, 5, 1, 5], [1, 6, 4, 2, 7]], (6, 5)), + ], +) +@gen_cluster(client=True) +async def test_shuffle(c, s, *ws, arr, darr, indexer, chunks): + result = darr.shuffle(indexer, axis=1) + expected = arr[:, list(flatten(indexer))] + x = await c.compute(result) + np.testing.assert_array_equal(x, expected) + assert result.chunks[0] == darr.chunks[0] + assert result.chunks[1] == chunks + + +@gen_cluster(client=True) +async def test_shuffle_larger_array(c, s, *ws): + arr = da.random.random((15, 15, 15), chunks=(5, 5, 5)) + indexer = np.arange(0, 15) + np.random.shuffle(indexer) + indexer = [indexer[0:6], indexer[6:8], indexer[8:9], indexer[9:]] + indexer = list(map(list, indexer)) + take_indexer = list(flatten(indexer)) + + x = await c.compute(arr.shuffle(indexer, axis=1)) + y = await c.compute(arr[..., take_indexer, :]) + np.testing.assert_array_equal(x, y) + z = await c.compute(arr) + z = z[..., take_indexer, :] + np.testing.assert_array_equal(x, z) From f316cec68635ddb6fe33110b497bf7eb9d512ca7 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 1 Aug 2024 23:07:57 +0200 Subject: [PATCH 2/6] Update --- distributed/shuffle/_shuffle_array.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py index 6bb3b4807b..16b1f91bcd 100644 --- a/distributed/shuffle/_shuffle_array.py +++ b/distributed/shuffle/_shuffle_array.py @@ -32,7 +32,6 @@ from distributed.shuffle._pickle import unpickle_bytestream from distributed.shuffle._rechunk import rechunk_unpack from distributed.shuffle._shuffle import shuffle_barrier -from distributed.utils_comm import DoNotUnpack def 
shuffle_name(token: str) -> str: @@ -67,7 +66,6 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] ) chunk_lengths = [len(c) for c in chunks] chunk_lengths[axis] = len(np.unique(result[1, :])) - chunk_lengths_not_unpack = DoNotUnpack(chunk_lengths) transfer_keys = [] suffixes = count() @@ -79,7 +77,6 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] continue chunk_indexer[0, :] -= chunk_boundaries[i] - chunk_indexer_not_unpack = DoNotUnpack(chunk_indexer) for chunk_tuple in chunk_tuples: key = (transfer_group,) + (next(suffixes),) @@ -88,9 +85,9 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] shuffle_transfer, (in_name,) + convert_key(chunk_tuple, i, axis), token, - chunk_indexer_not_unpack, - DoNotUnpack(chunk_tuple), - chunk_lengths_not_unpack, + chunk_indexer, + chunk_tuple, + chunk_lengths, axis, convert_key(chunk_tuple, i, axis), disk, From 344215141a3448d04025e16b45f5a13ed6950162 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Fri, 2 Aug 2024 12:30:07 +0200 Subject: [PATCH 3/6] Rename keys --- distributed/shuffle/_shuffle_array.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py index 16b1f91bcd..d7c72e8daa 100644 --- a/distributed/shuffle/_shuffle_array.py +++ b/distributed/shuffle/_shuffle_array.py @@ -6,7 +6,7 @@ from collections.abc import Generator, Hashable, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from itertools import count, product +from itertools import product from pathlib import Path from typing import Any, Callable @@ -68,7 +68,6 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] chunk_lengths[axis] = len(np.unique(result[1, :])) transfer_keys = [] - suffixes = count() for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])): chunk_indexer = sorted_indexer[:, start:stop].copy() @@ -79,7 +78,7 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] chunk_indexer[0, :] -= chunk_boundaries[i] for chunk_tuple in chunk_tuples: - key = (transfer_group,) + (next(suffixes),) + key = (transfer_group,) + convert_key(chunk_tuple, i, axis) transfer_keys.append(key) dsk[key] = ( shuffle_transfer, From abec1805d4c32358816c1f6d4ff4c57efe9069ff Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:08:03 +0200 Subject: [PATCH 4/6] Fixup worker calculation --- distributed/shuffle/_shuffle_array.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py index d7c72e8daa..c08ec965c7 100644 --- a/distributed/shuffle/_shuffle_array.py +++ b/distributed/shuffle/_shuffle_array.py @@ -157,16 +157,17 @@ class ArrayShuffleSpec(ShuffleSpec[NDIndex]): def output_partitions(self) -> Generator[NDIndex, None, None]: yield from product(*(range(c) for c in self.chunk_lengths)) + @property + def positions(self) -> list[int]: + return [1] + np.cumprod(self.chunk_lengths).tolist() + def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str: npartitions = 1 for c in self.chunk_lengths: npartitions *= c ix = 0 for dim, pos in enumerate(partition): - if dim > 0: - ix += self.chunk_lengths[dim - 1] * pos - else: - ix += pos + ix += self.positions[dim] * pos i = len(workers) * ix // npartitions return workers[i] From f7270338215c755f5cd227ea8f8b409cde14935c Mon Sep 17 00:00:00 2001 From: Patrick Hoefler 
<61934744+phofl@users.noreply.github.com> Date: Fri, 2 Aug 2024 14:30:19 +0200 Subject: [PATCH 5/6] Use cached property --- distributed/shuffle/_shuffle_array.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py index c08ec965c7..09b05714b9 100644 --- a/distributed/shuffle/_shuffle_array.py +++ b/distributed/shuffle/_shuffle_array.py @@ -1,5 +1,6 @@ from __future__ import annotations +import functools import mmap import os from collections import defaultdict @@ -157,7 +158,7 @@ class ArrayShuffleSpec(ShuffleSpec[NDIndex]): def output_partitions(self) -> Generator[NDIndex, None, None]: yield from product(*(range(c) for c in self.chunk_lengths)) - @property + @functools.cached_property def positions(self) -> list[int]: return [1] + np.cumprod(self.chunk_lengths).tolist() From 938ec20bdca01ea28aafa700dc4fc0a375700711 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Wed, 7 Aug 2024 13:00:12 +0200 Subject: [PATCH 6/6] Fixup p2p bugs --- distributed/shuffle/_shuffle_array.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/shuffle/_shuffle_array.py b/distributed/shuffle/_shuffle_array.py index 09b05714b9..dc66d43ec7 100644 --- a/distributed/shuffle/_shuffle_array.py +++ b/distributed/shuffle/_shuffle_array.py @@ -71,8 +71,11 @@ def _p2p_shuffle( # type: ignore[no-untyped-def] transfer_keys = [] for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])): + start = np.searchsorted(sorted_indexer[0, :], start) + stop = np.searchsorted(sorted_indexer[0, :], stop) + chunk_indexer = sorted_indexer[:, start:stop].copy() - if len(chunk_indexer) == 0: + if chunk_indexer.shape[1] == 0: # skip output chunks that don't get any data continue
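
A minimal usage sketch (not part of the patch), mirroring the new tests in distributed/shuffle/tests/test_shuffle_array.py. It assumes a dask version whose da.Array.shuffle API routes to this P2P implementation when a distributed client is active; the cluster parameters below are illustrative only.

import numpy as np

import dask.array as da
from dask.core import flatten

from distributed import Client, LocalCluster

if __name__ == "__main__":
    # Illustrative local cluster; any distributed deployment works the same way.
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster, Client(cluster):
        arr = np.arange(0, 24).reshape(8, 3).T.copy()
        darr = da.from_array(arr, chunks=((2, 1), (4, 4)))

        # Each inner list becomes one output chunk along the shuffled axis.
        indexer = [[1, 5, 6], [0, 2, 3, 4, 7]]
        shuffled = darr.shuffle(indexer, axis=1)

        result = shuffled.compute()
        expected = arr[:, list(flatten(indexer))]
        np.testing.assert_array_equal(result, expected)
        assert shuffled.chunks == ((2, 1), (3, 5))

This is the same pattern that test_shuffle in the new test module asserts, just with gen_cluster instead of an explicit LocalCluster.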
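
To make the graph-construction step in _p2p_shuffle easier to follow, here is a standalone sketch (not part of the patch) of how the user-provided groups are turned into a per-input-chunk indexer. Variable names are illustrative, and the real function also carries the non-shuffled dimensions and wraps these arrays for the task graph. Each column of chunk_indexer pairs a position local to the input chunk with the output chunk that element ends up in, which is what shuffle_transfer hands to ArrayShuffleRun._shard_partition.

import numpy as np

chunks_along_axis = (4, 4)                 # input chunking along the shuffled axis
new_chunks = [[1, 5, 6], [0, 2, 3, 4, 7]]  # desired output groups (the indexer)

# (source index, output chunk) pairs, sorted by source index
pairs = np.concatenate(
    [np.array([grp, [i] * len(grp)]) for i, grp in enumerate(new_chunks)], axis=1
)
sorted_indexer = pairs[:, np.argsort(pairs[0, :])]

chunk_boundaries = np.cumsum((0,) + chunks_along_axis)
for i, (start, stop) in enumerate(zip(chunk_boundaries[:-1], chunk_boundaries[1:])):
    # After "Fixup p2p bugs", boundaries are located with searchsorted so that
    # indexers which drop or repeat source elements are handled correctly.
    lo = np.searchsorted(sorted_indexer[0, :], start)
    hi = np.searchsorted(sorted_indexer[0, :], stop)
    chunk_indexer = sorted_indexer[:, lo:hi].copy()
    chunk_indexer[0, :] -= chunk_boundaries[i]
    print(f"input chunk {i} -> (local position, output chunk) pairs:\n{chunk_indexer}")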
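
The worker-assignment arithmetic that PATCH 4/6 and 5/6 settle on can be restated in isolation as follows (a sketch, not part of the patch; the function name, signature, and worker addresses are illustrative): the output chunk's multi-index is flattened with Fortran-order strides and the flat index is spread proportionally over the worker list.

import numpy as np


def pick_worker(
    partition: tuple[int, ...], chunk_lengths: tuple[int, ...], workers: list[str]
) -> str:
    # positions[dim] is the Fortran-order stride of dimension `dim`
    positions = [1] + np.cumprod(chunk_lengths).tolist()
    npartitions = int(np.prod(chunk_lengths))
    ix = sum(positions[dim] * pos for dim, pos in enumerate(partition))
    # Spread flat chunk indices proportionally over the available workers
    return workers[len(workers) * ix // npartitions]


workers = ["tcp://a:1234", "tcp://b:1234", "tcp://c:1234"]
assert pick_worker((0, 0), (2, 3), workers) == "tcp://a:1234"  # flat index 0 of 6
assert pick_worker((1, 2), (2, 3), workers) == "tcp://c:1234"  # flat index 5 of 6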