Skip to content

Commit

Permalink
[bkym7TcZ] Optimise apoc.refactor.cloneSubgraph
Browse files Browse the repository at this point in the history
  • Loading branch information
loveleif authored Oct 29, 2024
1 parent 79d2349 commit 33f6907
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 183 deletions.
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
testImplementation group: 'org.neo4j.community', name: 'it-test-support', version: neo4jVersionEffective // , classifier: "tests"
testImplementation group: 'org.neo4j', name: 'log-test-utils', version: neo4jVersionEffective // , classifier: "tests"
testImplementation group: 'org.neo4j', name: 'neo4j-kernel', version: neo4jVersionEffective, classifier: "tests"
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.13.2'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '4.2.0'
testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1'

Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ dependencies {
testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '5.15.0', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
testImplementation 'org.assertj:assertj-core:3.26.3'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
Expand Down
143 changes: 79 additions & 64 deletions core/src/main/java/apoc/refactor/GraphRefactoring.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static apoc.refactor.util.RefactorConfig.RelationshipSelectionStrategy.MERGE;
import static apoc.refactor.util.RefactorUtil.*;
import static apoc.util.Util.withTransactionAndRebind;
import static java.util.stream.StreamSupport.stream;

import apoc.Pools;
import apoc.algo.Cover;
Expand All @@ -35,7 +36,6 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.schema.ConstraintType;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
Expand Down Expand Up @@ -248,93 +248,110 @@ public Stream<NodeRefactorResult> cloneSubgraph(
"""
{
standinNodes :: LIST<LIST<NODE>>,
skipProperties :: LIST<STRING>
skipProperties :: LIST<STRING>,
createNodesInNewTransactions = false :: BOOLEAN
}
""")
Map<String, Object> config) {

if (nodes == null || nodes.isEmpty()) return Stream.empty();

// empty or missing rels list means get all rels between nodes
if (rels == null || rels.isEmpty()) {
rels = Cover.coverNodes(nodes).collect(Collectors.toList());
}

Map<Node, Node> copyMap = new HashMap<>(nodes.size());
List<NodeRefactorResult> resultStream = new ArrayList<>();
final var newNodeByOldNode = new HashMap<Node, Node>(nodes.size());
final var resultStream = new ArrayList<NodeRefactorResult>();

Map<Node, Node> standinMap =
generateStandinMap((List<List<Node>>) config.getOrDefault("standinNodes", Collections.emptyList()));
List<String> skipProperties = (List<String>) config.getOrDefault("skipProperties", Collections.emptyList());
final var standinMap = asNodePairs(config.get("standinNodes"));
final var skipProps = asStringSet(config.get("skipProperties"));
final var createNodesInInnerTx =
Boolean.TRUE.equals(config.getOrDefault("createNodesInNewTransactions", false));

// clone nodes and populate copy map
for (Node node : nodes) {
if (node == null || standinMap.containsKey(node)) continue;
for (final var oldNode : nodes) {

// standinNodes will NOT be cloned
if (oldNode == null || standinMap.containsKey(oldNode)) continue;

NodeRefactorResult result = new NodeRefactorResult(node.getId());
final var result = new NodeRefactorResult(oldNode.getId());
try {
Node copy = withTransactionAndRebind(db, tx, transaction -> {
Node copyTemp = transaction.createNode();
Map<String, Object> properties = node.getAllProperties();
if (skipProperties != null && !skipProperties.isEmpty()) {
for (String skip : skipProperties) properties.remove(skip);
}
copyProperties(properties, copyTemp);
copyLabels(node, copyTemp);
return copyTemp;
});
resultStream.add(result.withOther(copy));
copyMap.put(node, copy);
final Node newNode;
if (!createNodesInInnerTx) newNode = cloneNode(tx, oldNode, skipProps);
else newNode = withTransactionAndRebind(db, tx, innerTx -> cloneNode(innerTx, oldNode, skipProps));
resultStream.add(result.withOther(newNode));
newNodeByOldNode.put(oldNode, newNode);
} catch (Exception e) {
resultStream.add(result.withError(e));
}
}

final Iterator<Relationship> relsIterator;
// empty or missing rels list means get all rels between nodes
if (rels == null || rels.isEmpty())
relsIterator = Cover.coverNodes(nodes).iterator();
else relsIterator = rels.iterator();

// clone relationships, will be between cloned nodes and/or standins
for (Relationship rel : rels) {
while (relsIterator.hasNext()) {
final var rel = relsIterator.next();
if (rel == null) continue;

Node oldStart = rel.getStartNode();
Node newStart = standinMap.getOrDefault(oldStart, copyMap.get(oldStart));
Node newStart = standinMap.getOrDefault(oldStart, newNodeByOldNode.get(oldStart));

Node oldEnd = rel.getEndNode();
Node newEnd = standinMap.getOrDefault(oldEnd, copyMap.get(oldEnd));
Node newEnd = standinMap.getOrDefault(oldEnd, newNodeByOldNode.get(oldEnd));

if (newStart != null && newEnd != null) {
Relationship newrel = newStart.createRelationshipTo(newEnd, rel.getType());
Map<String, Object> properties = rel.getAllProperties();
if (skipProperties != null && !skipProperties.isEmpty()) {
for (String skip : skipProperties) properties.remove(skip);
}
copyProperties(properties, newrel);
}
if (newStart != null && newEnd != null) cloneRel(rel, newStart, newEnd, skipProps);
}

return resultStream.stream();
}

private Map<Node, Node> generateStandinMap(List<List<Node>> standins) {
Map<Node, Node> standinMap = standins.isEmpty() ? Collections.emptyMap() : new HashMap<>(standins.size());

for (List<Node> pairing : standins) {
if (pairing == null) continue;
private static Node cloneNode(final Transaction tx, final Node node, final Set<String> skipProps) {
final var newNode =
tx.createNode(stream(node.getLabels().spliterator(), false).toArray(Label[]::new));
try {
node.getAllProperties().forEach((k, v) -> {
if (skipProps.isEmpty() || !skipProps.contains(k)) newNode.setProperty(k, v);
});
} catch (Exception e) {
newNode.delete();
throw e;
}
return newNode;
}

if (pairing.size() != 2) {
throw new IllegalArgumentException("'standinNodes' must be a list of node pairs");
}
private static void cloneRel(Relationship base, Node from, Node to, final Set<String> skipProps) {
final var rel = from.createRelationshipTo(to, base.getType());
rel.getAllProperties().forEach((k, v) -> {
if (skipProps.isEmpty() || !skipProps.contains(k)) rel.setProperty(k, v);
});
}

Node from = pairing.get(0);
Node to = pairing.get(1);
private Map<Node, Node> asNodePairs(Object o) {
if (o == null) return Collections.emptyMap();
else if (o instanceof List<?> list) {
return list.stream()
.filter(Objects::nonNull)
.map(GraphRefactoring::castNodePair)
.collect(Collectors.toUnmodifiableMap(l -> l.get(0), l -> l.get(1)));
} else {
throw new IllegalArgumentException("Expected a list of node pairs but got " + o);
}
}

if (from == null || to == null) {
throw new IllegalArgumentException("'standinNodes' must be a list of node pairs");
}
private static Set<String> asStringSet(Object o) {
if (o == null) return Collections.emptySet();
else if (o instanceof Collection<?> c && c.stream().allMatch(i -> i instanceof String)) {
return c.stream().map(Object::toString).collect(Collectors.toSet());
} else throw new IllegalArgumentException("Expected a list of string parameter keys but got " + o);
}

standinMap.put(from, to);
private static List<Node> castNodePair(Object o) {
if (o instanceof List<?> l && l.size() == 2 && l.get(0) instanceof Node && l.get(1) instanceof Node) {
//noinspection unchecked
return (List<Node>) l;
} else {
throw new IllegalArgumentException("Expected pair of nodes but got " + o);
}

return standinMap;
}

public record MergedNodeResult(@Description("The merged node.") Node node) {}
Expand Down Expand Up @@ -375,7 +392,7 @@ public Stream<MergedNodeResult> mergeNodes(

final Node first = nodes.get(0);
final List<String> existingSelfRelIds = conf.isPreservingExistingSelfRels()
? StreamSupport.stream(first.getRelationships().spliterator(), false)
? stream(first.getRelationships().spliterator(), false)
.filter(Util::isSelfRel)
.map(Entity::getElementId)
.collect(Collectors.toList())
Expand Down Expand Up @@ -637,11 +654,11 @@ public Stream<RefactorGraphResult> deleteAndReconnect(
return Stream.empty();
}

BiFunction<Node, Direction, Relationship> filterRel = (node, direction) -> StreamSupport.stream(
node.getRelationships(direction).spliterator(), false)
.filter(rels::contains)
.findFirst()
.orElse(null);
BiFunction<Node, Direction, Relationship> filterRel =
(node, direction) -> stream(node.getRelationships(direction).spliterator(), false)
.filter(rels::contains)
.findFirst()
.orElse(null);

nodesToRemove.forEach(node -> {
Relationship relationshipIn = filterRel.apply(node, Direction.INCOMING);
Expand Down Expand Up @@ -695,14 +712,12 @@ public Stream<RefactorGraphResult> deleteAndReconnect(
}

private boolean isUniqueConstraintDefinedFor(String label, String key) {
return StreamSupport.stream(
tx.schema().getConstraints(Label.label(label)).spliterator(), false)
return stream(tx.schema().getConstraints(Label.label(label)).spliterator(), false)
.anyMatch(c -> {
if (!c.isConstraintType(ConstraintType.UNIQUENESS)) {
return false;
}
return StreamSupport.stream(c.getPropertyKeys().spliterator(), false)
.allMatch(k -> k.equals(key));
return stream(c.getPropertyKeys().spliterator(), false).allMatch(k -> k.equals(key));
});
}

Expand Down
Loading

0 comments on commit 33f6907

Please sign in to comment.