Skip to content

Commit

Permalink
Merge pull request #8186 from FlorentinD/backup-different-threadpool
Browse files Browse the repository at this point in the history
Use dedicated thread pool for Aura backups
  • Loading branch information
FlorentinD authored Oct 9, 2023
2 parents 1101b4c + e8cb4d4 commit 2fc7a87
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,14 @@
import org.neo4j.gds.concurrency.PoolSizes;
import org.neo4j.gds.concurrency.PoolSizesService;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class DefaultPool {

public static final ExecutorService INSTANCE = createDefaultPool(PoolSizesService.poolSizes());

private static ExecutorService createDefaultPool(PoolSizes poolSizes) {
return new ThreadPoolExecutor(
poolSizes.corePoolSize(),
poolSizes.maxPoolSize(),
30L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(poolSizes.corePoolSize() * 50),
ExecutorServiceUtil.DEFAULT_THREAD_FACTORY,
new ExecutorServiceUtil.CallerBlocksPolicy()
);
return ExecutorServiceUtil.createThreadPool(poolSizes.corePoolSize(), poolSizes.maxPoolSize());
}

private DefaultPool() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.internal.helpers.NamedThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -48,6 +49,22 @@ public static ExecutorService createSingleThreadPool(String threadPrefix) {
return Executors.newSingleThreadExecutor(NamedThreadFactory.daemon(threadPrefix));
}

static ExecutorService createThreadPool(int corePoolSize, int maxPoolSize) {
return createThreadPool(THREAD_NAME_PREFIX, corePoolSize, maxPoolSize);
}

public static ExecutorService createThreadPool(String threadPrefix, int corePoolSize, int maxPoolSize) {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
30L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(corePoolSize * 50),
NamedThreadFactory.daemon(threadPrefix),
new ExecutorServiceUtil.CallerBlocksPolicy()
);
}

public static ForkJoinPool createForkJoinPool(int concurrency) {
return new ForkJoinPool(concurrency, FJ_WORKER_THREAD_FACTORY, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class GraphStoreToFileExporter extends GraphStoreExporter<GraphStoreToFileExporterConfig> {
Expand All @@ -66,6 +67,7 @@ public class GraphStoreToFileExporter extends GraphStoreExporter<GraphStoreToFil
private final TaskRegistryFactory taskRegistryFactory;
private final Log log;
private final String rootTaskName;
private ExecutorService executorService;

public GraphStoreToFileExporter(
GraphStore graphStore,
Expand All @@ -84,7 +86,8 @@ public GraphStoreToFileExporter(
VisitorProducer<GraphPropertyVisitor> graphPropertyVisitorSupplier,
TaskRegistryFactory taskRegistryFactory,
Log log,
String rootTaskName
String rootTaskName,
ExecutorService executorService
) {
super(graphStore, config, neoNodeProperties, nodeLabelMapping);
this.nodeVisitorSupplier = nodeVisitorSupplier;
Expand All @@ -100,6 +103,7 @@ public GraphStoreToFileExporter(
this.taskRegistryFactory = taskRegistryFactory;
this.log = log;
this.rootTaskName = rootTaskName;
this.executorService = executorService;
}

@Override
Expand Down Expand Up @@ -168,6 +172,7 @@ private void exportNodes(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.run();
progressTracker.endSubTask();
}
Expand All @@ -192,6 +197,7 @@ private void exportRelationships(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.mayInterruptIfRunning(false)
.run();
progressTracker.endSubTask();
Expand All @@ -218,6 +224,7 @@ private void exportGraphProperties(
RunWithConcurrency.builder()
.concurrency(config.writeConcurrency())
.tasks(tasks)
.executor(executorService)
.run();
progressTracker.endSubTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.neo4j.gds.core.io.GraphStoreExporter.DIRECTORY_IS_WRITABLE;
Expand All @@ -49,10 +50,11 @@ public static ExportToCsvResult export(
GraphStoreToFileExporterConfig config,
Optional<NeoNodeProperties> neoNodeProperties,
TaskRegistryFactory taskRegistryFactory,
Log log
Log log,
ExecutorService executorService
) {
try {
var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log);
var exporter = GraphStoreToCsvExporter.create(graphStore, config, path, neoNodeProperties, taskRegistryFactory, log, executorService);

var start = System.nanoTime();
var exportedProperties = exporter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.gds.api.GraphStore;
import org.neo4j.gds.api.nodeproperties.ValueType;
import org.neo4j.gds.api.schema.MutableNodeSchema;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.io.NeoNodeProperties;
import org.neo4j.gds.core.io.NodeLabelMapping;
import org.neo4j.gds.core.io.file.GraphStoreToFileExporter;
Expand All @@ -35,6 +36,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public final class GraphStoreToCsvExporter {

Expand All @@ -44,7 +46,14 @@ public static GraphStoreToFileExporter create(
GraphStoreToFileExporterConfig config,
Path exportPath
) {
return create(graphStore, config, exportPath, Optional.empty(), TaskRegistryFactory.empty(), NullLog.getInstance());
return create(graphStore,
config,
exportPath,
Optional.empty(),
TaskRegistryFactory.empty(),
NullLog.getInstance(),
DefaultPool.INSTANCE
);
}

public static GraphStoreToFileExporter create(
Expand All @@ -53,7 +62,8 @@ public static GraphStoreToFileExporter create(
Path exportPath,
Optional<NeoNodeProperties> neoNodeProperties,
TaskRegistryFactory taskRegistryFactory,
Log log
Log log,
ExecutorService executorService
) {
Set<String> headerFiles = ConcurrentHashMap.newKeySet();

Expand All @@ -79,7 +89,7 @@ public static GraphStoreToFileExporter create(
config,
neoNodeProperties,
nodeLabelMapping,
() -> new UserInfoVisitor(exportPath),
() -> new UserInfoVisitor(exportPath),
() -> new CsvGraphInfoVisitor(exportPath),
() -> new CsvNodeSchemaVisitor(exportPath),
() -> new CsvNodeLabelMappingVisitor(exportPath),
Expand All @@ -102,7 +112,8 @@ public static GraphStoreToFileExporter create(
),
taskRegistryFactory,
log,
"Csv"
"Csv",
executorService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
import org.neo4j.gds.core.CypherMapWrapper;
import org.neo4j.gds.core.GraphDimensions;
import org.neo4j.gds.core.concurrency.DefaultPool;
import org.neo4j.gds.core.io.GraphStoreExporterBaseConfig;
import org.neo4j.gds.core.io.NeoNodeProperties;
import org.neo4j.gds.core.io.db.GraphStoreToDatabaseExporter;
Expand Down Expand Up @@ -160,7 +161,8 @@ public Stream<FileExportResult> csv(
exportConfig,
neoNodeProperties(exportConfig, graphStore),
executionContext().taskRegistryFactory(),
executionContext().log()
executionContext().log(),
DefaultPool.INSTANCE
);

return Stream.of(new FileExportResult(
Expand Down

0 comments on commit 2fc7a87

Please sign in to comment.