Skip to content

Commit

Permalink
[jMd3ZuqI] Fixes arrow stream strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ncordon committed Nov 1, 2023
1 parent 241cc5a commit 1812717
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions core/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import apoc.convert.Json;
import apoc.result.ByteArrayResult;
import apoc.util.QueueBasedSpliterator;
import apoc.util.QueueUtil;
import apoc.util.Util;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -38,8 +36,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -72,34 +68,56 @@ default byte[] writeBatch(BufferAllocator bufferAllocator, List<Map<String, Obje
}

default Stream<ByteArrayResult> export(IN data, ArrowConfig config) {
final BlockingQueue<ByteArrayResult> queue = new ArrayBlockingQueue<>(100);
Util.inTxFuture(getExecutorService(), getGraphDatabaseApi(), txInThread -> {
class ExportIterator implements Iterator<ByteArrayResult> {
ByteArrayResult current;
int batchCount = 0;
List<Map<String, Object>> rows = new ArrayList<>(config.getBatchSize());
try {
Iterator<Map<String, Object>> it = toIterator(data);
while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext()) {
Iterator<Map<String, Object>> it;

public ExportIterator(IN data) {
it = toIterator(data);
current = null;
computeBatch();
}

@Override
public boolean hasNext()
{
return current != null;
}

@Override
public ByteArrayResult next()
{
ByteArrayResult result = current;
current = null;
computeBatch();
return result;
}

private void computeBatch() {
boolean keepIterating = true;
List<Map<String, Object>> rows = new ArrayList<>(config.getBatchSize());

while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext() && keepIterating) {
rows.add(it.next());
if (batchCount > 0 && batchCount % config.getBatchSize() == 0) {
final byte[] bytes = writeBatch(getBufferAllocator(), rows);
QueueUtil.put(queue, new ByteArrayResult(bytes), 10);
rows.clear();
current = new ByteArrayResult(bytes);
keepIterating = false;
}
++batchCount;
}

if (!rows.isEmpty()) {
final byte[] bytes = writeBatch(getBufferAllocator(), rows);
QueueUtil.put(queue, new ByteArrayResult(bytes), 10);
current = new ByteArrayResult(bytes);
}
} catch (Exception e) {
getLogger().error("Exception while extracting Arrow data:", e);
} finally {
QueueUtil.put(queue, ByteArrayResult.NULL, 10);
}
return true;
});
QueueBasedSpliterator<ByteArrayResult> spliterator = new QueueBasedSpliterator<>(queue, ByteArrayResult.NULL, getTerminationGuard(), Integer.MAX_VALUE);
return StreamSupport.stream(spliterator, false);
}

var streamIterator = new ExportIterator(data);
Iterable<ByteArrayResult> iterable = () -> streamIterator;
return StreamSupport.stream(iterable.spliterator(), false);
}

default Object convertValue(Object data) {
Expand Down

0 comments on commit 1812717

Please sign in to comment.