Retry bulk request to OpenSearch #572
Signed-off-by: Tomoyuki Morita <[email protected]>
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java
.with(retryPolicy)
.get(() -> {
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemErrorResultPredicate.test(
What is retryPolicy.getConfig().allowsRetries()? Is it configurable?
Yes, this comes from the existing config retry.max_retries. When it is set to 0, retry is disabled and allowsRetries() returns false.
Nit: do we need to manage max_retries manually? Doesn't RetryPolicy already handle it automatically?
This logic checks whether retry is enabled, so that the next retryable request is not generated when retry is disabled.
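To make the point of this thread concrete, here is a minimal, library-free sketch of the idea: resend only the failed items, and skip building a follow-up request when retry is disabled. The class name `BulkRetrySketch`, the method `sendWithRetry`, and the `bulk` function (which returns the items that failed) are all hypothetical stand-ins; this is not the code in OpenSearchBulkRetryWrapper, which uses a RetryPolicy rather than a hand-written loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; names and types are illustrative, not from the PR.
class BulkRetrySketch {

    /**
     * Sends the items, then resends only the items that failed, up to maxRetries times.
     * The bulk function stands in for the client call and returns the failed items.
     * Returns whatever still failed after the last attempt (empty on full success).
     */
    static List<String> sendWithRetry(List<String> items, int maxRetries,
                                      Function<List<String>, List<String>> bulk) {
        // Assumption: maxRetries == 0 means "retry disabled", as described in the thread.
        boolean allowsRetries = maxRetries > 0;
        List<String> pending = items;
        int attempt = 0;
        while (true) {
            List<String> failed = bulk.apply(pending);
            // Stop on success, when retry is disabled, or when retries are exhausted.
            // In the first two cases no follow-up request is ever built.
            if (failed.isEmpty() || !allowsRetries || attempt == maxRetries) {
                return failed;
            }
            // Build the next request from the failed items only.
            pending = new ArrayList<>(failed);
            attempt++;
        }
    }
}
```

In this sketch the explicit `allowsRetries` check is redundant with the attempt counter; it is kept only to mirror the check being discussed.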
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java
BulkItemResponse[] bulkItemResponses = response.getItems();
BulkRequest nextRequest = new BulkRequest()
.setRefreshPolicy(request.getRefreshPolicy());
nextRequest.setParentTask(request.getParentTask());
What is the parent task?
It indicates the parent task associated with this request. I was not able to find a good description in the OpenSearch docs. It seems to work like a tag for requests when checking them through the _tasks API (we can filter tasks by parent task ID).
I copy the value from the original request to keep it the same.
I did not get it. Tasks are an OpenSearch-internal concept, so why does the bulk request need to attach task info?
We don't care about the task info, but since it is an attribute of BulkRequest, the new request simply inherits the value so that it stays consistent with the original request (inheriting as much as possible from the original request).
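The "inherit as much as possible" approach can be sketched without the OpenSearch classes. The `Request` type below is a hypothetical, simplified stand-in for BulkRequest with just two request-level attributes; `nextRequest` and its `failed` flags are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Request is a simplified stand-in, not org.opensearch BulkRequest.
class NextRequestSketch {

    /** Two request-level attributes plus the item payloads. */
    static final class Request {
        final String refreshPolicy;
        final String parentTask;
        final List<String> items;

        Request(String refreshPolicy, String parentTask, List<String> items) {
            this.refreshPolicy = refreshPolicy;
            this.parentTask = parentTask;
            this.items = items;
        }
    }

    /**
     * Builds the follow-up request: request-level attributes are copied unchanged
     * from the original, and only the items flagged as failed are carried over.
     * failed.get(i) says whether original.items.get(i) failed.
     */
    static Request nextRequest(Request original, List<Boolean> failed) {
        List<String> retryItems = new ArrayList<>();
        for (int i = 0; i < original.items.size(); i++) {
            if (failed.get(i)) {
                retryItems.add(original.items.get(i));
            }
        }
        return new Request(original.refreshPolicy, original.parentTask, retryItems);
    }
}
```

Copying the attributes blindly keeps the retry indistinguishable from the original request apart from its smaller item list, which is the consistency argument made above.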
...core/src/main/scala/org/opensearch/flint/core/http/handler/BulkItemErrorResultPredicate.java
return false;
}

private boolean isCreateConflict(BulkItemResponse itemResp) {
Is there no 429 exception?
Can you rename the method? isCreateConflict is odd. Does it mean the request is a create and resulted in a conflict? Is the intention to retry only requests that failed with a conflict?
Here, any failure other than a Conflict response to a Create request is considered retryable.
itemResp.getOpType() == DocWriteRequest.OpType.CREATE && (itemResp.getFailure() == null
|| itemResp.getFailure().getStatus() == RestStatus.CONFLICT);
}
Does conflict mean throttled?
Here, CONFLICT means HTTP status 409 Conflict, which indicates that the same request reached the same document at the same time, so we shouldn't retry. This logic comes from the original implementation, which used it to check whether the bulk request succeeded. The itemResp.getFailure() == null check is not needed here, and I'll fix it.
Fixed.
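As a rough illustration of the rule described in this thread (a failed item is retryable unless it is a Create that came back with 409 Conflict), here is a self-contained sketch. The `OpType` enum, the nullable `failureStatus` parameter, and the `isRetryable` helper are hypothetical simplifications of BulkItemResponse, and the merged predicate may differ in detail. Note that under this rule a 429 failure counts as retryable simply because it is not a create conflict.

```java
// Hypothetical sketch; OpType and the status parameter stand in for BulkItemResponse fields.
class RetryableItemSketch {

    enum OpType { INDEX, CREATE, UPDATE, DELETE }

    /** True when a CREATE item failed with HTTP 409; failureStatus is null for a successful item. */
    static boolean isCreateConflict(OpType opType, Integer failureStatus) {
        return opType == OpType.CREATE && failureStatus != null && failureStatus == 409;
    }

    /** An item is retried only if it failed and the failure is not a create conflict. */
    static boolean isRetryable(OpType opType, Integer failureStatus) {
        return failureStatus != null && !isCreateConflict(opType, failureStatus);
    }
}
```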
flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java
If I understood correctly, whenever there is a failure in bulk, we will retry with exponential backoff. What was the retry policy earlier? Why do we need a separate backoff strategy apart from the rate limiter? Can you add parts of your design document to the PR description so that open-source users understand the change?
Signed-off-by: Tomoyuki Morita <[email protected]>
Originally, the retry policy took effect only when the whole request failed. It was not applied when the bulk request itself returned 200 but individual items failed.
I put some description in the PR; which part is missing or unclear?
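For readers unfamiliar with the exponential backoff mentioned in the question above, the delay schedule can be sketched as follows. The method name `delayMillis` and the base and cap values used with it are assumptions for illustration only; the thread does not state the actual settings.

```java
// Hypothetical sketch of a capped exponential backoff schedule; values are not from the PR.
class BackoffSketch {

    /**
     * Delay before the given retry (1-based): baseMillis doubled (retry - 1) times,
     * capped at maxMillis.
     */
    static long delayMillis(int retry, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        // Stop doubling once the cap is reached, so the value cannot keep growing.
        for (int i = 1; i < retry && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With a base of 1000 ms and a cap of 30000 ms, the first three retries would wait 1000, 2000, and 4000 ms under this sketch.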
Signed-off-by: Tomoyuki Morita <[email protected]>
* Add retry to bulk request
Signed-off-by: Tomoyuki Morita <[email protected]>
* Retry only failed items
Signed-off-by: Tomoyuki Morita <[email protected]>
* Address comments
Signed-off-by: Tomoyuki Morita <[email protected]>
* Fix isCreateConflict
Signed-off-by: Tomoyuki Morita <[email protected]>
* Add and fix unit tests
Signed-off-by: Tomoyuki Morita <[email protected]>
---------
Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit 3db16ec)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* Add retry to bulk request
* Retry only failed items
* Address comments
* Fix isCreateConflict
* Add and fix unit tests
---------
(cherry picked from commit 3db16ec)
Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
When the NONE refresh policy is used, a bulk request is answered quickly (even when the server is overloaded), which causes throttling.
Issues Resolved
List any issues this PR will resolve, e.g. Closes [...].
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.