From 8b75a79ca098b7df43b0a52618ebd71093b91926 Mon Sep 17 00:00:00 2001 From: Gemma Lamont Date: Mon, 25 Sep 2023 13:05:36 +0200 Subject: [PATCH] [3X06NMak] Update Scan to PartitionedScan as Scan is deprecated --- .../MultiThreadedGlobalGraphOperations.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java b/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java index db6213804..1549c8fde 100644 --- a/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java +++ b/common/src/main/java/apoc/util/kernel/MultiThreadedGlobalGraphOperations.java @@ -27,9 +27,9 @@ import org.neo4j.internal.kernel.api.NodeCursor; import org.neo4j.internal.kernel.api.Read; -import org.neo4j.internal.kernel.api.Scan; -import org.neo4j.internal.kernel.api.security.AccessMode; +import org.neo4j.internal.kernel.api.PartitionedScan; import org.neo4j.internal.kernel.api.security.LoginContext; +import org.neo4j.kernel.api.ExecutionContext; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.internal.GraphDatabaseAPI; @@ -41,8 +41,8 @@ public static BatchJobResult forAllNodes(GraphDatabaseAPI db, ExecutorService ex AtomicInteger processing = new AtomicInteger(); try ( InternalTransaction tx = db.beginTransaction( KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED ) ) { KernelTransaction ktx = tx.kernelTransaction(); - Function> scanFunction = Read::allNodesScan; - Scan scan = scanFunction.apply( ktx.dataRead() ); + Read dataRead = ktx.dataRead(); + PartitionedScan scan = dataRead.allNodesScan(1, ktx.cursorContext()); Function cursorAllocator = ktx2 -> ktx2.cursors().allocateNodeCursor( ktx2.cursorContext() ); executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); } @@ -80,7 +80,7 @@ public long getFailures() { } private static class BatchJob implements Callable { - private final Scan scan; + private final PartitionedScan scan; private final int batchSize; private final GraphDatabaseAPI db; private final Consumer consumer; @@ -89,7 +89,7 @@ private static class BatchJob implements Callable { private final ExecutorService executorService; private final AtomicInteger processing; - public BatchJob(Scan scan, int batchSize, GraphDatabaseAPI db, Consumer consumer, + public BatchJob(PartitionedScan scan, int batchSize, GraphDatabaseAPI db, Consumer consumer, BatchJobResult result, Function cursorAllocator, ExecutorService executorService, AtomicInteger processing ) { this.scan = scan; this.batchSize = batchSize; @@ -106,17 +106,21 @@ public BatchJob(Scan scan, int batchSize, GraphDatabaseAPI db, Consu public Void call() { try (InternalTransaction tx = db.beginTransaction(KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED)) { KernelTransaction ktx = tx.kernelTransaction(); + ktx.acquireStatement(); + ExecutionContext executionContext = ktx.createExecutionContext(); try ( NodeCursor cursor = cursorAllocator.apply( ktx )) { - if (scan.reserveBatch( cursor, batchSize, ktx.cursorContext(), AccessMode.Static.FULL )) { + while (scan.reservePartition(cursor, executionContext)) { // Branch out so that all available threads will get saturated - executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); - executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) ); + executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing)); + executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing)); while (processAndReport(cursor)) { // just continue processing... } } } tx.commit(); + executionContext.complete(); + executionContext.close(); return null; } finally { result.batches.incrementAndGet();