diff --git a/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java b/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java
index b877cb241..a39eec082 100644
--- a/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java
+++ b/okhttp-clients/src/main/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptor.java
@@ -22,14 +22,8 @@
import com.netflix.concurrency.limits.Limiter;
import com.palantir.logsafe.Preconditions;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
import okhttp3.Interceptor;
import okhttp3.Response;
-import okhttp3.ResponseBody;
-import okio.BufferedSource;
/**
* Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state
@@ -52,7 +46,7 @@
* 429 and 503 response codes are used for backpressure, whilst 200 -> 399 request codes are used for determining
* new limits and all other codes are not factored in to timings.
*
- * Concurrency permits are only released when the response body is closed.
+ * Concurrency permits are released when the response is received.
*/
final class ConcurrencyLimitingInterceptor implements Interceptor {
private static final ImmutableSet DROPPED_CODES = ImmutableSet.of(429, 503);
@@ -75,71 +69,16 @@ public Response intercept(Chain chain) throws IOException {
if (DROPPED_CODES.contains(response.code())) {
listener.onDropped();
return response;
- } else if (!response.isSuccessful() || response.isRedirect()) {
+ } else if (!response.isSuccessful() || response.isRedirect() || response.body() == null) {
listener.onIgnore();
return response;
} else {
- return wrapResponse(listener, response);
+ listener.onSuccess();
+ return response;
}
} catch (IOException e) {
listener.onIgnore();
throw e;
}
}
-
- private static Response wrapResponse(Limiter.Listener listener, Response response) throws IOException {
- // OkHttp guarantees not-null to execute() and callbacks, but not at this level.
- if (response.body() == null) {
- listener.onIgnore();
- return response;
- } else if (response.body().source().exhausted()) {
- // this case exists for Feign, which does not properly close empty responses
- listener.onSuccess();
- return response;
- }
- ResponseBody currentBody = response.body();
- ResponseBody newResponseBody =
- ResponseBody.create(
- currentBody.contentType(),
- currentBody.contentLength(),
- wrapSource(currentBody.source(), listener));
- return response.newBuilder()
- .body(newResponseBody)
- .build();
- }
-
- private static BufferedSource wrapSource(BufferedSource currentSource, Limiter.Listener listener) {
- return (BufferedSource) Proxy.newProxyInstance(
- BufferedSource.class.getClassLoader(),
- new Class>[] {BufferedSource.class},
- new ReleaseConcurrencyLimitProxy(currentSource, listener));
- }
-
- /**
- * This proxy enables e.g. Okio to make additive additions to their API without breaking us.
- */
- private static final class ReleaseConcurrencyLimitProxy implements InvocationHandler {
- private final BufferedSource delegate;
- private final Limiter.Listener listener;
- private boolean closed = false;
-
- private ReleaseConcurrencyLimitProxy(BufferedSource delegate, Limiter.Listener listener) {
- this.delegate = delegate;
- this.listener = listener;
- }
-
- @Override
- public Object invoke(Object _proxy, Method method, Object[] args) throws Throwable {
- if (method.getName().equals("close") && !closed) {
- closed = true;
- listener.onSuccess();
- }
-
- try {
- return method.invoke(delegate, args);
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
- }
- }
}
diff --git a/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java b/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java
index 580cfb82b..d9112cfed 100644
--- a/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java
+++ b/okhttp-clients/src/test/java/com/palantir/conjure/java/okhttp/ConcurrencyLimitingInterceptorTest.java
@@ -114,10 +114,12 @@ public void wrapsResponseBody() throws IOException {
String data = "data";
ResponseBody body = ResponseBody.create(MediaType.parse("application/json"), data);
when(chain.proceed(request)).thenReturn(response.newBuilder().body(body).build());
- Response wrappedResponse = interceptor.intercept(chain);
- verifyZeroInteractions(listener);
- assertThat(wrappedResponse.body().string()).isEqualTo(data);
+ Response interceptedResponse = interceptor.intercept(chain);
verify(listener).onSuccess();
+ // Previously onSuccess would be called after the response body was read. This asserts that that behavior no
+ // longer exists.
+ assertThat(interceptedResponse.body().string()).isEqualTo(data);
+ verifyZeroInteractions(listener);
}
@Test