diff --git a/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java b/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java index db6213804..1549c8fde 100644 --- a/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java +++ b/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java @@ -27,9 +27,9 @@ import org.neo4j.internal.kernel.api.NodeCursor; import org.neo4j.internal.kernel.api.Read; -import org.neo4j.internal.kernel.api.Scan; -import org.neo4j.internal.kernel.api.security.AccessMode; +import org.neo4j.internal.kernel.api.PartitionedScan; import org.neo4j.internal.kernel.api.security.LoginContext; +import org.neo4j.kernel.api.ExecutionContext; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.internal.GraphDatabaseAPI; @@ -41,8 +41,8 @@ public static BatchJobResult forAllNodes(GraphDatabaseAPI db, ExecutorService ex AtomicInteger processing = new AtomicInteger(); try ( InternalTransaction tx = db.beginTransaction( KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED ) ) { KernelTransaction ktx = tx.kernelTransaction(); - Function> scanFunction = Read::allNodesScan; - Scan scan = scanFunction.apply( ktx.dataRead() ); + Read dataRead = ktx.dataRead(); + PartitionedScan scan = dataRead.allNodesScan(1, ktx.cursorContext()); Function cursorAllocator = ktx2 -> ktx2.cursors().allocateNodeCursor( ktx2.cursorContext() ); executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); } @@ -80,7 +80,7 @@ public long getFailures() { } private static class BatchJob implements Callable { - private final Scan scan; + private final PartitionedScan scan; private final int batchSize; private final GraphDatabaseAPI db; private final Consumer consumer; @@ -89,7 +89,7 @@ private static class BatchJob implements Callable { private final ExecutorService executorService; private final AtomicInteger processing; - public BatchJob(Scan scan, int batchSize, GraphDatabaseAPI db, Consumer consumer, + public BatchJob(PartitionedScan scan, int batchSize, GraphDatabaseAPI db, Consumer consumer, BatchJobResult result, Function cursorAllocator, ExecutorService executorService, AtomicInteger processing ) { this.scan = scan; this.batchSize = batchSize; @@ -106,17 +106,21 @@ public BatchJob(Scan scan, int batchSize, GraphDatabaseAPI db, Consu public Void call() { try (InternalTransaction tx = db.beginTransaction(KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED)) { KernelTransaction ktx = tx.kernelTransaction(); + ktx.acquireStatement(); + ExecutionContext executionContext = ktx.createExecutionContext(); try ( NodeCursor cursor = cursorAllocator.apply( ktx )) { - if (scan.reserveBatch( cursor, batchSize, ktx.cursorContext(), AccessMode.Static.FULL )) { + while (scan.reservePartition(cursor, executionContext)) { // Branch out so that all available threads will get saturated - executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); - executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); + executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing)); + executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing)); while (processAndReport(cursor)) { // just continue processing... } } } tx.commit(); + executionContext.complete(); + executionContext.close(); return null; } finally { result.batches.incrementAndGet();