Skip to content

Commit

Permalink
Fix for stuck update action in a bulk with retry_on_conflict property (
Browse files Browse the repository at this point in the history
…#11153)

* Bugfix for update staying stuck when sent as part of bulk with retry_on_conflict specified

Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj authored Nov 23, 2023
1 parent 72e63f2 commit fe2d585
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix SuggestSearch.testSkipDuplicates by forceing refresh when indexing its test documents ([#11068](https://github.com/opensearch-project/OpenSearch/pull/11068))
- Delegating CachingWeightWrapper#count to internal weight object ([#10543](https://github.com/opensearch-project/OpenSearch/pull/10543))
- Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934))
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public void resetForExecutionForRetry() {
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
retryCounter++;
assertInvariants(ItemProcessingState.INITIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -947,6 +951,82 @@ public void testRetries() throws Exception {
latch.await();
}

public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException {
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);

int nItems = randomIntBetween(2, 5);
List<BulkItemRequest> items = new ArrayList<>(nItems);
for (int i = 0; i < nItems; i++) {
int retryOnConflictCount = randomIntBetween(0, 3);
logger.debug("Setting retryCount for item {}: {}", i, retryOnConflictCount);
UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")
.retryOnConflict(retryOnConflictCount);
items.add(new BulkItemRequest(i, updateRequest));
}

IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);

IndexShard shard = mock(IndexShard.class);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(
ir -> conflictedResult
);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
updateResponse,
randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
Collections.singletonMap("field", "value"),
Requests.INDEX_CONTENT_TYPE
)
);

BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new));

final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
shard,
updateHelper,
threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(),
listener -> listener.onResponse(null),
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
assertEquals(nItems, result.replicaRequest().items().length);
for (BulkItemRequest item : result.replicaRequest().items()) {
assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass());
}
}), latch),
threadPool,
Names.WRITE
);

// execute the runnable on a separate thread so that the infinite loop can be detected
new Thread(runnable).start();

// timeout the request in 10 seconds if there is an infinite loop
assertTrue(latch.await(10, TimeUnit.SECONDS));

items.forEach(item -> {
assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);

// this assertion is based on the assumption that all bulk item requests are updates and are hence calling
// UpdateRequest::prepareRequest
UpdateRequest updateRequest = (UpdateRequest) item.request();
verify(updateHelper, times(updateRequest.retryOnConflict() + 1)).prepare(
eq(updateRequest),
any(IndexShard.class),
any(LongSupplier.class)
);
});
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Expand Down

0 comments on commit fe2d585

Please sign in to comment.