Skip to content

Commit

Permalink
Fix handling of atomic updates to child documents in SkipExistingDocu…
Browse files Browse the repository at this point in the history
…mentsProcessorFactory

Now an atomic update for a child document will be quietly skipped if the child doc is missing
  • Loading branch information
timatbw committed Jul 23, 2024
1 parent c3180d7 commit e1bf5c1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ boolean isSkipUpdateIfMissing() {
return this.skipUpdateIfMissing;
}

boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
boolean doesDocumentExist(BytesRef indexedDocId) {
assert null != indexedDocId;

// we don't need any fields populated, we just need to know if the doc is in the tlog...
Expand Down Expand Up @@ -214,6 +214,17 @@ boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
}
}

boolean doesChildDocumentExist(AddUpdateCommand cmd) throws IOException {
return RealTimeGetComponent.getInputDocument(
core,
core.getLatestSchema().indexableUniqueKey(cmd.getChildDocIdStr()),
cmd.getIndexedId(),
null,
null,
RealTimeGetComponent.Resolution.DOC
) != null;
}

boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return false;
Expand All @@ -231,11 +242,6 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {

boolean isUpdate = AtomicUpdateDocumentMerger.isAtomicUpdate(cmd);

// boolean existsByLookup = (RealTimeGetComponent.getInputDocument(core, indexedDocId) != null);
// if (docExists != existsByLookup) {
// log.error("Found docExists {} but existsByLookup {} for doc {}", docExists, existsByLookup, indexedDocId.utf8ToString());
// }

if (log.isDebugEnabled()) {
log.debug("Document ID {} ... exists already? {} ... isAtomicUpdate? {} ... isLeader? {}",
indexedDocId.utf8ToString(), doesDocumentExist(indexedDocId), isUpdate, isLeader(cmd));
Expand All @@ -248,11 +254,20 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {
return;
}

if (skipUpdateIfMissing && isUpdate && isLeader(cmd) && !doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
if (skipUpdateIfMissing && isUpdate && isLeader(cmd)) {
if (!cmd.getChildDocIdStr().equals(cmd.getIndexedIdStr())) {
if (!doesChildDocumentExist(cmd)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent child document ID {}", cmd.getChildDocIdStr());
}
return;
}
} else if (!doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
}
return;
}
return;
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ public void testSkippableUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
verify(next, never()).processAdd(cmd);
}

@Test
public void testSkippableChildDocUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(false).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next, never()).processAdd(cmd);
}

@Test
public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
Expand All @@ -324,6 +340,22 @@ public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOExcep
verify(next).processAdd(cmd);
}

@Test
public void testNonSkippableChildDocUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(true).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next).processAdd(cmd);
}

private AddUpdateCommand createInsertUpdateCmd(SolrQueryRequest req) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.setIndexedId(docId);
Expand Down

0 comments on commit e1bf5c1

Please sign in to comment.