Skip to content

Commit

Permalink
+MulticastProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 8, 2017
1 parent a48d47b commit 3591489
Show file tree
Hide file tree
Showing 3 changed files with 792 additions and 21 deletions.
35 changes: 33 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ Maven search:
- [Computational expressions](#computational-expressions)
- [Join patterns](#join-patterns)
- [Debug support](#debug-support)
- [SingleSubject, MaybeSubject and CompletableSubject](#singlesubject-maybesubject-and-completablesubject)
- [SoloProcessor, PerhapsProcessor and NonoProcessor](#soloprocessor-perhapsprocessor-and-nonoprocessor)
- Custom Processors and Subjects
- [SingleSubject, MaybeSubject and CompletableSubject](#singlesubject-maybesubject-and-completablesubject)
- [SoloProcessor, PerhapsProcessor and NonoProcessor](#soloprocessor-perhapsprocessor-and-nonoprocessor)
- [MulticastProcessor](#multicastprocessor)
- [FlowableProcessor utils](#flowableprocessor-utils)
- [Custom Schedulers](#custom-schedulers)
- [Custom operators and transformers](#custom-operators-and-transformers)
Expand Down Expand Up @@ -521,6 +523,35 @@ to3.assertResult(1);
Note that calling `onComplete` after `onNext` is optional with `SoloProcessor` but calling `onComplete` without calling `onNext` terminates the `SoloProcessor` with a `NoSuchElementException`.
### MulticastProcessor
Works similarly to `publish(Function)` and multicasts items to subscribers if all of them are ready to receive the items.
In addition, it supports a mode where the last subscriber cancelling will trigger a cancellation to the upstream.
If you need it to run without subscribing the `MulticastProcessor` to another `Publisher` use `start()` or `startUnbounded()`.
Use `offer()` to try and offer/emit items but don't fail if the internal buffer is full.

```java
MulticastProcessor<Integer> mp = Flowable.range(1, 10)
.subscribeWith(MulticastProcessor.create());

mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// --------------------

MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();

assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));

assertFalse(mp2.offer(5));

mp2.onComplete();

mp2.test().assertResult(1, 2, 3, 4);
```

## FlowableProcessor utils

Expand Down
Loading

0 comments on commit 3591489

Please sign in to comment.