From ea5d97a7bdfca4e9a512a60deef4f4084d45c0b8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 8 Dec 2016 14:27:34 +0100 Subject: [PATCH] Few ops, more tests, NonoProcessor --- README.md | 39 ++ .../BasicEmptyQueueSubscription.java | 2 +- .../BasicNonoIntQueueSubscription.java | 2 +- .../basetypes/BasicNonoSubscriber.java | 4 +- .../basetypes/BasicRefNonoSubscriber.java | 4 +- .../basetypes/BasicRefQueueSubscription.java | 2 +- .../hu/akarnokd/rxjava2/basetypes/Nono.java | 26 +- .../akarnokd/rxjava2/basetypes/NonoHide.java | 84 +++ .../rxjava2/basetypes/NonoProcessor.java | 240 +++++++ .../rxjava2/basetypes/NonoTakeUntil.java | 145 ++++ .../akarnokd/rxjava2/basetypes/NonoUsing.java | 2 +- .../rxjava2/basetypes/NonoProcessorTest.java | 275 ++++++++ .../akarnokd/rxjava2/basetypes/NonoTest.java | 644 ++++++++++++++++++ 13 files changed, 1459 insertions(+), 10 deletions(-) create mode 100644 src/main/java/hu/akarnokd/rxjava2/basetypes/NonoHide.java create mode 100644 src/main/java/hu/akarnokd/rxjava2/basetypes/NonoProcessor.java create mode 100644 src/main/java/hu/akarnokd/rxjava2/basetypes/NonoTakeUntil.java create mode 100644 src/test/java/hu/akarnokd/rxjava2/basetypes/NonoProcessorTest.java diff --git a/README.md b/README.md index 254f6d96..33afd7ef 100644 --- a/README.md +++ b/README.md @@ -613,6 +613,45 @@ Flowable.range(1, 10) ### Nono - 0-error publisher +The `Publisher`-based sibling of the `Completable` type. The usage is practically the same as `Completable` with the exception that because `Nono` implements the Reactive-Streams `Publisher`, you can use it directly with operators of `Flowable` that accept `Publisher` in some form. + +Examples: + +```java +Nono.fromAction(() -> System.out.println("Hello world!")) + .subscribe(); + +Nono.fromAction(() -> System.out.println("Hello world!")) + .delay(1, TimeUnit.SECONDS) + .blockingSubscribe(); + +Nono.complete() + .test() + .assertResult(); + +Nono.error(new IOException()) + .test() + .assertFailure(IOException.class); + +Flowable.range(1, 10) + .to(Nono::fromPublisher) + .test() + .assertResult(); +``` + +#### NonoProcessor + +A hot, Reactive-Streams `Processor` implementation of `Nono`. + +```java +NonoProcessor np = NonoProcessor.create(); + +TestSubscriber ts = np.test(); + +np.onComplete(); + +ts.assertResult(); +``` ### Perhaps - 0-1-error publisher diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicEmptyQueueSubscription.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicEmptyQueueSubscription.java index 635def20..4b898f01 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicEmptyQueueSubscription.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicEmptyQueueSubscription.java @@ -21,7 +21,7 @@ /** * Base class for empty, async-fuseable intermediate operators. */ -public abstract class BasicEmptyQueueSubscription implements QueueSubscription { +abstract class BasicEmptyQueueSubscription implements QueueSubscription { @Override public final boolean offer(Void e) { diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoIntQueueSubscription.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoIntQueueSubscription.java index f4fb30ee..604dbcbd 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoIntQueueSubscription.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoIntQueueSubscription.java @@ -23,7 +23,7 @@ /** * Base class for atomic integer, async-fuseable intermediate operators. */ -public abstract class BasicNonoIntQueueSubscription extends AtomicInteger implements QueueSubscription { +abstract class BasicNonoIntQueueSubscription extends AtomicInteger implements QueueSubscription { private static final long serialVersionUID = -4226314340037668732L; diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoSubscriber.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoSubscriber.java index da3b80fa..ef369d7e 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoSubscriber.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicNonoSubscriber.java @@ -23,13 +23,13 @@ /** * Basic subscriber that supports queue fusion and defaults onSubscribe, onNext and cancel. */ -public abstract class BasicNonoSubscriber extends BasicEmptyQueueSubscription implements Subscriber { +abstract class BasicNonoSubscriber extends BasicEmptyQueueSubscription implements Subscriber { protected final Subscriber actual; protected Subscription s; - public BasicNonoSubscriber(Subscriber actual) { + BasicNonoSubscriber(Subscriber actual) { this.actual = actual; } diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefNonoSubscriber.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefNonoSubscriber.java index dbc27e95..8804a567 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefNonoSubscriber.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefNonoSubscriber.java @@ -25,14 +25,14 @@ * * @param the reference type */ -public abstract class BasicRefNonoSubscriber extends BasicRefQueueSubscription implements Subscriber { +abstract class BasicRefNonoSubscriber extends BasicRefQueueSubscription implements Subscriber { private static final long serialVersionUID = -3157015053656142804L; protected final Subscriber actual; Subscription s; - public BasicRefNonoSubscriber(Subscriber actual) { + BasicRefNonoSubscriber(Subscriber actual) { this.actual = actual; } diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefQueueSubscription.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefQueueSubscription.java index 77ce8e05..5edc9449 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefQueueSubscription.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/BasicRefQueueSubscription.java @@ -10,7 +10,7 @@ * @param the value type * @param the reference type */ -public abstract class BasicRefQueueSubscription extends AtomicReference implements QueueSubscription { +abstract class BasicRefQueueSubscription extends AtomicReference implements QueueSubscription { private static final long serialVersionUID = -6671519529404341862L; diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java index c9d8dca7..97f482ec 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java @@ -884,6 +884,28 @@ public final Nono retryWhen(Function, ? extends Publ return onAssembly(new NonoRetryWhen(this, handler)); } + /** + * Hides the identity of this Nono. + *

+ * This also breaks optimizations such as operator fusion - useful + * when diagnosing issues. + * @return the new Nono instance + */ + public final Nono hide() { + return onAssembly(new NonoHide(this)); + } + + /** + * Run this Nono and cancel it when the other Publisher signals + * an item or completes. + * @param other the other Publisher + * @return the new Nono instance + */ + public final Nono takeUntil(Publisher other) { + ObjectHelper.requireNonNull(other, "other is null"); + return onAssembly(new NonoTakeUntil(this, other)); + } + // ----------------------------------------------------------- // Consumers and subscribers (leave) // ----------------------------------------------------------- @@ -1062,8 +1084,8 @@ public final void blockingSubscribe(Action onComplete, Consumer s) { + source.subscribe(new HideSubscriber(s)); + } + + static final class HideSubscriber implements Subscriber, Subscription { + + final Subscriber actual; + + Subscription s; + + HideSubscriber(Subscriber actual) { + this.actual = actual; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(Void t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoProcessor.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoProcessor.java new file mode 100644 index 00000000..7065156d --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoProcessor.java @@ -0,0 +1,240 @@ +/* + * Copyright 2016 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava2.basetypes; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.plugins.RxJavaPlugins; + +/** + * A hot Nono that signals the terminal event to Subscribers. + *

+ * NonoProcessor is thread-safe and naturally serialized on its onXXX methods. + * + * @since 0.12.0 + */ +public final class NonoProcessor extends Nono implements Processor { + + static final NonoSubscription[] EMPTY = new NonoSubscription[0]; + + static final NonoSubscription[] TERMINATED = new NonoSubscription[0]; + + Throwable error; + + final AtomicReference subscribers; + + final AtomicBoolean once; + + /** + * Creates a NonoProcessor instance ready to receive events and Subscribers. + * @return the new NonoProcessor instance + */ + public static NonoProcessor create() { + return new NonoProcessor(); + } + + NonoProcessor() { + subscribers = new AtomicReference(EMPTY); + once = new AtomicBoolean(); + } + + boolean add(NonoSubscription inner) { + for (;;) { + NonoSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + NonoSubscription[] b = new NonoSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + void delete(NonoSubscription inner) { + for (;;) { + NonoSubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + return; + } + int j = -1; + for (int i = 0; i < a.length; i++) { + NonoSubscription ns = a[i]; + if (ns == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + NonoSubscription[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new NonoSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (subscribers.compareAndSet(a, b)) { + return; + } + } + } + + @Override + public void onSubscribe(Subscription s) { + if (subscribers.get() == TERMINATED) { + s.cancel(); + } else { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Void t) { + throw new NullPointerException(); + } + + @Override + public void onError(Throwable t) { + if (once.compareAndSet(false, true)) { + if (t == null) { + t = new NullPointerException(); + } + error = t; + for (NonoSubscription ns : subscribers.getAndSet(TERMINATED)) { + ns.doError(t); + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + for (NonoSubscription ns : subscribers.getAndSet(TERMINATED)) { + ns.doComplete(); + } + } + } + + @Override + protected void subscribeActual(Subscriber s) { + NonoSubscription ns = new NonoSubscription(s, this); + s.onSubscribe(ns); + if (add(ns)) { + if (ns.get() != 0) { + delete(ns); + } + } else { + Throwable ex = error; + if (ex != null) { + ns.doError(ex); + } else { + ns.doComplete(); + } + } + } + + /** + * Returns true if this NonoProcessor currently has Subscribers. + * @return true if there are subscribers + */ + public boolean hasSubscribers() { + return subscribers.get().length != 0; + } + + /** + * Returns true if this NonoProcessor has completed normally. + * @return true if completed normally + */ + public boolean hasComplete() { + return subscribers.get() == TERMINATED && error == null; + } + + /** + * Returns true if this NonoProcessor has terminated with an error. + * @return true if terminated with an error + * @see #getThrowable() + */ + public boolean hasThrowable() { + return subscribers.get() == TERMINATED && error != null; + } + + /** + * Returns the error that terminated this NonoProcessor if + * {@link #hasThrowable()} returns true. + * @return the error Throwable that terminated this NonoProcessor + */ + public Throwable getThrowable() { + return subscribers.get() == TERMINATED ? error : null; + } + + /** + * Returns the current number of subscribers. + * @return the current number of subscribers + */ + /* test */ int subscriberCount() { + return subscribers.get().length; + } + + /** + * Fuseable subscription handed to the subscribers. + */ + static final class NonoSubscription extends BasicNonoIntQueueSubscription { + + private static final long serialVersionUID = 8377121611843740196L; + + final Subscriber actual; + + final NonoProcessor parent; + + NonoSubscription(Subscriber actual, NonoProcessor parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + public void cancel() { + if (compareAndSet(0, 1)) { + parent.delete(this); + } + } + + void doError(Throwable t) { + if (compareAndSet(0, 1)) { + actual.onError(t); + } + } + + void doComplete() { + if (compareAndSet(0, 1)) { + actual.onComplete(); + } + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoTakeUntil.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoTakeUntil.java new file mode 100644 index 00000000..ee1132c3 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoTakeUntil.java @@ -0,0 +1,145 @@ +/* + * Copyright 2016 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava2.basetypes; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Run the main Nono until a Publisher signals an item or completes. + */ +final class NonoTakeUntil extends Nono { + + final Nono source; + + final Publisher other; + + NonoTakeUntil(Nono source, Publisher other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(Subscriber s) { + TakeUntilSubscriber parent = new TakeUntilSubscriber(s); + s.onSubscribe(parent); + + other.subscribe(parent.inner); + source.subscribe(parent); + } + + static final class TakeUntilSubscriber extends BasicRefQueueSubscription + implements Subscriber { + + private static final long serialVersionUID = 5812459132190733401L; + + final Subscriber actual; + + final AtomicBoolean once; + + final OtherSubscriber inner; + + TakeUntilSubscriber(Subscriber actual) { + this.actual = actual; + this.once = new AtomicBoolean(); + this.inner = new OtherSubscriber(); + } + + @Override + public void cancel() { + SubscriptionHelper.cancel(this); + SubscriptionHelper.cancel(inner); + } + + @Override + public void onSubscribe(Subscription s) { + SubscriptionHelper.setOnce(this, s); + } + + @Override + public void onNext(Void t) { + // never called + } + + @Override + public void onError(Throwable t) { + if (once.compareAndSet(false, true)) { + SubscriptionHelper.cancel(inner); + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + SubscriptionHelper.cancel(inner); + actual.onComplete(); + } + } + + void innerComplete() { + if (once.compareAndSet(false, true)) { + SubscriptionHelper.cancel(this); + actual.onComplete(); + } + } + + void innerError(Throwable t) { + if (once.compareAndSet(false, true)) { + SubscriptionHelper.cancel(this); + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } + } + + final class OtherSubscriber extends AtomicReference + implements Subscriber { + + private static final long serialVersionUID = 9056087023210091030L; + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + get().cancel(); + innerComplete(); + } + + @Override + public void onError(Throwable t) { + innerError(t); + } + + @Override + public void onComplete() { + innerComplete(); + } + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoUsing.java b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoUsing.java index 36e9932d..477c27e3 100644 --- a/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoUsing.java +++ b/src/main/java/hu/akarnokd/rxjava2/basetypes/NonoUsing.java @@ -81,7 +81,7 @@ protected void subscribeActual(Subscriber s) { disposer.accept(resource); } catch (Throwable exc) { Exceptions.throwIfFatal(exc); - RxJavaPlugins.onError(new CompositeException(ex, exc)); + RxJavaPlugins.onError(exc); } } return; diff --git a/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoProcessorTest.java b/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoProcessorTest.java new file mode 100644 index 00000000..5c80b1e6 --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoProcessorTest.java @@ -0,0 +1,275 @@ +/* + * Copyright 2016 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava2.basetypes; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.*; + +import hu.akarnokd.rxjava2.test.TestHelper; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class NonoProcessorTest { + + @Test + public void once() { + NonoProcessor ms = NonoProcessor.create(); + + TestSubscriber to = ms.test(); + + ms.onComplete(); + + List errors = TestHelper.trackPluginErrors(); + try { + ms.onError(new IOException()); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + ms.onComplete(); + + to.assertResult(); + } + + @Test + public void error() { + NonoProcessor ms = NonoProcessor.create(); + + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + + TestSubscriber to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasSubscribers()); + assertEquals(1, ms.subscriberCount()); + + ms.onError(new IOException()); + + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + + to.assertFailure(IOException.class); + + ms.test().assertFailure(IOException.class); + + assertFalse(ms.hasComplete()); + assertTrue(ms.hasThrowable()); + assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + } + + @Test + public void complete() { + NonoProcessor ms = NonoProcessor.create(); + + assertFalse(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + + TestSubscriber to = ms.test(); + + to.assertEmpty(); + + assertTrue(ms.hasSubscribers()); + assertEquals(1, ms.subscriberCount()); + + ms.onComplete(); + + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + + to.assertResult(); + + ms.test().assertResult(); + + assertTrue(ms.hasComplete()); + assertFalse(ms.hasThrowable()); + assertNull(ms.getThrowable()); + assertFalse(ms.hasSubscribers()); + assertEquals(0, ms.subscriberCount()); + } + + @Test + public void nullThrowable() { + NonoProcessor ms = NonoProcessor.create(); + + TestSubscriber to = ms.test(); + + ms.onError(null); + + to.assertFailure(NullPointerException.class); + } + + @Test + public void cancelOnArrival() { + NonoProcessor.create() + .test(true) + .assertEmpty(); + } + + @Test + public void cancelOnArrival2() { + NonoProcessor ms = NonoProcessor.create(); + + ms.test(); + + ms + .test(true) + .assertEmpty(); + } + + @Test + public void disposeTwice() { + NonoProcessor.create() + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription d) { + d.cancel(); + d.cancel(); + } + + @Override + public void onNext(Void t) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + } + + @Test + public void onSubscribeDispose() { + NonoProcessor ms = NonoProcessor.create(); + + BooleanSubscription d = new BooleanSubscription(); + + ms.onSubscribe(d); + + assertFalse(d.isCancelled()); + + ms.onComplete(); + + d = new BooleanSubscription(); + + ms.onSubscribe(d); + + assertTrue(d.isCancelled()); + } + + @Test + public void addRemoveRace() { + for (int i = 0; i < 500; i++) { + final NonoProcessor ms = NonoProcessor.create(); + + final TestSubscriber to = ms.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test(expected = NullPointerException.class) + public void onNextNpe() { + NonoProcessor.create().onNext(null); + } + + @Test + public void crossCancelComplete() { + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onComplete() { + super.onComplete(); + ts1.cancel(); + } + }; + + NonoProcessor np = NonoProcessor.create(); + np.subscribe(ts2); + np.subscribe(ts1); + + np.onComplete(); + + ts1.assertEmpty(); + ts2.assertResult(); + } + + @Test + public void crossCancelError() { + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onError(Throwable t) { + super.onError(t); + ts1.cancel(); + } + }; + + NonoProcessor np = NonoProcessor.create(); + np.subscribe(ts2); + np.subscribe(ts1); + + np.onError(new IOException()); + + ts1.assertEmpty(); + ts2.assertFailure(IOException.class); + } +} diff --git a/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoTest.java b/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoTest.java index 86d125fa..d0594b09 100644 --- a/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoTest.java +++ b/src/test/java/hu/akarnokd/rxjava2/basetypes/NonoTest.java @@ -29,6 +29,9 @@ import io.reactivex.exceptions.CompositeException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -776,6 +779,150 @@ public void usingNonEagerError() { Assert.assertEquals(1, count); } + @Test + public void usingResourceThrows() { + Nono.using(new Callable() { + @Override + public Object call() throws Exception { + throw new IllegalArgumentException(); + } + }, + Functions.justFunction(Nono.complete()), + this + ) + .test() + .assertFailure(IllegalArgumentException.class); + } + + @Test + public void usingSourceThrows() { + Nono.using(Functions.justCallable(0), + new Function() { + @Override + public Nono apply(Integer v) throws Exception { + throw new IllegalArgumentException(); + } + }, + this + ) + .test() + .assertFailure(IllegalArgumentException.class); + } + + @Test + public void usingDisposerThrows1() { + Nono.using(Functions.justCallable(0), + Functions.justFunction(Nono.complete()), + new Consumer() { + @Override + public void accept(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + } + ) + .test() + .assertFailure(IllegalArgumentException.class); + } + + @Test + public void usingDisposerThrows2() { + Nono.using(Functions.justCallable(0), + Functions.justFunction(ioError), + new Consumer() { + @Override + public void accept(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + } + ) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @SuppressWarnings("unchecked") + @Override + public void accept(TestSubscriber ts) throws Exception { + TestHelper.assertCompositeExceptions(ts, IOException.class, IllegalArgumentException.class); + } + }); + } + + @Test + public void usingDisposerThrows3() { + List errors = TestHelper.trackPluginErrors(); + try { + Nono.using(Functions.justCallable(0), + Functions.justFunction(Nono.complete()), + new Consumer() { + @Override + public void accept(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + }, false + ) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void usingDisposerThrows4() { + Nono.using(Functions.justCallable(0), + new Function() { + @Override + public Nono apply(Integer v) throws Exception { + throw new IOException(); + } + }, + new Consumer() { + @Override + public void accept(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + } + ) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @SuppressWarnings("unchecked") + @Override + public void accept(TestSubscriber ts) throws Exception { + TestHelper.assertCompositeExceptions(ts, + IOException.class, IllegalArgumentException.class); + } + }); + } + + @Test + public void usingDisposerThrows5() { + List errors = TestHelper.trackPluginErrors(); + try { + Nono.using(Functions.justCallable(0), + new Function() { + @Override + public Nono apply(Integer v) throws Exception { + throw new IOException(); + } + }, + new Consumer() { + @Override + public void accept(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + }, false + ) + .test() + .assertFailure(IOException.class); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void fromPublisher1() { Nono.fromPublisher(Flowable.empty()) @@ -2009,4 +2156,501 @@ public void accept(TestSubscriber ts) throws Exception { } }); } + + @Test + public void retryWhenNoError() { + Nono.fromAction(this) + .retryWhen(Functions.>identity()) + .test() + .assertResult(); + } + + @Test + public void retryWhen() { + ioError + .retryWhen(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.takeWhile(new Predicate() { + @Override + public boolean test(Throwable v) throws Exception { + return count++ != 5; + } + }); + } + }) + .test() + .assertResult(); + } + + @Test + public void retryWhenThrows() { + Nono.complete() + .retryWhen(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + throw new IOException(); + } + }) + .test() + .assertFailure(IOException.class); + } + + @Test + public void retryWhenSignalError() { + ioError + .retryWhen(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.map(new Function() { + @Override + public Throwable apply(Throwable v) throws Exception { + throw new IllegalArgumentException(); + } + }); + } + }) + .test() + .assertFailure(IllegalArgumentException.class); + } + + @Test(expected = NullPointerException.class) + public void subscribeActual() { + new Nono() { + @Override + protected void subscribeActual(Subscriber s) { + throw new NullPointerException(); + } + }.test(); + } + + @Test + public void subscribeActual2() { + try { + new Nono() { + @Override + protected void subscribeActual(Subscriber s) { + throw new IllegalArgumentException(); + } + }.test(false); + } catch (NullPointerException ex) { + Assert.assertTrue(ex.toString(), ex.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void subscribeWith() { + TestSubscriber ts = new TestSubscriber(); + + Assert.assertSame(ts, Nono.complete().subscribeWith(ts)); + } + + @Test + public void onAssembly() { + Assert.assertNull(Nono.getOnAssemblyHandler()); + try { + Nono.setOnAssemblyHandler(new Function() { + @Override + public Nono apply(Nono f) throws Exception { + count++; + return f; + } + }); + Assert.assertNotNull(Nono.getOnAssemblyHandler()); + + Nono.complete().delay(1, TimeUnit.MILLISECONDS); + + Assert.assertEquals(2, count); + } finally { + Nono.setOnAssemblyHandler(null); + } + Assert.assertNull(Nono.getOnAssemblyHandler()); + + Nono.complete().delay(1, TimeUnit.MILLISECONDS); + + Assert.assertEquals(2, count); + } + + @Test + public void onAssemblyThrows() { + Assert.assertNull(Nono.getOnAssemblyHandler()); + try { + Nono.setOnAssemblyHandler(new Function() { + @Override + public Nono apply(Nono f) throws Exception { + throw new IllegalArgumentException(); + } + }); + Assert.assertNotNull(Nono.getOnAssemblyHandler()); + + try { + Nono.complete().delay(1, TimeUnit.MILLISECONDS); + Assert.fail("Should have thrown"); + } catch (IllegalArgumentException ex) { + // expected + } + + } finally { + Nono.setOnAssemblyHandler(null); + } + Assert.assertNull(Nono.getOnAssemblyHandler()); + + Nono.complete().delay(1, TimeUnit.MILLISECONDS); + + Assert.assertEquals(0, count); + } + + @Test + public void subscribe1() { + Nono.complete() + .subscribe(this); + + Assert.assertEquals(1, count); + } + + @Test + public void subscribe1Error() { + List errors = TestHelper.trackPluginErrors(); + try { + ioError + .subscribe(this); + + Assert.assertEquals(0, count); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void subscribe2() { + Nono.complete() + .subscribe(this, this); + + Assert.assertEquals(1, count); + } + + @Test + public void subscribe2Error() { + ioError + .subscribe(this, this); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingAwaitScalar() { + Assert.assertNull(Nono.complete().blockingAwait()); + } + + @Test + public void blockingAwait() { + Assert.assertNull(Nono.complete().doOnComplete(this).blockingAwait()); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingAwaitErrorScalar() { + Assert.assertNotNull(ioError.blockingAwait()); + } + + @Test + public void blockingAwaitError() { + Assert.assertNotNull(ioError.doOnError(this).blockingAwait()); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingAwaitDelayed() { + Assert.assertNull(Nono.complete().delay(10, TimeUnit.MILLISECONDS).blockingAwait()); + } + + @Test + public void blockingAwaitDelayedError() { + Assert.assertNotNull(ioError.delay(10, TimeUnit.MILLISECONDS).blockingAwait()); + } + + @Test + public void blockingAwaitScalarWithTimeout() { + Assert.assertNull(Nono.complete().blockingAwait(5, TimeUnit.SECONDS)); + } + + @Test + public void blockingAwaitWithTimeoutDoTimeout() { + Throwable t = Nono.never().blockingAwait(200, TimeUnit.MILLISECONDS); + Assert.assertTrue(t.toString(), t instanceof TimeoutException); + } + + @Test + public void blockingAwaitErrorScalarWithTimeout() { + Throwable t = ioError.blockingAwait(5, TimeUnit.SECONDS); + Assert.assertTrue(t.toString(), t instanceof IOException); + } + + @Test + public void blockingAwaitErrorWithTimeout() { + Assert.assertNotNull(ioError.doOnError(this).blockingAwait(5, TimeUnit.SECONDS)); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingAwaitDelayedWithTimeout() { + Assert.assertNull(Nono.complete().delay(10, TimeUnit.MILLISECONDS).blockingAwait(5, TimeUnit.SECONDS)); + } + + @Test + public void blockingAwaitDelayedErrorWithTimeout() { + Assert.assertNotNull(ioError.delay(10, TimeUnit.MILLISECONDS).blockingAwait(5, TimeUnit.SECONDS)); + } + + @Test + public void blockingAwaitInterrupt() { + try { + Thread.currentThread().interrupt(); + Throwable t = Nono.never().blockingAwait(); + Assert.assertTrue(String.valueOf(t), t instanceof InterruptedException); + } finally { + Thread.interrupted(); + } + } + + @Test + public void blockingAwaitInterruptTiemout() { + try { + Thread.currentThread().interrupt(); + Throwable t = Nono.never().blockingAwait(5, TimeUnit.SECONDS); + Assert.assertTrue(String.valueOf(t), t instanceof InterruptedException); + } finally { + Thread.interrupted(); + } + } + + @Test + public void blockingSubscribe1() { + Nono.complete() + .blockingSubscribe(this); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingSubscribe1Error() { + List errors = TestHelper.trackPluginErrors(); + try { + ioError + .blockingSubscribe(this); + + Assert.assertEquals(0, count); + + TestHelper.assertError(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void blockingSubscribe2() { + Nono.complete() + .blockingSubscribe(this, this); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingSubscribe2Error() { + ioError + .blockingSubscribe(this, this); + + Assert.assertEquals(1, count); + } + + @Test + public void blockingSubscribeCompleteThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Nono.complete() + .blockingSubscribe(new Action() { + @Override + public void run() throws Exception { + throw new IllegalArgumentException(); + } + }, this); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void blockingSubscribeErrorThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + ioError + .blockingSubscribe(this, new Consumer() { + @Override + public void accept(Throwable ex) throws Exception { + throw new IllegalArgumentException(); + } + }); + TestHelper.assertError(errors, 0, CompositeException.class); + + List ce = TestHelper.compositeList(errors.get(0)); + + TestHelper.assertError(ce, 0, IOException.class); + TestHelper.assertError(ce, 1, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void hide() { + Subscriber s = new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + Assert.assertFalse(s instanceof QueueSubscription); + } + + @Override + public void onNext(Void t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }; + Nono.complete().hide().subscribe(s); + ioError.hide().subscribe(s); + } + + void checkNoNext(Function mapper) { + try { + mapper.apply(new Nono() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(null); + s.onComplete(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + + @Test + public void noNext() { + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.andThen(Nono.complete()); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return Nono.fromPublisher(np.andThen(Flowable.empty())); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.delay(50, TimeUnit.MILLISECONDS); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.delaySubscription(50, TimeUnit.MILLISECONDS); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.delaySubscription(Flowable.timer(50, TimeUnit.MILLISECONDS)); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.doFinally(NonoTest.this); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.repeat(1); + } + }); + + checkNoNext(new Function() { + @Override + public Nono apply(Nono np) throws Exception { + return np.retry(1); + } + }); + } + + @Test + public void takeUntil1() { + Nono.complete().takeUntil(Nono.never()) + .test() + .assertResult(); + } + + @Test + public void takeUntil2() { + Nono.complete().takeUntil(Nono.complete()) + .test() + .assertResult(); + } + + @Test + public void takeUntil3() { + ioError.takeUntil(Nono.never()) + .test() + .assertFailure(IOException.class); + } + + @Test + public void takeUntil4() { + Nono.never().takeUntil(ioError) + .test() + .assertFailure(IOException.class); + } + + @Test + public void takeUntil5() { + Nono.never().takeUntil(Flowable.range(1, 2)) + .test() + .assertResult(); + } + + @Test + public void takeUntil6() { + Nono.never().takeUntil(Nono.never()) + .test() + .assertEmpty() + .cancel(); + } }