Skip to content

Commit

Permalink
Few ops, more tests, NonoProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 8, 2016
1 parent bc0c82c commit ea5d97a
Show file tree
Hide file tree
Showing 13 changed files with 1,459 additions and 10 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,45 @@ Flowable.range(1, 10)

### Nono - 0-error publisher

The `Publisher`-based sibling of the `Completable` type. The usage is practically the same as `Completable` with the exception that because `Nono` implements the Reactive-Streams `Publisher`, you can use it directly with operators of `Flowable` that accept `Publisher` in some form.

Examples:

```java
Nono.fromAction(() -> System.out.println("Hello world!"))
.subscribe();

Nono.fromAction(() -> System.out.println("Hello world!"))
.delay(1, TimeUnit.SECONDS)
.blockingSubscribe();

Nono.complete()
.test()
.assertResult();

Nono.error(new IOException())
.test()
.assertFailure(IOException.class);

Flowable.range(1, 10)
.to(Nono::fromPublisher)
.test()
.assertResult();
```

#### NonoProcessor

A hot, Reactive-Streams `Processor` implementation of `Nono`.

```java
NonoProcessor np = NonoProcessor.create();

TestSubscriber<Void> ts = np.test();

np.onComplete();

ts.assertResult();
```

### Perhaps - 0-1-error publisher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Base class for empty, async-fuseable intermediate operators.
*/
public abstract class BasicEmptyQueueSubscription implements QueueSubscription<Void> {
abstract class BasicEmptyQueueSubscription implements QueueSubscription<Void> {

@Override
public final boolean offer(Void e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Base class for atomic integer, async-fuseable intermediate operators.
*/
public abstract class BasicNonoIntQueueSubscription extends AtomicInteger implements QueueSubscription<Void> {
abstract class BasicNonoIntQueueSubscription extends AtomicInteger implements QueueSubscription<Void> {

private static final long serialVersionUID = -4226314340037668732L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
/**
* Basic subscriber that supports queue fusion and defaults onSubscribe, onNext and cancel.
*/
public abstract class BasicNonoSubscriber extends BasicEmptyQueueSubscription implements Subscriber<Object> {
abstract class BasicNonoSubscriber extends BasicEmptyQueueSubscription implements Subscriber<Object> {

protected final Subscriber<? super Void> actual;

protected Subscription s;

public BasicNonoSubscriber(Subscriber<? super Void> actual) {
BasicNonoSubscriber(Subscriber<? super Void> actual) {
this.actual = actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
*
* @param <R> the reference type
*/
public abstract class BasicRefNonoSubscriber<R> extends BasicRefQueueSubscription<Void, R> implements Subscriber<Void> {
abstract class BasicRefNonoSubscriber<R> extends BasicRefQueueSubscription<Void, R> implements Subscriber<Void> {
private static final long serialVersionUID = -3157015053656142804L;

protected final Subscriber<? super Void> actual;

Subscription s;

public BasicRefNonoSubscriber(Subscriber<? super Void> actual) {
BasicRefNonoSubscriber(Subscriber<? super Void> actual) {
this.actual = actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* @param <T> the value type
* @param <R> the reference type
*/
public abstract class BasicRefQueueSubscription<T, R> extends AtomicReference<R> implements QueueSubscription<T> {
abstract class BasicRefQueueSubscription<T, R> extends AtomicReference<R> implements QueueSubscription<T> {


private static final long serialVersionUID = -6671519529404341862L;
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,28 @@ public final Nono retryWhen(Function<? super Flowable<Throwable>, ? extends Publ
return onAssembly(new NonoRetryWhen(this, handler));
}

/**
* Hides the identity of this Nono.
* <p>
* This also breaks optimizations such as operator fusion - useful
* when diagnosing issues.
* @return the new Nono instance
*/
public final Nono hide() {
return onAssembly(new NonoHide(this));
}

/**
* Run this Nono and cancel it when the other Publisher signals
* an item or completes.
* @param other the other Publisher
* @return the new Nono instance
*/
public final Nono takeUntil(Publisher<?> other) {
ObjectHelper.requireNonNull(other, "other is null");
return onAssembly(new NonoTakeUntil(this, other));
}

// -----------------------------------------------------------
// Consumers and subscribers (leave)
// -----------------------------------------------------------
Expand Down Expand Up @@ -1062,8 +1084,8 @@ public final void blockingSubscribe(Action onComplete, Consumer<? super Throwabl
try {
onComplete.run();
} catch (Throwable exc) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
Exceptions.throwIfFatal(exc);
RxJavaPlugins.onError(exc);
}
}
}
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/hu/akarnokd/rxjava2/basetypes/NonoHide.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava2.basetypes;

import org.reactivestreams.*;

import io.reactivex.internal.subscriptions.SubscriptionHelper;

/**
* Hides the identity of the upstream and downstream including
* breaking fusion.
*/
final class NonoHide extends Nono {

final Nono source;

NonoHide(Nono source) {
this.source = source;
}

@Override
protected void subscribeActual(Subscriber<? super Void> s) {
source.subscribe(new HideSubscriber(s));
}

static final class HideSubscriber implements Subscriber<Void>, Subscription {

final Subscriber<? super Void> actual;

Subscription s;

HideSubscriber(Subscriber<? super Void> actual) {
this.actual = actual;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);
}
}

@Override
public void onNext(Void t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
}
}
}
Loading

0 comments on commit ea5d97a

Please sign in to comment.