Skip to content

Commit

Permalink
Merge pull request #67 from timatbw/fix-atomic-add
Browse files Browse the repository at this point in the history
SOLR-15213 backport from Solr 9 - Atomic updates: "add" now uses add-…
  • Loading branch information
timatbw authored Jul 18, 2024
2 parents d2ac0e0 + 8c50b29 commit 0afc9ca
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -32,9 +33,9 @@
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
Expand All @@ -47,7 +48,6 @@
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
Expand Down Expand Up @@ -84,6 +84,10 @@ public AtomicUpdateDocumentMerger(SolrQueryRequest queryReq) {
*/
public static boolean isAtomicUpdate(final AddUpdateCommand cmd) {
// Delegate straight to the document-level check; no need for a local variable.
return isAtomicUpdate(cmd.getSolrInputDocument());
}

private static boolean isAtomicUpdate(SolrInputDocument sdoc) {
for (SolrInputField sif : sdoc.values()) {
Object val = sif.getValue();
if (val instanceof Map && !(val instanceof SolrDocumentBase)) {
Expand Down Expand Up @@ -353,26 +357,6 @@ public static boolean isDerivedFromDoc(SolrInputDocument fullDoc, SolrInputDocum
return true;
}

/**
 * Resolves a '/'-separated nested path against a complete document hierarchy and returns
 * the field it designates.
 *
 * @param completeHierarchy SolrInputDocument that represents the nested document hierarchy from its root
 * @param fieldPath the path to fetch, separated by a '/' e.g. /children/grandChildren;
 *                  a segment may carry a '#index' suffix to pick a sibling (index 0 when absent)
 * @return the SolrInputField of fieldPath
 */
public static SolrInputField getFieldFromHierarchy(SolrInputDocument completeHierarchy, String fieldPath) {
// substr to remove first '/'
final List<String> docPaths = StrUtils.splitSmart(fieldPath.substring(1), '/');
Pair<String, Integer> subPath;
SolrInputField sifToReplace = null;
SolrInputDocument currDoc = completeHierarchy;
for (String subPathString: docPaths) {
subPath = getPathAndIndexFromNestPath(subPathString);
// descend one level: field named by the segment, then the child doc at the given index
// NOTE(review): assumes every intermediate field exists and holds a List of
// SolrInputDocument — a missing segment would NPE/CCE here; confirm callers guarantee it
sifToReplace = currDoc.getField(subPath.getLeft());
currDoc = (SolrInputDocument) ((List)sifToReplace.getValues()).get(subPath.getRight());
}
return sifToReplace;
}

/**
* Given an AddUpdateCommand containing update operations (e.g. set, inc), merge and resolve the operations into
* a partial document that can be used for indexing the in-place updates. The AddUpdateCommand is modified to contain
Expand Down Expand Up @@ -452,7 +436,77 @@ protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVa

/**
 * Handles an atomic "add" operation for the field carried by {@code sif} on {@code toDoc}.
 *
 * <p>When the incoming value is a child document (or a collection of them), the children are
 * merged by unique id into the existing child field via {@code doAddChildren}; otherwise the
 * value is converted to its schema-native representation and appended to the field.
 */
protected void doAdd(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
String name = sif.getName();
if (isChildDoc(fieldVal)) {
// our child can be single update or a collection. Normalise to List to make any subsequent
// mapping easier
final List<SolrInputDocument> children = asChildren(fieldVal);
doAddChildren(toDoc, sif, children);
} else {
// Only convert to the native value on the branch that uses it; the original computed it
// unconditionally and discarded it for child documents.
toDoc.addField(name, getNativeFieldValue(name, fieldVal));
}
}

/**
 * Normalises an atomic-add payload to a list of child documents: a collection value is
 * mapped element-by-element (each element must already be a SolrInputDocument), while a
 * lone document becomes a singleton list.
 */
private List<SolrInputDocument> asChildren(Object fieldVal) {
// Guard clause: a single child document is wrapped directly.
if (!(fieldVal instanceof Collection)) {
return Collections.singletonList((SolrInputDocument) fieldVal);
}
return ((Collection<?>) fieldVal)
.stream()
.map(element -> (SolrInputDocument) element)
.collect(Collectors.toList());
}

// Merges a list of incoming child documents into the multivalued child field `sif` of
// `toDoc`, keyed by the schema's unique-key field: an incoming child whose id matches an
// existing child either replaces it (plain document) or is atomically merged into it
// (document carrying atomic-op maps); a new id is appended. Existing order is preserved.
private void doAddChildren(
SolrInputDocument toDoc, SolrInputField sif, List<SolrInputDocument> children) {
final String name = sif.getName();

// Existing value of the child field, or a fresh empty-valued field when the doc has none.
final SolrInputField existingField =
Optional.ofNullable(toDoc.get(name))
.orElseGet(
() -> {
final SolrInputField replacement = new SolrInputField(name);
replacement.setValue(Collections.emptyList());
return replacement;
});

// Index existing children by their indexed id; LinkedHashMap keeps encounter order so the
// merged result preserves the original child ordering.
Map<BytesRef, SolrInputDocument> originalChildrenById =
existingField.getValues().stream()
.filter(SolrInputDocument.class::isInstance)
.map(SolrInputDocument.class::cast)
.filter(doc -> doc.containsKey(idField.getName()))
.collect(
Collectors.toMap(
this::readChildIdBytes, doc -> doc, (u, v) -> u, LinkedHashMap::new));

// A size mismatch means some existing value was filtered out above (not a
// SolrInputDocument, or lacking an id), so id-based merging cannot work on this field.
if (existingField.getValues().size() != originalChildrenById.size()) {
throw new SolrException(
ErrorCode.BAD_REQUEST,
"Can't add child document on field: "
+ existingField.getName()
+ " since it contains values which are either not SolrInputDocument's or do not have an id property");
}
for (SolrInputDocument child : children) {
if (isAtomicUpdate(child)) {
// When it is atomic update, update the nested document ONLY if it already exists
final BytesRef childIdBytes = readChildIdBytes(child);
SolrInputDocument original = originalChildrenById.get(childIdBytes);
if (original == null) {
throw new SolrException(
ErrorCode.BAD_REQUEST,
"A nested atomic update can only update an existing nested document");
}
SolrInputDocument merged = mergeDocHavingSameId(child, original);
originalChildrenById.put(childIdBytes, merged);
} else {
// If the child is not atomic, replace any existing nested document with the current one
originalChildrenById.put(readChildIdBytes(child), (child));
}
}
// Write the merged children back; values() iterates in the preserved insertion order.
toDoc.setField(name, originalChildrenById.values());
}

/**
 * Reads the unique-key field of a child document and converts it to its indexed
 * BytesRef form, so children can be matched to each other by id.
 */
private BytesRef readChildIdBytes(SolrInputDocument doc) {
final String uniqueKeyName = idField.getName();
final String childId = doc.get(uniqueKeyName).getValue().toString();
return schema.indexableUniqueKey(childId);
}

protected void doAddDistinct(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
Expand Down Expand Up @@ -691,12 +745,4 @@ private Optional<Object> findObjectWithTypeFuzziness(Collection<Object> original
}
}

/**
 * Splits a single nest-path segment of the form "name" or "name#index" into the field
 * name and the sibling index (0 when no '#index' suffix is present).
 */
private static Pair<String, Integer> getPathAndIndexFromNestPath(String nestPath) {
final List<String> parts = StrUtils.splitSmart(nestPath, '#');
final int siblingIndex = (parts.size() == 1) ? 0 : Integer.parseInt(parts.get(1));
return Pair.of(parts.get(0), siblingIndex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -73,10 +74,100 @@ public void testMergeChildDoc() throws Exception {
assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), dummyBlock.getFieldValue("id"));
assertDocContainsSubset(preMergeDoc, dummyBlock);
assertDocContainsSubset(addedDoc, dummyBlock);
assertDocContainsSubset(newChildDoc, (SolrInputDocument) ((List) dummyBlock.getFieldValues("child")).get(1));
final List<SolrInputDocument> children = dummyBlock.getFieldValues("child").stream().map(SolrInputDocument.class::cast).collect(Collectors.toList());
assertDocContainsSubset(newChildDoc, children.get(1));
assertEquals(dummyBlock.getFieldValue("id"), dummyBlock.getFieldValue("id"));
}

@Test
// An atomic "add" of a child whose id matches an existing child must replace that child
// in place (no duplicate), while unrelated parent fields merge normally.
public void testMergeChildDocsWithSameId() throws Exception {
SolrInputDocument existingChild = sdoc("id", "2", "cat_ss", "child");
SolrInputDocument existingDoc = sdoc("id", "1",
"cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")),
"_root_", "1", "child", new ArrayList<>(sdocs(existingChild)));

SolrInputDocument updatedChildDoc = sdoc("id", "2", "cat_ss", "updated child");
SolrInputDocument updateDoc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("add", "bbb"), // add value to collection on parent
"child", Collections.singletonMap("add", sdocs(updatedChildDoc))); // child with same id and updated "cat_ss" field


SolrInputDocument preMergeDoc = new SolrInputDocument(existingDoc);
AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req());
docMerger.merge(updateDoc, existingDoc);
assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), existingDoc.getFieldValue("id"));
assertDocContainsSubset(preMergeDoc, existingDoc);
assertDocContainsSubset(updateDoc, existingDoc);
// still exactly one child: the update replaced it rather than appending a second one
assertEquals(1, existingDoc.getFieldValues("child").size());
assertDocContainsSubset(updatedChildDoc, ((SolrInputDocument) existingDoc.getFieldValues("child").toArray()[0]));
}

@Test
// An atomic "add" of a child that itself carries an atomic op ({"set": ...}) on one of
// its fields must merge into the existing child with the same id, applying the nested op.
public void testMergeChildDocsWithSameAndNestedSet() throws Exception {
SolrInputDocument existingChild = sdoc("id", "2", "cat_ss", "child");
SolrInputDocument existingDoc = sdoc("id", "1",
"cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")),
"_root_", "1", "child", new ArrayList<>(sdocs(existingChild)));


SolrInputDocument updatedChildDoc = sdoc("id", "2", "cat_ss", Collections.singletonMap("set", "updated child"));
SolrInputDocument updateDoc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("add", "bbb"), // add value to collection on parent
"child", Collections.singletonMap("add", sdocs(updatedChildDoc))); // child with same id and nested set on "cat_ss" field


SolrInputDocument preMergeDoc = new SolrInputDocument(existingDoc);
AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req());
docMerger.merge(updateDoc, existingDoc);
assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), existingDoc.getFieldValue("id"));
assertDocContainsSubset(preMergeDoc, existingDoc);
assertDocContainsSubset(updateDoc, existingDoc);
// still one child, and its cat_ss now holds the value written by the nested "set"
assertEquals(1, existingDoc.getFieldValues("child").size());
assertDocContainsSubset(sdoc("id", "2", "cat_ss", "updated child"), ((SolrInputDocument) existingDoc.getFieldValues("child").toArray()[0]));
}

@Test
// With multiple existing children, an atomic "add" must only touch the child whose id
// matches; other children survive the merge untouched and the count stays the same.
public void testMergeChildDocsWithMultipleChildDocs() throws Exception {
SolrInputDocument existingChild = sdoc("id", "2", "cat_ss", "child");
SolrInputDocument nonMatchingExistingChild = sdoc("id", "3", "cat_ss", "other");
SolrInputDocument existingDoc = sdoc("id", "1",
"cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")),
"_root_", "1", "child", new ArrayList<>(sdocs(existingChild, nonMatchingExistingChild)));

SolrInputDocument updatedChildDoc = sdoc("id", "2", "cat_ss", "updated child");
SolrInputDocument updateDoc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("add", "bbb"), // add value to collection on parent
"child", Collections.singletonMap("add", sdocs(updatedChildDoc))); // child with same id and updated "cat_ss" field


SolrInputDocument preMergeDoc = new SolrInputDocument(existingDoc);
AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req());
docMerger.merge(updateDoc, existingDoc);
assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), existingDoc.getFieldValue("id"));
assertDocContainsSubset(preMergeDoc, existingDoc);
assertDocContainsSubset(updateDoc, existingDoc);
// both children remain; the matching one (first, order preserved) carries the update
assertEquals(2, existingDoc.getFieldValues("child").size());
assertDocContainsSubset(updatedChildDoc, ((SolrInputDocument) existingDoc.getFieldValues("child").toArray()[0]));
}

@Test
// A nested atomic update (child doc carrying op maps) whose id matches no existing child
// must be rejected with a BAD_REQUEST SolrException rather than creating a new child.
public void testAtomicUpdateNonExistingChildException() throws Exception {
SolrInputDocument existingChild = sdoc("id", "2", "cat_ss", "child");
SolrInputDocument existingDoc = sdoc("id", "1",
"cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")),
"_root_", "1", "child_ss", new ArrayList<>(sdocs(existingChild)));

SolrInputDocument updateDoc = sdoc("id", "1",
"child_ss", Collections.singletonMap("add", sdoc("id", "3", "cat_ss", Map.of("set", "child2")))); // an atomic update

AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req());

SolrException expected = expectThrows(SolrException.class, () -> {
docMerger.merge(updateDoc, existingDoc);
});
// assertEquals reports expected vs. actual on failure (and is null-safe), unlike the
// original assertTrue(message.equals(...)) which only reports "expected true".
assertEquals("A nested atomic update can only update an existing nested document", expected.getMessage());
}

@Test
public void testBlockAtomicInplaceUpdates() throws Exception {
SolrInputDocument doc = sdoc("id", "1", "string_s", "root");
Expand Down Expand Up @@ -256,7 +347,7 @@ public void testBlockAtomicStack() throws Exception {

assertU(commit());

assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"),
assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"), //fails
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/child1/[0]/id=='2'",
Expand Down Expand Up @@ -334,7 +425,7 @@ public void testBlockAtomicAdd() throws Exception {

assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:[{\"id\":\"3\", \"cat_ss\": [\"child\"]}]," +
"child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
Expand All @@ -345,7 +436,7 @@ public void testBlockAtomicAdd() throws Exception {
// this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]")
, "=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:[{\"id\":\"3\", \"cat_ss\": [\"child\"]}]," +
"child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
Expand All @@ -356,13 +447,13 @@ public void testBlockAtomicAdd() throws Exception {

assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, child3, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}]," +
"child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:[{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}]}]," +
"child2:[{\"id\":\"3\", \"cat_ss\": [\"child\"]}]" +
" }}"
);

assertJQ(req("qt","/get", "id","2", "fl","id, cat_ss, child, child3, [child]")
,"=={'doc':{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}" +
,"=={'doc':{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:[{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}]}" +
" }}"
);

Expand All @@ -375,13 +466,13 @@ public void testBlockAtomicAdd() throws Exception {

assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, child3, child4, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]," +
" child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}}], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:[{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]," +
" child4:[{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}]}]}], child2:[{\"id\":\"3\", \"cat_ss\": [\"child\"]}]" +
" }}"
);

assertJQ(req("qt","/get", "id","4", "fl","id, cat_ss, child4, [child]")
,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}" +
,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:[{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}]}" +
" }}"
);

Expand Down Expand Up @@ -426,14 +517,14 @@ public void testBlockAtomicAdd() throws Exception {
"/response/docs/[0]/cat_ss/[2]==\"bbb\"",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/cat_ss/[0]=='child'",
"/response/docs/[0]/child1/[0]/child3/id=='4'",
"/response/docs/[0]/child1/[0]/child3/cat_ss/[0]=='grandChild'",
"/response/docs/[0]/child1/[0]/child3/child4/[0]/id=='5'",
"/response/docs/[0]/child1/[0]/child3/child4/[0]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child1/[0]/child3/child4/[1]/id=='6'",
"/response/docs/[0]/child1/[0]/child3/child4/[1]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child2/id=='3'",
"/response/docs/[0]/child2/cat_ss/[0]=='child'",
"/response/docs/[0]/child1/[0]/child3/[0]/id=='4'",
"/response/docs/[0]/child1/[0]/child3/[0]/cat_ss/[0]=='grandChild'",
"/response/docs/[0]/child1/[0]/child3/[0]/child4/[0]/id=='5'",
"/response/docs/[0]/child1/[0]/child3/[0]/child4/[0]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child1/[0]/child3/[0]/child4/[1]/id=='6'",
"/response/docs/[0]/child1/[0]/child3/[0]/child4/[1]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child2/[0]/id=='3'",
"/response/docs/[0]/child2/[0]/cat_ss/[0]=='child'",
"/response/docs/[0]/child5/[0]/id=='7'",
"/response/docs/[0]/child5/[0]/cat_ss/[0]=='child'",
"/response/docs/[0]/child5/[1]/id=='8'",
Expand Down Expand Up @@ -572,11 +663,9 @@ public void testAtomicUpdateDeleteNoRootField() throws Exception {
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"bbb\"",
"/response/docs/[0]/child1/id==\"2\"",
"/response/docs/[0]/child1/cat_ss/[0]==\"child\""
"/response/docs/[0]/child1/[0]/id==\"2\"",
"/response/docs/[0]/child1/[0]/cat_ss/[0]==\"child\""
);


}

@Test
Expand Down

0 comments on commit 0afc9ca

Please sign in to comment.