Skip to content

Commit

Permalink
Merge pull request #69 from timatbw/fix-skip-child-docs
Browse files Browse the repository at this point in the history
Fix handling of atomic updates to child documents in SkipExistingDocu…
  • Loading branch information
timatbw authored Jul 24, 2024
2 parents f116790 + e1bf5c1 commit baa2888
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ boolean isSkipUpdateIfMissing() {
return this.skipUpdateIfMissing;
}

boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
boolean doesDocumentExist(BytesRef indexedDocId) {
assert null != indexedDocId;

// we don't need any fields populated, we just need to know if the doc is in the tlog...
Expand Down Expand Up @@ -214,6 +214,17 @@ boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
}
}

boolean doesChildDocumentExist(AddUpdateCommand cmd) throws IOException {
return RealTimeGetComponent.getInputDocument(
core,
core.getLatestSchema().indexableUniqueKey(cmd.getChildDocIdStr()),
cmd.getIndexedId(),
null,
null,
RealTimeGetComponent.Resolution.DOC
) != null;
}

boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return false;
Expand All @@ -231,11 +242,6 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {

boolean isUpdate = AtomicUpdateDocumentMerger.isAtomicUpdate(cmd);

// boolean existsByLookup = (RealTimeGetComponent.getInputDocument(core, indexedDocId) != null);
// if (docExists != existsByLookup) {
// log.error("Found docExists {} but existsByLookup {} for doc {}", docExists, existsByLookup, indexedDocId.utf8ToString());
// }

if (log.isDebugEnabled()) {
log.debug("Document ID {} ... exists already? {} ... isAtomicUpdate? {} ... isLeader? {}",
indexedDocId.utf8ToString(), doesDocumentExist(indexedDocId), isUpdate, isLeader(cmd));
Expand All @@ -248,11 +254,20 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {
return;
}

if (skipUpdateIfMissing && isUpdate && isLeader(cmd) && !doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
if (skipUpdateIfMissing && isUpdate && isLeader(cmd)) {
if (!cmd.getChildDocIdStr().equals(cmd.getIndexedIdStr())) {
if (!doesChildDocumentExist(cmd)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent child document ID {}", cmd.getChildDocIdStr());
}
return;
}
} else if (!doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
}
return;
}
return;
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ public void testSkippableUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
verify(next, never()).processAdd(cmd);
}

@Test
public void testSkippableChildDocUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(false).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next, never()).processAdd(cmd);
}

@Test
public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
Expand All @@ -324,6 +340,22 @@ public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOExcep
verify(next).processAdd(cmd);
}

@Test
public void testNonSkippableChildDocUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(true).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next).processAdd(cmd);
}

private AddUpdateCommand createInsertUpdateCmd(SolrQueryRequest req) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.setIndexedId(docId);
Expand Down

0 comments on commit baa2888

Please sign in to comment.