From 213d0edf062dfb671b153eeb48996c143dcc0873 Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Sun, 22 Sep 2019 14:41:13 -0400 Subject: [PATCH] Release limiters when response is returned --- .../ConcurrencyLimitingInterceptor.java | 69 ++----------------- .../ConcurrencyLimitingInterceptorTest.java | 8 ++- 2 files changed, 9 insertions(+), 68 deletions(-) diff --git a/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java b/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java index 8bf4d6719..358b6de47 100644 --- a/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java +++ b/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java @@ -22,14 +22,8 @@ import com.netflix.concurrency.limits.Limiter; import com.palantir.logsafe.Preconditions; import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; import okhttp3.Interceptor; import okhttp3.Response; -import okhttp3.ResponseBody; -import okio.BufferedSource; /** * Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state @@ -52,7 +46,7 @@ * 429 and 503 response codes are used for backpressure, whilst 200 -> 399 request codes are used for determining * new limits and all other codes are not factored in to timings. *

- * Concurrency permits are only released when the response body is closed. + * Concurrency permits are released when the response is received. */ final class ConcurrencyLimitingInterceptor implements Interceptor { private static final ImmutableSet DROPPED_CODES = ImmutableSet.of(429, 503); @@ -75,71 +69,16 @@ public Response intercept(Chain chain) throws IOException { if (DROPPED_CODES.contains(response.code())) { listener.onDropped(); return response; - } else if (!response.isSuccessful() || response.isRedirect()) { + } else if (!response.isSuccessful() || response.isRedirect() || response.body() == null) { listener.onIgnore(); return response; } else { - return wrapResponse(listener, response); + listener.onSuccess(); + return response; } } catch (IOException e) { listener.onIgnore(); throw e; } } - - private static Response wrapResponse(Limiter.Listener listener, Response response) throws IOException { - // OkHttp guarantees not-null to execute() and callbacks, but not at this level. - if (response.body() == null) { - listener.onIgnore(); - return response; - } else if (response.body().source().exhausted()) { - // this case exists for Feign, which does not properly close empty responses - listener.onSuccess(); - return response; - } - ResponseBody currentBody = response.body(); - ResponseBody newResponseBody = - ResponseBody.create( - currentBody.contentType(), - currentBody.contentLength(), - wrapSource(currentBody.source(), listener)); - return response.newBuilder() - .body(newResponseBody) - .build(); - } - - private static BufferedSource wrapSource(BufferedSource currentSource, Limiter.Listener listener) { - return (BufferedSource) Proxy.newProxyInstance( - BufferedSource.class.getClassLoader(), - new Class[] { BufferedSource.class }, - new ReleaseConcurrencyLimitProxy(currentSource, listener)); - } - - /** - * This proxy enables e.g. Okio to make additive additions to their API without breaking us. - */ - private static final class ReleaseConcurrencyLimitProxy implements InvocationHandler { - private final BufferedSource delegate; - private final Limiter.Listener listener; - private boolean closed = false; - - private ReleaseConcurrencyLimitProxy(BufferedSource delegate, Limiter.Listener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public Object invoke(Object _proxy, Method method, Object[] args) throws Throwable { - if (method.getName().equals("close") && !closed) { - closed = true; - listener.onSuccess(); - } - - try { - return method.invoke(delegate, args); - } catch (InvocationTargetException e) { - throw e.getCause(); - } - } - } } diff --git a/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java b/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java index 20edf300d..539487c67 100644 --- a/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java +++ b/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java @@ -111,10 +111,12 @@ public void wrapsResponseBody() throws IOException { String data = "data"; ResponseBody body = ResponseBody.create(MediaType.parse("application/json"), data); when(chain.proceed(request)).thenReturn(response.newBuilder().body(body).build()); - Response wrappedResponse = interceptor.intercept(chain); - verifyZeroInteractions(listener); - assertThat(wrappedResponse.body().string()).isEqualTo(data); + Response interceptedResponse = interceptor.intercept(chain); verify(listener).onSuccess(); + // Previously onSuccess would be called after the response body was read. This asserts that that behavior no + // longer exists. + assertThat(interceptedResponse.body().string()).isEqualTo(data); + verifyZeroInteractions(listener); } @Test