Skip to content

Commit

Permalink
Fix mapAsync and filterAsync emission accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jun 29, 2017
1 parent ab7c17d commit b5ed392
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ static final class FilterAsyncSubscriber<T>
volatile boolean cancelled;

Boolean innerResult;

long emitted;

volatile int state;
static final int STATE_FRESH = 0;
Expand Down Expand Up @@ -201,6 +203,7 @@ void drain() {

int missed = 1;
int limit = bufferSize - (bufferSize >> 2);
long e = emitted;
long ci = consumerIndex;
int f = consumed;
int m = length() - 1;
Expand All @@ -209,7 +212,7 @@ void drain() {
for (;;) {
long r = requested.get();

while (ci != r) {
while (e != r) {
if (cancelled) {
clear();
return;
Expand Down Expand Up @@ -261,6 +264,7 @@ void drain() {

if (u != null && u) {
a.onNext(t);
e++;
}
} else {
InnerSubscriber<Boolean> inner = new InnerSubscriber<Boolean>(this);
Expand All @@ -284,6 +288,7 @@ void drain() {
innerResult = null;

if (u != null && u) {
e++;
a.onNext(t);
}

Expand All @@ -299,7 +304,7 @@ void drain() {
}
}

if (ci == r) {
if (e == r) {
if (cancelled) {
clear();
return;
Expand All @@ -326,6 +331,7 @@ void drain() {
if (missed == w) {
consumed = f;
consumerIndex = ci;
emitted = e;
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ static final class MapAsyncSubscriber<T, U, R>

U innerResult;

long emitted;

volatile int state;
static final int STATE_FRESH = 0;
static final int STATE_RUNNING = 1;
Expand Down Expand Up @@ -217,6 +219,7 @@ void drain() {

int missed = 1;
int limit = bufferSize - (bufferSize >> 2);
long e = emitted;
long ci = consumerIndex;
int f = consumed;
int m = length() - 1;
Expand All @@ -225,7 +228,7 @@ void drain() {
for (;;) {
long r = requested.get();

while (ci != r) {
while (e != r) {
if (cancelled) {
clear();
return;
Expand Down Expand Up @@ -283,6 +286,7 @@ void drain() {

if (v != null) {
a.onNext(v);
e++;
}
} else {
InnerSubscriber<U> inner = new InnerSubscriber<U>(this);
Expand Down Expand Up @@ -318,6 +322,7 @@ void drain() {

if (v != null) {
a.onNext(v);
e++;
}
}

Expand All @@ -333,7 +338,7 @@ void drain() {
}
}

if (ci == r) {
if (e == r) {
if (cancelled) {
clear();
return;
Expand All @@ -360,6 +365,7 @@ void drain() {
if (missed == w) {
consumed = f;
consumerIndex = ci;
emitted = e;
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.junit.*;
import org.reactivestreams.*;

import hu.akarnokd.rxjava2.basetypes.Solo;
import hu.akarnokd.rxjava2.test.TestHelper;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -373,4 +375,51 @@ public Publisher<Boolean> apply(Integer v) throws Exception {
Assert.assertFalse(pp.hasSubscribers());
}

@Test
public void filterAllOut() {
final int[] calls = { 0 };

Flowable.range(1, 1000)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
calls[0]++;
}
})
.compose(FlowableTransformers.filterAsync(new Function<Integer, Publisher<Boolean>>() {
@Override
public Publisher<Boolean> apply(Integer v) throws Exception {
return Solo.just(false);
}
}, 16))
.flatMap(Functions.justFunction(Flowable.just(0)))
.test()
.assertResult();

Assert.assertEquals(1000, calls[0]);
}

@Test
public void filterAllOutHidden() {
final int[] calls = { 0 };

Flowable.range(1, 1000)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
calls[0]++;
}
})
.compose(FlowableTransformers.filterAsync(new Function<Integer, Publisher<Boolean>>() {
@Override
public Publisher<Boolean> apply(Integer v) throws Exception {
return Solo.just(false).hide();
}
}, 16))
.flatMap(Functions.justFunction(Flowable.just(0)))
.test()
.assertResult();

Assert.assertEquals(1000, calls[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.junit.*;
import org.reactivestreams.*;

import hu.akarnokd.rxjava2.basetypes.*;
import hu.akarnokd.rxjava2.test.TestHelper;
import io.reactivex.Flowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -427,4 +429,28 @@ public void subscribeActual(Subscriber<? super Integer> s) {
RxJavaPlugins.reset();
}
}

@Test
public void filterAllOut() {
final int[] calls = { 0 };

Flowable.range(1, 1000)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
calls[0]++;
}
})
.compose(FlowableTransformers.mapAsync(new Function<Integer, Publisher<Boolean>>() {
@Override
public Publisher<Boolean> apply(Integer v) throws Exception {
return Perhaps.empty();
}
}, 16))
.flatMap(Functions.justFunction(Flowable.just(0)))
.test()
.assertResult();

Assert.assertEquals(1000, calls[0]);
}
}

0 comments on commit b5ed392

Please sign in to comment.