Skip to content

Commit

Permalink
[jMd3ZuqI] Fixes arrow stream strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ncordon committed Nov 1, 2023
1 parent 241cc5a commit 7c930cd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
3 changes: 1 addition & 2 deletions core/src/main/java/apoc/export/arrow/ExportArrow.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public Stream<ByteArrayResult> query(@Name("query") String query, @Name(value =
@Procedure("apoc.export.arrow.all")
@Description("Exports the full database as an arrow file.")
public Stream<ProgressInfo> all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
var stream = new ExportArrowService(db, pools, terminationGuard, logger).file(fileName, new DatabaseSubGraph(tx), new ArrowConfig(config));
return stream;
return new ExportArrowService(db, pools, terminationGuard, logger).file(fileName, new DatabaseSubGraph(tx), new ArrowConfig(config));
}

@NotThreadSafe
Expand Down
60 changes: 39 additions & 21 deletions core/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import apoc.convert.Json;
import apoc.result.ByteArrayResult;
import apoc.util.QueueBasedSpliterator;
import apoc.util.QueueUtil;
import apoc.util.Util;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -38,8 +36,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -72,34 +68,56 @@ default byte[] writeBatch(BufferAllocator bufferAllocator, List<Map<String, Obje
}

default Stream<ByteArrayResult> export(IN data, ArrowConfig config) {
final BlockingQueue<ByteArrayResult> queue = new ArrayBlockingQueue<>(100);
Util.inTxFuture(getExecutorService(), getGraphDatabaseApi(), txInThread -> {
class ExportIterator implements Iterator<ByteArrayResult> {
ByteArrayResult current;
int batchCount = 0;
List<Map<String, Object>> rows = new ArrayList<>(config.getBatchSize());
try {
Iterator<Map<String, Object>> it = toIterator(data);
while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext()) {
Iterator<Map<String, Object>> it;

public ExportIterator(IN data) {
it = toIterator(data);
current = null;
computeBatch();
}

@Override
public boolean hasNext()
{
return current != null;
}

@Override
public ByteArrayResult next()
{
ByteArrayResult result = current;
current = null;
computeBatch();
return result;
}

private void computeBatch() {
boolean keepIterating = true;
List<Map<String, Object>> rows = new ArrayList<>(config.getBatchSize());

while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext() && keepIterating) {
rows.add(it.next());
if (batchCount > 0 && batchCount % config.getBatchSize() == 0) {
final byte[] bytes = writeBatch(getBufferAllocator(), rows);
QueueUtil.put(queue, new ByteArrayResult(bytes), 10);
rows.clear();
current = new ByteArrayResult(bytes);
keepIterating = false;
}
++batchCount;
}

if (!rows.isEmpty()) {
final byte[] bytes = writeBatch(getBufferAllocator(), rows);
QueueUtil.put(queue, new ByteArrayResult(bytes), 10);
current = new ByteArrayResult(bytes);
}
} catch (Exception e) {
getLogger().error("Exception while extracting Arrow data:", e);
} finally {
QueueUtil.put(queue, ByteArrayResult.NULL, 10);
}
return true;
});
QueueBasedSpliterator<ByteArrayResult> spliterator = new QueueBasedSpliterator<>(queue, ByteArrayResult.NULL, getTerminationGuard(), Integer.MAX_VALUE);
return StreamSupport.stream(spliterator, false);
}

var streamIterator = new ExportIterator(data);
Iterable<ByteArrayResult> iterable = () -> streamIterator;
return StreamSupport.stream(iterable.spliterator(), false);
}

default Object convertValue(Object data) {
Expand Down

0 comments on commit 7c930cd

Please sign in to comment.