Skip to content

Commit

Permalink
[Carz4Ljs] Safe use of transactions in export csv
Browse files Browse the repository at this point in the history
  • Loading branch information
loveleif authored Oct 30, 2024
1 parent 98094f4 commit 7a5ec7e
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 137 deletions.
9 changes: 5 additions & 4 deletions common/src/main/java/apoc/export/util/ExportUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import apoc.util.Util;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.procedure.TerminationGuard;

public class ExportUtils {
Expand All @@ -42,7 +43,7 @@ public static Stream<ExportProgressInfo> getProgressInfoStream(
ExportConfig exportConfig,
ProgressReporter reporter,
ExportFileManager cypherFileManager,
Consumer<ProgressReporter> dump) {
BiConsumer<Transaction, ProgressReporter> dump) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ExportProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer((pi) -> QueueUtil.put(
Expand All @@ -56,8 +57,8 @@ public static Stream<ExportProgressInfo> getProgressInfoStream(
null,
executorService,
db,
tx -> {
dump.accept(reporterWithConsumer);
threadBoundTx -> {
dump.accept(threadBoundTx, reporterWithConsumer);
return true;
},
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,21 @@ public class NodesAndRelsSubGraph implements SubGraph {
private final Set<String> types = new HashSet<>(20);

public NodesAndRelsSubGraph(Transaction tx, Collection<Node> nodes, Collection<Relationship> rels) {
this(tx, nodes, rels, false);
}

public NodesAndRelsSubGraph(Transaction tx, Collection<Node> nodes, Collection<Relationship> rels, boolean rebind) {
this.tx = tx;
this.nodes = new ArrayList<>(nodes.size());
for (Node node : nodes) {
if (rebind) node = Util.rebind(tx, node);
for (Label label : node.getLabels()) labels.add(label.name());
this.nodes.add(node);
}
this.rels = new HashSet<>(rels);
this.rels = new HashSet<>(rels.size());
for (Relationship rel : rels) {
if (rebind) rel = Util.rebind(tx, rel);
this.rels.add(rel);
this.types.add(rel.getType().name());
}
}
Expand Down
32 changes: 32 additions & 0 deletions common/src/main/java/org/neo4j/cypher/export/ExportData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.cypher.export;

import java.util.Collection;
import java.util.Map;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;

public sealed interface ExportData {
record Database() implements ExportData {}

record NodesAndRels(Collection<Node> nodes, Collection<Relationship> rels) implements ExportData {}

record Query(String cypher, Map<String, Object> params) implements ExportData {}
}
Loading

0 comments on commit 7a5ec7e

Please sign in to comment.