diff --git a/core/src/main/java/org/neo4j/gds/core/concurrency/DefaultPool.java b/core/src/main/java/org/neo4j/gds/core/concurrency/DefaultPool.java index 1ba55b09b2..f16efe316b 100644 --- a/core/src/main/java/org/neo4j/gds/core/concurrency/DefaultPool.java +++ b/core/src/main/java/org/neo4j/gds/core/concurrency/DefaultPool.java @@ -22,25 +22,14 @@ import org.neo4j.gds.concurrency.PoolSizes; import org.neo4j.gds.concurrency.PoolSizesService; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public final class DefaultPool { public static final ExecutorService INSTANCE = createDefaultPool(PoolSizesService.poolSizes()); private static ExecutorService createDefaultPool(PoolSizes poolSizes) { - return new ThreadPoolExecutor( - poolSizes.corePoolSize(), - poolSizes.maxPoolSize(), - 30L, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(poolSizes.corePoolSize() * 50), - ExecutorServiceUtil.DEFAULT_THREAD_FACTORY, - new ExecutorServiceUtil.CallerBlocksPolicy() - ); + return ExecutorServiceUtil.createThreadPool(poolSizes.corePoolSize(), poolSizes.maxPoolSize()); } private DefaultPool() {} diff --git a/core/src/main/java/org/neo4j/gds/core/concurrency/ExecutorServiceUtil.java b/core/src/main/java/org/neo4j/gds/core/concurrency/ExecutorServiceUtil.java index 111de7433e..ae6906da22 100644 --- a/core/src/main/java/org/neo4j/gds/core/concurrency/ExecutorServiceUtil.java +++ b/core/src/main/java/org/neo4j/gds/core/concurrency/ExecutorServiceUtil.java @@ -21,6 +21,7 @@ import org.neo4j.internal.helpers.NamedThreadFactory; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -48,6 +49,22 @@ public static ExecutorService createSingleThreadPool(String threadPrefix) { return Executors.newSingleThreadExecutor(NamedThreadFactory.daemon(threadPrefix)); } + static ExecutorService createThreadPool(int corePoolSize, int maxPoolSize) { + return createThreadPool(THREAD_NAME_PREFIX, corePoolSize, maxPoolSize); + } + + public static ExecutorService createThreadPool(String threadPrefix, int corePoolSize, int maxPoolSize) { + return new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + 30L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(corePoolSize * 50), + NamedThreadFactory.daemon(threadPrefix), + new ExecutorServiceUtil.CallerBlocksPolicy() + ); + } + public static ForkJoinPool createForkJoinPool(int concurrency) { return new ForkJoinPool(concurrency, FJ_WORKER_THREAD_FACTORY, null, false); } diff --git a/io/core/src/main/java/org/neo4j/gds/core/io/file/GraphStoreToFileExporter.java b/io/core/src/main/java/org/neo4j/gds/core/io/file/GraphStoreToFileExporter.java index c86360ae2f..6fd89a74a1 100644 --- a/io/core/src/main/java/org/neo4j/gds/core/io/file/GraphStoreToFileExporter.java +++ b/io/core/src/main/java/org/neo4j/gds/core/io/file/GraphStoreToFileExporter.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; public class GraphStoreToFileExporter extends GraphStoreExporter { @@ -66,6 +67,7 @@ public class GraphStoreToFileExporter extends GraphStoreExporter graphPropertyVisitorSupplier, TaskRegistryFactory taskRegistryFactory, Log log, - String rootTaskName + String rootTaskName, + ExecutorService executorService ) { super(graphStore, config, neoNodeProperties, nodeLabelMapping); this.nodeVisitorSupplier = nodeVisitorSupplier; @@ -100,6 +103,7 @@ public GraphStoreToFileExporter( this.taskRegistryFactory = taskRegistryFactory; this.log = log; this.rootTaskName = rootTaskName; + this.executorService = executorService; } @Override @@ -168,6 +172,7 @@ private void exportNodes( RunWithConcurrency.builder() .concurrency(config.writeConcurrency()) .tasks(tasks) + .executor(executorService) .run(); progressTracker.endSubTask(); } @@ -192,6 +197,7 @@ private void exportRelationships( RunWithConcurrency.builder() .concurrency(config.writeConcurrency()) .tasks(tasks) + .executor(executorService) .mayInterruptIfRunning(false) .run(); progressTracker.endSubTask(); @@ -218,6 +224,7 @@ private void exportGraphProperties( RunWithConcurrency.builder() .concurrency(config.writeConcurrency()) .tasks(tasks) + .executor(executorService) .run(); progressTracker.endSubTask(); } diff --git a/io/csv/src/main/java/org/neo4j/gds/core/io/file/GraphStoreExporterUtil.java b/io/csv/src/main/java/org/neo4j/gds/core/io/file/GraphStoreExporterUtil.java index ca948e3447..e6f585e2d4 100644 --- a/io/csv/src/main/java/org/neo4j/gds/core/io/file/GraphStoreExporterUtil.java +++ b/io/csv/src/main/java/org/neo4j/gds/core/io/file/GraphStoreExporterUtil.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static org.neo4j.gds.core.io.GraphStoreExporter.DIRECTORY_IS_WRITABLE; @@ -49,10 +50,11 @@ public static ExportToCsvResult export( GraphStoreToFileExporterConfig config, Optional neoNodeProperties, TaskRegistryFactory taskRegistryFactory, - Log log + Log log, + ExecutorService executorService ) { try { - var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log); + var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log, executorService); var start = System.nanoTime(); var exportedProperties = exporter.run(); diff --git a/io/csv/src/main/java/org/neo4j/gds/core/io/file/csv/GraphStoreToCsvExporter.java b/io/csv/src/main/java/org/neo4j/gds/core/io/file/csv/GraphStoreToCsvExporter.java index 6e94b7f086..fee3f2a604 100644 --- a/io/csv/src/main/java/org/neo4j/gds/core/io/file/csv/GraphStoreToCsvExporter.java +++ b/io/csv/src/main/java/org/neo4j/gds/core/io/file/csv/GraphStoreToCsvExporter.java @@ -23,6 +23,7 @@ import org.neo4j.gds.api.GraphStore; import org.neo4j.gds.api.nodeproperties.ValueType; import org.neo4j.gds.api.schema.MutableNodeSchema; +import org.neo4j.gds.core.concurrency.DefaultPool; import org.neo4j.gds.core.io.NeoNodeProperties; import org.neo4j.gds.core.io.NodeLabelMapping; import org.neo4j.gds.core.io.file.GraphStoreToFileExporter; @@ -35,6 +36,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; public final class GraphStoreToCsvExporter { @@ -44,7 +46,14 @@ public static GraphStoreToFileExporter create( GraphStoreToFileExporterConfig config, Path exportPath ) { - return create(graphStore, config, exportPath, Optional.empty(), TaskRegistryFactory.empty(), NullLog.getInstance()); + return create(graphStore, + config, + exportPath, + Optional.empty(), + TaskRegistryFactory.empty(), + NullLog.getInstance(), + DefaultPool.INSTANCE + ); } public static GraphStoreToFileExporter create( @@ -53,7 +62,8 @@ public static GraphStoreToFileExporter create( Path exportPath, Optional neoNodeProperties, TaskRegistryFactory taskRegistryFactory, - Log log + Log log, + ExecutorService executorService ) { Set headerFiles = ConcurrentHashMap.newKeySet(); @@ -79,7 +89,7 @@ public static GraphStoreToFileExporter create( config, neoNodeProperties, nodeLabelMapping, - () -> new UserInfoVisitor(exportPath), + () -> new UserInfoVisitor(exportPath), () -> new CsvGraphInfoVisitor(exportPath), () -> new CsvNodeSchemaVisitor(exportPath), () -> new CsvNodeLabelMappingVisitor(exportPath), @@ -102,7 +112,8 @@ public static GraphStoreToFileExporter create( ), taskRegistryFactory, log, - "Csv" + "Csv", + executorService ); } diff --git a/proc/catalog/src/main/java/org/neo4j/gds/catalog/GraphStoreExportProc.java b/proc/catalog/src/main/java/org/neo4j/gds/catalog/GraphStoreExportProc.java index 4c071d2aaa..36ce053ce4 100644 --- a/proc/catalog/src/main/java/org/neo4j/gds/catalog/GraphStoreExportProc.java +++ b/proc/catalog/src/main/java/org/neo4j/gds/catalog/GraphStoreExportProc.java @@ -27,6 +27,7 @@ import org.neo4j.gds.compat.GraphDatabaseApiProxy; import org.neo4j.gds.core.CypherMapWrapper; import org.neo4j.gds.core.GraphDimensions; +import org.neo4j.gds.core.concurrency.DefaultPool; import org.neo4j.gds.core.io.GraphStoreExporterBaseConfig; import org.neo4j.gds.core.io.NeoNodeProperties; import org.neo4j.gds.core.io.db.GraphStoreToDatabaseExporter; @@ -160,7 +161,8 @@ public Stream csv( exportConfig, neoNodeProperties(exportConfig, graphStore), executionContext().taskRegistryFactory(), - executionContext().log() + executionContext().log(), + DefaultPool.INSTANCE ); return Stream.of(new FileExportResult(