From b5ed392bef4f96863cb31e998c9d1467acf86bf1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 29 Jun 2017 15:32:40 +0200 Subject: [PATCH] Fix mapAsync and filterAsync emission accounting --- .../operators/FlowableFilterAsync.java | 10 +++- .../rxjava2/operators/FlowableMapAsync.java | 10 +++- .../operators/FlowableFilterAsyncTest.java | 51 ++++++++++++++++++- .../operators/FlowableMapAsyncTest.java | 26 ++++++++++ 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java b/src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java index 16d48f4e..3d37a9da 100644 --- a/src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java +++ b/src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java @@ -100,6 +100,8 @@ static final class FilterAsyncSubscriber volatile boolean cancelled; Boolean innerResult; + + long emitted; volatile int state; static final int STATE_FRESH = 0; @@ -201,6 +203,7 @@ void drain() { int missed = 1; int limit = bufferSize - (bufferSize >> 2); + long e = emitted; long ci = consumerIndex; int f = consumed; int m = length() - 1; @@ -209,7 +212,7 @@ void drain() { for (;;) { long r = requested.get(); - while (ci != r) { + while (e != r) { if (cancelled) { clear(); return; @@ -261,6 +264,7 @@ void drain() { if (u != null && u) { a.onNext(t); + e++; } } else { InnerSubscriber inner = new InnerSubscriber(this); @@ -284,6 +288,7 @@ void drain() { innerResult = null; if (u != null && u) { + e++; a.onNext(t); } @@ -299,7 +304,7 @@ void drain() { } } - if (ci == r) { + if (e == r) { if (cancelled) { clear(); return; @@ -326,6 +331,7 @@ void drain() { if (missed == w) { consumed = f; consumerIndex = ci; + emitted = e; missed = wip.addAndGet(-missed); if (missed == 0) { break; diff --git a/src/main/java/hu/akarnokd/rxjava2/operators/FlowableMapAsync.java b/src/main/java/hu/akarnokd/rxjava2/operators/FlowableMapAsync.java index 7d0bffdf..72494c3d 100644 --- a/src/main/java/hu/akarnokd/rxjava2/operators/FlowableMapAsync.java +++ b/src/main/java/hu/akarnokd/rxjava2/operators/FlowableMapAsync.java @@ -117,6 +117,8 @@ static final class MapAsyncSubscriber U innerResult; + long emitted; + volatile int state; static final int STATE_FRESH = 0; static final int STATE_RUNNING = 1; @@ -217,6 +219,7 @@ void drain() { int missed = 1; int limit = bufferSize - (bufferSize >> 2); + long e = emitted; long ci = consumerIndex; int f = consumed; int m = length() - 1; @@ -225,7 +228,7 @@ void drain() { for (;;) { long r = requested.get(); - while (ci != r) { + while (e != r) { if (cancelled) { clear(); return; @@ -283,6 +286,7 @@ void drain() { if (v != null) { a.onNext(v); + e++; } } else { InnerSubscriber inner = new InnerSubscriber(this); @@ -318,6 +322,7 @@ void drain() { if (v != null) { a.onNext(v); + e++; } } @@ -333,7 +338,7 @@ void drain() { } } - if (ci == r) { + if (e == r) { if (cancelled) { clear(); return; @@ -360,6 +365,7 @@ void drain() { if (missed == w) { consumed = f; consumerIndex = ci; + emitted = e; missed = wip.addAndGet(-missed); if (missed == 0) { break; diff --git a/src/test/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsyncTest.java b/src/test/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsyncTest.java index 4c8280c1..ba71b6e6 100644 --- a/src/test/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsyncTest.java +++ b/src/test/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsyncTest.java @@ -23,9 +23,11 @@ import org.junit.*; import org.reactivestreams.*; +import hu.akarnokd.rxjava2.basetypes.Solo; import hu.akarnokd.rxjava2.test.TestHelper; import io.reactivex.Flowable; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -373,4 +375,51 @@ public Publisher apply(Integer v) throws Exception { Assert.assertFalse(pp.hasSubscribers()); } + @Test + public void filterAllOut() { + final int[] calls = { 0 }; + + Flowable.range(1, 1000) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + calls[0]++; + } + }) + .compose(FlowableTransformers.filterAsync(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Solo.just(false); + } + }, 16)) + .flatMap(Functions.justFunction(Flowable.just(0))) + .test() + .assertResult(); + + Assert.assertEquals(1000, calls[0]); + } + + @Test + public void filterAllOutHidden() { + final int[] calls = { 0 }; + + Flowable.range(1, 1000) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + calls[0]++; + } + }) + .compose(FlowableTransformers.filterAsync(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Solo.just(false).hide(); + } + }, 16)) + .flatMap(Functions.justFunction(Flowable.just(0))) + .test() + .assertResult(); + + Assert.assertEquals(1000, calls[0]); + } } diff --git a/src/test/java/hu/akarnokd/rxjava2/operators/FlowableMapAsyncTest.java b/src/test/java/hu/akarnokd/rxjava2/operators/FlowableMapAsyncTest.java index 5d53c959..ff8888eb 100644 --- a/src/test/java/hu/akarnokd/rxjava2/operators/FlowableMapAsyncTest.java +++ b/src/test/java/hu/akarnokd/rxjava2/operators/FlowableMapAsyncTest.java @@ -23,9 +23,11 @@ import org.junit.*; import org.reactivestreams.*; +import hu.akarnokd.rxjava2.basetypes.*; import hu.akarnokd.rxjava2.test.TestHelper; import io.reactivex.Flowable; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -427,4 +429,28 @@ public void subscribeActual(Subscriber s) { RxJavaPlugins.reset(); } } + + @Test + public void filterAllOut() { + final int[] calls = { 0 }; + + Flowable.range(1, 1000) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + calls[0]++; + } + }) + .compose(FlowableTransformers.mapAsync(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Perhaps.empty(); + } + }, 16)) + .flatMap(Functions.justFunction(Flowable.just(0))) + .test() + .assertResult(); + + Assert.assertEquals(1000, calls[0]); + } }