Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of atomic updates to child documents in SkipExistingDocu… #69

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ boolean isSkipUpdateIfMissing() {
return this.skipUpdateIfMissing;
}

boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
boolean doesDocumentExist(BytesRef indexedDocId) {
assert null != indexedDocId;

// we don't need any fields populated, we just need to know if the doc is in the tlog...
Expand Down Expand Up @@ -214,6 +214,17 @@ boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
}
}

boolean doesChildDocumentExist(AddUpdateCommand cmd) throws IOException {
return RealTimeGetComponent.getInputDocument(
core,
core.getLatestSchema().indexableUniqueKey(cmd.getChildDocIdStr()),
cmd.getIndexedId(),
null,
null,
RealTimeGetComponent.Resolution.DOC
) != null;
}

boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return false;
Expand All @@ -231,11 +242,6 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {

boolean isUpdate = AtomicUpdateDocumentMerger.isAtomicUpdate(cmd);

// boolean existsByLookup = (RealTimeGetComponent.getInputDocument(core, indexedDocId) != null);
// if (docExists != existsByLookup) {
// log.error("Found docExists {} but existsByLookup {} for doc {}", docExists, existsByLookup, indexedDocId.utf8ToString());
// }

if (log.isDebugEnabled()) {
log.debug("Document ID {} ... exists already? {} ... isAtomicUpdate? {} ... isLeader? {}",
indexedDocId.utf8ToString(), doesDocumentExist(indexedDocId), isUpdate, isLeader(cmd));
Expand All @@ -248,11 +254,20 @@ public void processAdd(AddUpdateCommand cmd) throws IOException {
return;
}

if (skipUpdateIfMissing && isUpdate && isLeader(cmd) && !doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
if (skipUpdateIfMissing && isUpdate && isLeader(cmd)) {
if (!cmd.getChildDocIdStr().equals(cmd.getIndexedIdStr())) {
if (!doesChildDocumentExist(cmd)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent child document ID {}", cmd.getChildDocIdStr());
}
return;
}
} else if (!doesDocumentExist(indexedDocId)) {
if (log.isDebugEnabled()) {
log.debug("Skipping update to non-existent document ID {}", indexedDocId.utf8ToString());
}
return;
}
return;
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ public void testSkippableUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
verify(next, never()).processAdd(cmd);
}

@Test
public void testSkippableChildDocUpdateIsSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(false).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next, never()).processAdd(cmd);
}

@Test
public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
Expand All @@ -324,6 +340,22 @@ public void testNonSkippableUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOExcep
verify(next).processAdd(cmd);
}

@Test
public void testNonSkippableChildDocUpdateIsNotSkippedIfSkipUpdatesTrue() throws IOException {
UpdateRequestProcessor next = Mockito.mock(DistributedUpdateProcessor.class);
SkipExistingDocumentsUpdateProcessor processor
= Mockito.spy(new SkipExistingDocumentsUpdateProcessor(defaultRequest, next, false, true));

AddUpdateCommand cmd = Mockito.spy(createAtomicUpdateCmd(defaultRequest));
doReturn(true).when(processor).isLeader(cmd);
doReturn(true).when(processor).doesChildDocumentExist(cmd);
doReturn("123/child1").when(cmd).getChildDocIdStr();
doReturn("123").when(cmd).getIndexedIdStr();

processor.processAdd(cmd);
verify(next).processAdd(cmd);
}

private AddUpdateCommand createInsertUpdateCmd(SolrQueryRequest req) {
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.setIndexedId(docId);
Expand Down
Loading