-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Retry bulk request to OpenSearch (#572)
* Add retry to bulk request Signed-off-by: Tomoyuki Morita <[email protected]> * Retry only failed items Signed-off-by: Tomoyuki Morita <[email protected]> * Address comments Signed-off-by: Tomoyuki Morita <[email protected]> * Fix isCreateConflict Signed-off-by: Tomoyuki Morita <[email protected]> * Add and fix unit tests Signed-off-by: Tomoyuki Morita <[email protected]> --------- Signed-off-by: Tomoyuki Morita <[email protected]> (cherry picked from commit 3db16ec) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
3d59e6b
commit 87eab54
Showing
5 changed files
with
299 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
105 changes: 105 additions & 0 deletions
105
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package org.opensearch.flint.core.storage; | ||
|
||
import dev.failsafe.Failsafe; | ||
import dev.failsafe.FailsafeException; | ||
import dev.failsafe.RetryPolicy; | ||
import dev.failsafe.function.CheckedPredicate; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.logging.Logger; | ||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.bulk.BulkItemResponse; | ||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.client.RequestOptions; | ||
import org.opensearch.client.RestHighLevelClient; | ||
import org.opensearch.flint.core.http.FlintRetryOptions; | ||
import org.opensearch.rest.RestStatus; | ||
|
||
public class OpenSearchBulkRetryWrapper { | ||
|
||
private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName()); | ||
|
||
private final RetryPolicy<BulkResponse> retryPolicy; | ||
|
||
public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) { | ||
this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate); | ||
} | ||
|
||
/** | ||
* Delegate bulk request to the client, and retry the request if the response contains retryable | ||
* failure. It won't retry when bulk call thrown exception. | ||
* @param client used to call bulk API | ||
* @param bulkRequest requests passed to bulk method | ||
* @param options options passed to bulk method | ||
* @return Last result | ||
*/ | ||
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest, | ||
RequestOptions options) { | ||
try { | ||
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest); | ||
return Failsafe | ||
.with(retryPolicy) | ||
.get(() -> { | ||
BulkResponse response = client.bulk(nextRequest.get(), options); | ||
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test( | ||
response)) { | ||
nextRequest.set(getRetryableRequest(nextRequest.get(), response)); | ||
} | ||
return response; | ||
}); | ||
} catch (FailsafeException ex) { | ||
LOG.severe("Request failed permanently. Re-throwing original exception."); | ||
|
||
// unwrap original exception and throw | ||
throw new RuntimeException(ex.getCause()); | ||
} | ||
} | ||
|
||
private BulkRequest getRetryableRequest(BulkRequest request, BulkResponse response) { | ||
List<DocWriteRequest<?>> bulkItemRequests = request.requests(); | ||
BulkItemResponse[] bulkItemResponses = response.getItems(); | ||
BulkRequest nextRequest = new BulkRequest() | ||
.setRefreshPolicy(request.getRefreshPolicy()); | ||
nextRequest.setParentTask(request.getParentTask()); | ||
for (int i = 0; i < bulkItemRequests.size(); i++) { | ||
if (isItemRetryable(bulkItemResponses[i])) { | ||
verifyIdMatch(bulkItemRequests.get(i), bulkItemResponses[i]); | ||
nextRequest.add(bulkItemRequests.get(i)); | ||
} | ||
} | ||
LOG.info(String.format("Added %d requests to nextRequest", nextRequest.requests().size())); | ||
return nextRequest; | ||
} | ||
|
||
private static void verifyIdMatch(DocWriteRequest<?> request, BulkItemResponse response) { | ||
if (request.id() != null && !request.id().equals(response.getId())) { | ||
throw new RuntimeException("id doesn't match: " + request.id() + " / " + response.getId()); | ||
} | ||
} | ||
|
||
/** | ||
* A predicate to decide if a BulkResponse is retryable or not. | ||
*/ | ||
private static final CheckedPredicate<BulkResponse> bulkItemRetryableResultPredicate = bulkResponse -> | ||
bulkResponse.hasFailures() && isRetryable(bulkResponse); | ||
|
||
private static boolean isRetryable(BulkResponse bulkResponse) { | ||
if (Arrays.stream(bulkResponse.getItems()) | ||
.anyMatch(itemResp -> isItemRetryable(itemResp))) { | ||
LOG.info("Found retryable failure in the bulk response"); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
private static boolean isItemRetryable(BulkItemResponse itemResponse) { | ||
return itemResponse.isFailed() && !isCreateConflict(itemResponse); | ||
} | ||
|
||
private static boolean isCreateConflict(BulkItemResponse itemResp) { | ||
return itemResp.getOpType() == DocWriteRequest.OpType.CREATE && | ||
itemResp.getFailure().getStatus() == RestStatus.CONFLICT; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
160 changes: 160 additions & 0 deletions
160
...core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package org.opensearch.flint.core.storage; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import java.util.Map; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
import org.opensearch.action.DocWriteRequest.OpType; | ||
import org.opensearch.action.DocWriteResponse; | ||
import org.opensearch.action.bulk.BulkItemResponse; | ||
import org.opensearch.action.bulk.BulkItemResponse.Failure; | ||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.client.RequestOptions; | ||
import org.opensearch.client.RestHighLevelClient; | ||
import org.opensearch.flint.core.http.FlintRetryOptions; | ||
import org.opensearch.rest.RestStatus; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
class OpenSearchBulkRetryWrapperTest { | ||
|
||
@Mock | ||
BulkRequest bulkRequest; | ||
@Mock | ||
RequestOptions options; | ||
@Mock | ||
BulkResponse successResponse; | ||
@Mock | ||
BulkResponse failureResponse; | ||
@Mock | ||
BulkResponse conflictResponse; | ||
@Mock | ||
RestHighLevelClient client; | ||
@Mock | ||
DocWriteResponse docWriteResponse; | ||
@Mock | ||
IndexRequest indexRequest0, indexRequest1; | ||
@Mock IndexRequest docWriteRequest2; | ||
// BulkItemRequest[] bulkItemRequests = new BulkItemRequest[] { | ||
// new BulkItemRequest(0, docWriteRequest0), | ||
// new BulkItemRequest(1, docWriteRequest1), | ||
// new BulkItemRequest(2, docWriteRequest2), | ||
// }; | ||
BulkItemResponse successItem = new BulkItemResponse(0, OpType.CREATE, docWriteResponse); | ||
BulkItemResponse failureItem = new BulkItemResponse(0, OpType.CREATE, | ||
new Failure("index", "id", null, | ||
RestStatus.TOO_MANY_REQUESTS)); | ||
BulkItemResponse conflictItem = new BulkItemResponse(0, OpType.CREATE, | ||
new Failure("index", "id", null, | ||
RestStatus.CONFLICT)); | ||
|
||
FlintRetryOptions retryOptionsWithRetry = new FlintRetryOptions(Map.of("retry.max_retries", "2")); | ||
FlintRetryOptions retryOptionsWithoutRetry = new FlintRetryOptions( | ||
Map.of("retry.max_retries", "0")); | ||
|
||
@Test | ||
public void withRetryWhenCallSucceed() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithRetry); | ||
when(client.bulk(bulkRequest, options)).thenReturn(successResponse); | ||
when(successResponse.hasFailures()).thenReturn(false); | ||
|
||
BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); | ||
|
||
assertEquals(response, successResponse); | ||
verify(client).bulk(bulkRequest, options); | ||
} | ||
|
||
@Test | ||
public void withRetryWhenCallConflict() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithRetry); | ||
when(client.bulk(any(), eq(options))) | ||
.thenReturn(conflictResponse); | ||
mockConflictResponse(); | ||
when(conflictResponse.hasFailures()).thenReturn(true); | ||
|
||
BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); | ||
|
||
assertEquals(response, conflictResponse); | ||
verify(client).bulk(bulkRequest, options); | ||
} | ||
|
||
@Test | ||
public void withRetryWhenCallFailOnce() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithRetry); | ||
when(client.bulk(any(), eq(options))) | ||
.thenReturn(failureResponse) | ||
.thenReturn(successResponse); | ||
mockFailureResponse(); | ||
when(successResponse.hasFailures()).thenReturn(false); | ||
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); | ||
|
||
BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); | ||
|
||
assertEquals(response, successResponse); | ||
verify(client, times(2)).bulk(any(), eq(options)); | ||
} | ||
|
||
@Test | ||
public void withRetryWhenAllCallFail() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithRetry); | ||
when(client.bulk(any(), eq(options))) | ||
.thenReturn(failureResponse); | ||
mockFailureResponse(); | ||
|
||
BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); | ||
|
||
assertEquals(response, failureResponse); | ||
verify(client, times(3)).bulk(any(), eq(options)); | ||
} | ||
|
||
@Test | ||
public void withRetryWhenCallThrowsShouldNotRetry() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithRetry); | ||
when(client.bulk(bulkRequest, options)).thenThrow(new RuntimeException("test")); | ||
|
||
assertThrows(RuntimeException.class, | ||
() -> bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options)); | ||
|
||
verify(client).bulk(bulkRequest, options); | ||
} | ||
|
||
@Test | ||
public void withoutRetryWhenCallFail() throws Exception { | ||
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( | ||
retryOptionsWithoutRetry); | ||
when(client.bulk(bulkRequest, options)) | ||
.thenReturn(failureResponse); | ||
mockFailureResponse(); | ||
|
||
BulkResponse response = bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options); | ||
|
||
assertEquals(response, failureResponse); | ||
verify(client).bulk(bulkRequest, options); | ||
} | ||
|
||
private void mockFailureResponse() { | ||
when(failureResponse.hasFailures()).thenReturn(true); | ||
when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem}); | ||
} | ||
|
||
private void mockConflictResponse() { | ||
when(conflictResponse.hasFailures()).thenReturn(true); | ||
when(conflictResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, conflictItem}); | ||
} | ||
} |