Skip to content

Commit

Permalink
[3X06NMak] Update Scan to PartitionedScan as Scan is deprecated (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
gem-neo4j authored Sep 29, 2023
1 parent c594912 commit 91ce487
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

import org.neo4j.internal.kernel.api.NodeCursor;
import org.neo4j.internal.kernel.api.Read;
import org.neo4j.internal.kernel.api.Scan;
import org.neo4j.internal.kernel.api.security.AccessMode;
import org.neo4j.internal.kernel.api.PartitionedScan;
import org.neo4j.internal.kernel.api.security.LoginContext;
import org.neo4j.kernel.api.ExecutionContext;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
Expand All @@ -41,8 +41,8 @@ public static BatchJobResult forAllNodes(GraphDatabaseAPI db, ExecutorService ex
AtomicInteger processing = new AtomicInteger();
try ( InternalTransaction tx = db.beginTransaction( KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED ) ) {
KernelTransaction ktx = tx.kernelTransaction();
Function<Read, Scan<NodeCursor>> scanFunction = Read::allNodesScan;
Scan<NodeCursor> scan = scanFunction.apply( ktx.dataRead() );
Read dataRead = ktx.dataRead();
PartitionedScan<NodeCursor> scan = dataRead.allNodesScan(1, ktx.cursorContext());
Function<KernelTransaction,NodeCursor> cursorAllocator = ktx2 -> ktx2.cursors().allocateNodeCursor( ktx2.cursorContext() );
executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) );
}
Expand Down Expand Up @@ -80,7 +80,7 @@ public long getFailures() {
}

private static class BatchJob implements Callable<Void> {
private final Scan<NodeCursor> scan;
private final PartitionedScan<NodeCursor> scan;
private final int batchSize;
private final GraphDatabaseAPI db;
private final Consumer<NodeCursor> consumer;
Expand All @@ -89,7 +89,7 @@ private static class BatchJob implements Callable<Void> {
private final ExecutorService executorService;
private final AtomicInteger processing;

public BatchJob(Scan<NodeCursor> scan, int batchSize, GraphDatabaseAPI db, Consumer<NodeCursor> consumer,
public BatchJob(PartitionedScan<NodeCursor> scan, int batchSize, GraphDatabaseAPI db, Consumer<NodeCursor> consumer,
BatchJobResult result, Function<KernelTransaction,NodeCursor> cursorAllocator, ExecutorService executorService, AtomicInteger processing ) {
this.scan = scan;
this.batchSize = batchSize;
Expand All @@ -106,17 +106,21 @@ public BatchJob(Scan<NodeCursor> scan, int batchSize, GraphDatabaseAPI db, Consu
public Void call() {
try (InternalTransaction tx = db.beginTransaction(KernelTransaction.Type.EXPLICIT, LoginContext.AUTH_DISABLED)) {
KernelTransaction ktx = tx.kernelTransaction();
ktx.acquireStatement();
ExecutionContext executionContext = ktx.createExecutionContext();
try ( NodeCursor cursor = cursorAllocator.apply( ktx )) {
if (scan.reserveBatch( cursor, batchSize, ktx.cursorContext(), AccessMode.Static.FULL )) {
while (scan.reservePartition(cursor, executionContext)) {
// Branch out so that all available threads will get saturated
executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) );
executorService.submit( new BatchJob( scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing ) );
executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing));
executorService.submit(new BatchJob(scan, batchSize, db, consumer, result, cursorAllocator, executorService, processing));
while (processAndReport(cursor)) {
// just continue processing...
}
}
}
tx.commit();
executionContext.complete();
executionContext.close();
return null;
} finally {
result.batches.incrementAndGet();
Expand Down

0 comments on commit 91ce487

Please sign in to comment.