
[Feature Request] RestClient does not retry if only given one endpoint (e.g. load balancer) #12563

Open
ruigyang-wish opened this issue Mar 8, 2024 · 5 comments
Labels
Clients (Clients within the Core repository such as High level Rest client and low level client)
enhancement (Enhancement or improvement to existing feature or request)

Comments

@ruigyang-wish

ruigyang-wish commented Mar 8, 2024

Is your feature request related to a problem? Please describe

At present we are using AWS-managed OpenSearch, and we use the AWS endpoint, such as https://vpc-57v4bbnpjsz6gmcmhoi2ca.us-west-1.es.amazonaws.com/, as the OpenSearch host. In reality there are several nodes behind that endpoint.

Sometimes the cluster gets very busy and one of the OpenSearch server nodes returns 502 Bad Gateway, and then the job crashes. The call stack is below; we did not observe significant CPU/memory usage issues at the time.

org.apache.flink.util.SerializedThrowable: org.opensearch.client.ResponseException: method [POST], host https://vpc-57v4bbnpjsz6gmcmhoi2ca.us-west-1.es.amazonaws.com//, URI [/_bulk?timeout=1m], status line [HTTP/1.1 502 Bad Gateway]
<html>
<head><title>502 Bad Gateway</title></head>
<body>
<center><h1>502 Bad Gateway</h1></center>
</body>
</html>

at org.opensearch.client.RestClient.convertResponse(RestClient.java:375)
at org.opensearch.client.RestClient$1.completed(RestClient.java:425)
at org.opensearch.client.RestClient$1.completed(RestClient.java:421)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run
org.apache.flink.util.SerializedThrowable: org.opensearch.OpenSearchStatusException: Unable to parse response body
at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2208)
at org.opensearch.client.RestHighLevelClient$1.onFailure(RestHighLevelClient.java:2116)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:707)
at org.opensearch.client.RestClient$1.completed(RestClient.java:433)
at org.opensearch.client.RestClient$1.completed(RestClient.java:421)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run
org.apache.flink.util.SerializedThrowable: java.lang.RuntimeException: An error occurred in OpensearchSink.
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkErrorAndRethrow(OpensearchSink.java:510)
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkAsyncErrorsAndRequests(OpensearchSink.java:515)
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.invoke(OpensearchSink.java:341)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
at jdk.internal.reflect.GeneratedMethodAccessor439.invoke
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke
at java.lang.reflect.Method.invoke

According to the OpenSearch client code, the client marks a host as dead if the server returns 502 Bad Gateway and then tries to forward the request to the other available hosts. So if we only pass the load balancer URL of our OpenSearch cluster, there is no other host to fail over to, and the program crashes without retrying.

https://github.com/opensearch-project/OpenSearch/blob/main/client/rest/src/main/java/org/opensearch/client/RestClient.java#L386-L393

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
    throws IOException {
    //....
    ResponseException responseException = new ResponseException(response);
    if (isRetryStatus(statusCode)) {
        // mark host dead and retry against next one
        onFailure(node);
        return new ResponseOrResponseException(responseException);
    }
    // mark host alive and don't retry, as the error should be a request problem
    onResponse(node);
    throw responseException;
}
private static boolean isRetryStatus(int statusCode) {
    switch (statusCode) {
        case 502:
        case 503:
        case 504:
            return true;
    }
    return false;
}
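To make the consequence concrete, here is a minimal, self-contained model of that failover loop (not the actual RestClient code): with a single host, the first retryable status marks the only node dead, and there is nothing left to retry against.

```java
import java.util.List;
import java.util.Set;

// Simplified model of RestClient's "mark dead and try the next node" behavior.
public class SingleHostRetryModel {
    // Same retryable statuses as RestClient.isRetryStatus
    static boolean isRetryStatus(int statusCode) {
        return statusCode == 502 || statusCode == 503 || statusCode == 504;
    }

    /**
     * Walks the host list the way the client does: a retryable status marks the
     * current host dead and moves on. Returns true only if some host succeeds.
     */
    static boolean performRequest(List<String> hosts, Set<String> denylist, int statusCode) {
        for (String host : hosts) {
            if (denylist.contains(host)) continue;
            if (isRetryStatus(statusCode)) {
                denylist.add(host);   // mark host dead and retry against the next one
                continue;
            }
            return true;              // request succeeded on this host
        }
        return false;                 // every host is dead: the caller sees the failure
    }
}
```

With `hosts = [loadBalancerUrl]` and a 502, the loop exhausts immediately, which is exactly the crash described above.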

Describe the solution you'd like

So I suggest the OpenSearch client expose two parameters for each OpenSearch host:

  • The first parameter indicates whether the host is a load balancer;
  • If the host is a load balancer URL, the second parameter indicates how many times to retry against it before marking it as dead.
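As a rough sketch, the proposed per-host configuration could look something like the class below. This is purely hypothetical; neither this class nor any such parameter exists in RestClientBuilder today.

```java
// Hypothetical per-host retry configuration for the feature request above.
public class HostRetryConfig {
    private final boolean loadBalanced;    // proposed parameter 1: host is a load balancer
    private final int retriesBeforeDead;   // proposed parameter 2: retry budget before marking dead

    public HostRetryConfig(boolean loadBalanced, int retriesBeforeDead) {
        if (retriesBeforeDead < 0) throw new IllegalArgumentException("retries must be >= 0");
        this.loadBalanced = loadBalanced;
        this.retriesBeforeDead = retriesBeforeDead;
    }

    /** A load-balanced host would only be marked dead once its retry budget is exhausted. */
    public boolean shouldMarkDead(int failuresSoFar) {
        return !loadBalanced || failuresSoFar >= retriesBeforeDead;
    }
}
```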

Related component

Clients

Describe alternatives you've considered

Alternatively, the client could expose a single parameter that lets users set the maximum number of retries before a host is marked as dead.

Additional context

NA

@ruigyang-wish ruigyang-wish added enhancement Enhancement or improvement to existing feature or request untriaged labels Mar 8, 2024
@github-actions github-actions bot added the Clients Clients within the Core repository such as High level Rest client and low level client label Mar 8, 2024
@andrross
Member

[Triage - attendees 1 2 3]
@ruigyang-wish Thanks for filing this issue. The behavior you're seeing does match the design of the client. Looking forward to more discussion here.

@andrross andrross changed the title [Feature Request]Process differently when the host is one AWS Endpoint [Feature Request] RestClient does not retry if only given one endpoint (e.g. load balancer) Mar 13, 2024
@ruigyang-wish
Author

@andrross Thanks for the triage.
I also investigated this issue further; there seems to be no elegant way for the RestClient to identify whether a passed host is a load balancer. Of course, we could expose another parameter in RestClientBuilder to let users indicate whether a host is a load balancer.

@andrross
Member

@ruigyang-wish Can you pass the same endpoint multiple times so it appears to the client like there are multiple hosts to retry across? Or does it dedupe?

@dblock
Member

dblock commented Mar 15, 2024

Do other transports in https://github.com/opensearch-project/opensearch-java fix this problem?

@ruigyang-wish
Author

@ruigyang-wish Can you pass the same endpoint multiple times so it appears to the client like there are multiple hosts to retry across? Or does it dedupe?

@andrross
I also considered this approach, but the OpenSearch client uses a map to track dead hosts, so duplicate entries would be deduplicated.
https://github.com/opensearch-project/OpenSearch/blob/main/client/rest/src/main/java/org/opensearch/client/RestClient.java#L139

private final ConcurrentMap<HttpHost, DeadHostState> denylist = new ConcurrentHashMap<>();
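Until the client grows such a knob, one workaround is to retry the whole request at the application layer. The generic helper below is only a sketch; `maxAttempts` and the linear backoff are illustrative choices, not anything the OpenSearch client provides.

```java
import java.util.concurrent.Callable;

// Application-level retry wrapper: retries the entire call (e.g. a bulk request
// against the load balancer URL) instead of relying on the client's host failover.
public class RetryingCaller {
    /** Retries `call` up to maxAttempts times, rethrowing the last failure. */
    public static <T> T withRetry(Callable<T> call, int maxAttempts, long backoffMillis) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
                }
            }
        }
        throw last;
    }
}
```

A caller would wrap the bulk request in `withRetry(...)`, so a transient 502 from one node behind the load balancer gets another chance on a later attempt.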
