From 3591489eaebf4a68e4d94918c658072b7c767ea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sat, 8 Apr 2017 13:13:39 +0200 Subject: [PATCH] +MulticastProcessor --- README.md | 35 +- .../processors/MulticastProcessor.java | 280 +++++++++- .../processors/MulticastProcessorTest.java | 498 ++++++++++++++++++ 3 files changed, 792 insertions(+), 21 deletions(-) create mode 100644 src/test/java/hu/akarnokd/rxjava2/processors/MulticastProcessorTest.java diff --git a/README.md b/README.md index a9861ce4..40044588 100644 --- a/README.md +++ b/README.md @@ -32,8 +32,10 @@ Maven search: - [Computational expressions](#computational-expressions) - [Join patterns](#join-patterns) - [Debug support](#debug-support) - - [SingleSubject, MaybeSubject and CompletableSubject](#singlesubject-maybesubject-and-completablesubject) - - [SoloProcessor, PerhapsProcessor and NonoProcessor](#soloprocessor-perhapsprocessor-and-nonoprocessor) + - Custom Processors and Subjects + - [SingleSubject, MaybeSubject and CompletableSubject](#singlesubject-maybesubject-and-completablesubject) + - [SoloProcessor, PerhapsProcessor and NonoProcessor](#soloprocessor-perhapsprocessor-and-nonoprocessor) + - [MulticastProcessor](#multicastprocessor) - [FlowableProcessor utils](#flowableprocessor-utils) - [Custom Schedulers](#custom-schedulers) - [Custom operators and transformers](#custom-operators-and-transformers) @@ -521,6 +523,35 @@ to3.assertResult(1); Note that calling `onComplete` after `onNext` is optional with `SoloProcessor` but calling `onComplete` without calling `onNext` terminates the `SoloProcessor` with a `NoSuchElementException`. +### MulticastProcessor + +Works similarly to `publish(Function)` and multicasts items to subscribers if all of them are ready to receive the items. +In addition, it supports a mode where the last subscriber cancelling will trigger a cancellation to the upstream. +If you need it to run without subscribing the `MulticastProcessor` to another `Publisher` use `start()` or `startUnbounded()`. +Use `offer()` to try and offer/emit items but don't fail if the internal buffer is full. + +```java +MulticastProcessor mp = Flowable.range(1, 10) + .subscribeWith(MulticastProcessor.create()); + +mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + +// -------------------- + +MulticastProcessor mp2 = MulticastProcessor.create(4); +mp2.start(); + +assertTrue(mp2.offer(1)); +assertTrue(mp2.offer(2)); +assertTrue(mp2.offer(3)); +assertTrue(mp2.offer(4)); + +assertFalse(mp2.offer(5)); + +mp2.onComplete(); + +mp2.test().assertResult(1, 2, 3, 4); +``` ## FlowableProcessor utils diff --git a/src/main/java/hu/akarnokd/rxjava2/processors/MulticastProcessor.java b/src/main/java/hu/akarnokd/rxjava2/processors/MulticastProcessor.java index 9c62ac22..faa2675d 100644 --- a/src/main/java/hu/akarnokd/rxjava2/processors/MulticastProcessor.java +++ b/src/main/java/hu/akarnokd/rxjava2/processors/MulticastProcessor.java @@ -20,7 +20,7 @@ import org.reactivestreams.*; -import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.exceptions.*; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; import io.reactivex.internal.subscriptions.*; @@ -52,11 +52,13 @@ public final class MulticastProcessor extends FlowableProcessor { volatile SimpleQueue queue; - boolean done; - Throwable error; + volatile boolean done; + volatile Throwable error; int consumed; + long emitted; + int fusionMode; @SuppressWarnings("rawtypes") @@ -68,17 +70,47 @@ public final class MulticastProcessor extends FlowableProcessor { /** * Constructs a fresh instance with the default Flowable.bufferSize() prefetch * amount and no refCount-behavior. + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create() { + return new MulticastProcessor(bufferSize(), false); + } + + + /** + * Constructs a fresh instance with the default Flowable.bufferSize() prefetch + * amount and no refCount-behavior. + * @param the input and output value type + * @param refCount if true and if all Subscribers have unsubscribed, the upstream + * is cancelled + * @return the new MulticastProcessor instance */ - public MulticastProcessor() { - this(bufferSize(), false); + public static MulticastProcessor create(boolean refCount) { + return new MulticastProcessor(bufferSize(), refCount); } /** * Constructs a fresh instance with the given prefetch amount and no refCount behavior. * @param bufferSize the prefetch amount + * @param the input and output value type + * @return the new MulticastProcessor instance */ - public MulticastProcessor(int bufferSize) { - this(bufferSize, false); + public static MulticastProcessor create(int bufferSize) { + return new MulticastProcessor(bufferSize, false); + } + + /** + * Constructs a fres instance with the given prefetch amount and the optional + * refCount-behavior. + * @param bufferSize the prefech amount + * @param refCount if true and if all Subscribers have unsubscribed, the upstream + * is cancelled + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + public static MulticastProcessor create(int bufferSize, boolean refCount) { + return new MulticastProcessor(bufferSize, refCount); } /** @@ -89,7 +121,7 @@ public MulticastProcessor(int bufferSize) { * is cancelled */ @SuppressWarnings("unchecked") - public MulticastProcessor(int bufferSize, boolean refCount) { + MulticastProcessor(int bufferSize, boolean refCount) { this.bufferSize = bufferSize; this.limit = bufferSize - (bufferSize >> 2); this.wip = new AtomicInteger(); @@ -99,12 +131,24 @@ public MulticastProcessor(int bufferSize, boolean refCount) { this.once = new AtomicBoolean(); } + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses a fixed buffer + * and allows using the onXXX and offer methods + * afterwards. + */ public void start() { if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { queue = new SpscArrayQueue(bufferSize); } } + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses an unbounded buffer + * and allows using the onXXX and offer methods + * afterwards. + */ public void startUnbounded() { if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { queue = new SpscLinkedArrayQueue(bufferSize); @@ -146,10 +190,10 @@ public void onNext(T t) { if (once.get()) { return; } - if (t == null) { - throw new NullPointerException("t is null"); - } if (fusionMode == QueueSubscription.NONE) { + if (t == null) { + throw new NullPointerException("t is null"); + } if (!queue.offer(t)) { SubscriptionHelper.cancel(upstream); onError(new MissingBackpressureException()); @@ -159,6 +203,28 @@ public void onNext(T t) { drain(); } + /** + * Tries to offer an item into the internal queue and returns false + * if the queue is full. + * @param t the item to offer, not null + * @return true if successful, false if the queue is full + */ + public boolean offer(T t) { + if (once.get()) { + return false; + } + if (t == null) { + throw new NullPointerException("t is null"); + } + if (fusionMode == QueueSubscription.NONE) { + if (queue.offer(t)) { + drain(); + return true; + } + } + return false; + } + @Override public void onError(Throwable t) { if (t == null) { @@ -224,18 +290,196 @@ protected void subscribeActual(Subscriber s) { } boolean add(MulticastSubscription inner) { - // TODO Auto-generated method stub - return false; + for (;;) { + MulticastSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + @SuppressWarnings("unchecked") + MulticastSubscription[] b = new MulticastSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } } + @SuppressWarnings("unchecked") void remove(MulticastSubscription inner) { - // TODO Auto-generated method stub - + for (;;) { + MulticastSubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + + if (n == 1) { + if (refcount) { + if (subscribers.compareAndSet(a, TERMINATED)) { + SubscriptionHelper.cancel(upstream); + once.set(true); + break; + } + } else { + if (subscribers.compareAndSet(a, EMPTY)) { + break; + } + } + } else { + MulticastSubscription[] b = new MulticastSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + if (subscribers.compareAndSet(a, b)) { + break; + } + } + } } + @SuppressWarnings("unchecked") void drain() { - // TODO Auto-generated method stub - + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + AtomicReference[]> subs = subscribers; + int c = consumed; + int lim = limit; + long e = emitted; + SimpleQueue q = queue; + int fm = fusionMode; + + outer: + for (;;) { + + MulticastSubscription[] as = subs.get(); + + long r = Long.MAX_VALUE; + + for (MulticastSubscription a : as) { + long ra = a.get(); + if (ra >= 0L) { + r = Math.min(r, ra); + } + } + + if (as.length != 0) { + + while (e != r) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + boolean d = done; + + T v; + + try { + v = q != null ? q.poll() : null; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + d = true; + v = null; + error = ex; + done = true; + } + boolean empty = v == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + + if (empty) { + break; + } + + for (MulticastSubscription inner : as) { + inner.onNext(v); + } + + e++; + + if (fm != QueueSubscription.SYNC) { + if (++c == lim) { + c = 0; + upstream.get().request(lim); + } + } + } + + if (e == r) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + if (done && q.isEmpty()) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + } + } + + int w = wip.get(); + if (w == missed) { + consumed = c; + emitted = e; + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } } static final class MulticastSubscription extends AtomicLong implements Subscription { @@ -246,8 +490,6 @@ static final class MulticastSubscription extends AtomicLong implements Subscr final MulticastProcessor parent; - long emitted; - MulticastSubscription(Subscriber actual, MulticastProcessor parent) { this.actual = actual; this.parent = parent; diff --git a/src/test/java/hu/akarnokd/rxjava2/processors/MulticastProcessorTest.java b/src/test/java/hu/akarnokd/rxjava2/processors/MulticastProcessorTest.java new file mode 100644 index 00000000..0e3d3c3f --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava2/processors/MulticastProcessorTest.java @@ -0,0 +1,498 @@ +/* + * Copyright 2016-2017 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava2.processors; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import hu.akarnokd.rxjava2.test.TestHelper; +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.subscribers.TestSubscriber; + +public class MulticastProcessorTest { + + @Test + public void complete() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onComplete(); + + ts.assertResult(1); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + + @Test + public void error() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onError(new IOException()); + + ts.assertFailure(IOException.class, 1); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertTrue(mp.hasThrowable()); + assertNotNull(mp.getThrowable()); + assertTrue("" + mp.getThrowable(), mp.getThrowable() instanceof IOException); + + mp.test().assertFailure(IOException.class); + } + + @Test + public void overflow() { + MulticastProcessor mp = MulticastProcessor.create(1); + mp.start(); + + TestSubscriber ts = mp.test(0); + + assertTrue(mp.offer(1)); + assertFalse(mp.offer(2)); + + mp.onNext(3); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertFailure(MissingBackpressureException.class, 1); + + mp.test().assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressure() { + MulticastProcessor mp = MulticastProcessor.create(16, false); + mp.start(); + + for (int i = 0; i < 10; i++) { + mp.onNext(i); + } + mp.onComplete(); + + mp.test(0) + .assertEmpty() + .requestMore(1) + .assertValues(0) + .assertNotComplete() + .requestMore(2) + .assertValues(0, 1, 2) + .assertNotComplete() + .requestMore(3) + .assertValues(0, 1, 2, 3, 4, 5) + .assertNotComplete() + .requestMore(4) + .assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void refCounted() { + MulticastProcessor mp = MulticastProcessor.create(true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test().cancel(); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void refCounted2() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test(1, true); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void longRunning() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.test().assertValueCount(1000).assertNoErrors().assertComplete(); + } + + + @Test + public void oneByOne() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp + .rebatchRequests(1) + .test() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCount() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCountExact() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 10).subscribe(mp); + + mp + .rebatchRequests(10) + .take(10) + .test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void crossCancel() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void crossCancelError() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onError(Throwable t) { + super.onError(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onError(new IOException()); + + ts1.assertResult(1); + ts2.assertFailure(IOException.class, 1); + } + + @Test + public void crossCancelComplete() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onComplete() { + super.onComplete(); + ts1.cancel(); + ts1.onNext(2); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(1, 2); + ts2.assertResult(1); + } + + @Test + public void crossCancel1() { + + final TestSubscriber ts1 = new TestSubscriber(1); + + TestSubscriber ts2 = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void requestCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(new FlowableSubscriber() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription t) { + t.request(-1); + t.request(1); + t.request(Long.MAX_VALUE); + t.request(Long.MAX_VALUE); + t.cancel(); + t.cancel(); + t.request(2); + } + }); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void unbounded() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.startUnbounded(); + + for (int i = 0; i < 10; i++) { + assertTrue(mp.offer(i)); + } + mp.onComplete(); + + mp.test().assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void multiStart() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(4, false); + + mp.start(); + mp.start(); + mp.startUnbounded(); + BooleanSubscription bs = new BooleanSubscription(); + mp.onSubscribe(bs); + + assertTrue(bs.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertError(errors, 1, ProtocolViolationException.class); + TestHelper.assertError(errors, 2, ProtocolViolationException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test(expected = NullPointerException.class) + public void onNextNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onNext(null); + } + + + @Test(expected = NullPointerException.class) + public void onOfferNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.offer(null); + } + + @Test(expected = NullPointerException.class) + public void onErrorNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onError(null); + } + + @Test + public void afterTerminated() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + mp.onComplete(); + mp.onComplete(); + mp.onError(new IOException()); + mp.onNext(1); + mp.offer(1); + + mp.test().assertResult(); + + TestHelper.assertUndeliverable(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void asyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + MulticastProcessor mp = MulticastProcessor.create(4); + + up.subscribe(mp); + + TestSubscriber ts = mp.test(); + + for (int i = 0; i < 10; i++) { + up.onNext(i); + } + + assertFalse(mp.offer(10)); + + up.onComplete(); + + ts.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void fusionCrash() { + MulticastProcessor mp = Flowable.range(1, 5) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new IOException(); + } + }) + .subscribeWith(MulticastProcessor.create()); + + mp.test().assertFailure(IOException.class); + } +}