Skip to content

Commit

Permalink
api clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Mar 20, 2015
1 parent 291e01c commit e685e69
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 38 deletions.
42 changes: 42 additions & 0 deletions src/main/java/com/aol/simple/react/stream/BaseLazySimpleReact.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.aol.simple.react.stream;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.aol.simple.react.async.Subscription;
import com.aol.simple.react.stream.traits.SimpleReactStream;

public abstract class BaseLazySimpleReact extends BaseSimpleReact{
/**
* Generate an infinite reactive flow. Requires a lazy flow.
*
* The flow will run indefinitely unless / until the provided Supplier throws an Exception
*
* @see com.aol.simple.react.async.Queue SimpleReact Queue for a way to create a more managable infinit flow
*
* @param s Supplier to generate the infinite flow
* @return Next stage in the flow
*/
public <U> SimpleReactStream< U> reactInfinitely(final Supplier<U> s) {
if(isEager())
throw new InfiniteProcessingException("To reactInfinitely use a lazy stream");
Subscription sub = new Subscription();
SimpleReactStream stream = construct(StreamSupport.stream(
new InfiniteClosingSpliterator(Long.MAX_VALUE, () -> CompletableFuture.completedFuture(s.get()),sub), false),
this.getExecutor(),getRetrier(),false).withSubscription(sub);

return stream;


}

public <U> SimpleReactStream<U> iterateInfinitely(final U seed, final UnaryOperator<U> f){
if(isEager())
throw new InfiniteProcessingException("To iterateInfinitely use a lazy stream");
return construct(Stream.iterate(seed, it -> f.apply(it)).map(it -> CompletableFuture.completedFuture(it)),
this.getExecutor(),getRetrier(),false);
}
}
28 changes: 0 additions & 28 deletions src/main/java/com/aol/simple/react/stream/BaseSimpleReact.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,36 +138,8 @@ public <U> SimpleReactStream< U> react(final Supplier<U> s, Generator t) {
return construct(t.generate(s),
this.getExecutor(),getRetrier(),isEager());

}
/**
* Generate an infinite reactive flow. Requires a lazy flow.
*
* The flow will run indefinitely unless / until the provided Supplier throws an Exception
*
* @see com.aol.simple.react.async.Queue SimpleReact Queue for a way to create a more managable infinit flow
*
* @param s Supplier to generate the infinite flow
* @return Next stage in the flow
*/
public <U> SimpleReactStream< U> reactInfinitely(final Supplier<U> s) {
if(isEager())
throw new InfiniteProcessingException("To reactInfinitely use a lazy stream");
Subscription sub = new Subscription();
SimpleReactStream stream = construct(StreamSupport.stream(
new InfiniteClosingSpliterator(Long.MAX_VALUE, () -> CompletableFuture.completedFuture(s.get()),sub), false),
this.getExecutor(),getRetrier(),false).withSubscription(sub);

return stream;


}

public <U> SimpleReactStream<U> iterateInfinitely(final U seed, final UnaryOperator<U> f){
if(isEager())
throw new InfiniteProcessingException("To iterateInfinitely use a lazy stream");
return construct(Stream.iterate(seed, it -> f.apply(it)).map(it -> CompletableFuture.completedFuture(it)),
this.getExecutor(),getRetrier(),false);
}
/**
* Create a Sequential Generator that will trigger a Supplier to be called the specified number of times
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.aol.simple.react.exceptions.SimpleReactFailedStageException;
import com.aol.simple.react.stream.StreamWrapper;
import com.aol.simple.react.stream.ThreadPools;
import com.aol.simple.react.stream.lazy.LazyFutureStream;
import com.aol.simple.react.stream.simple.SimpleReact;
import com.aol.simple.react.stream.traits.EagerToQueue;
import com.aol.simple.react.stream.traits.FutureStream;
Expand All @@ -44,7 +45,16 @@
*/
public interface EagerFutureStream<U> extends FutureStream<U>, EagerToQueue<U> {


/*
* React to new events with the supplied function on the supplied ExecutorService
*
* @param fn Apply to incoming events
* @param service Service to execute function on
* @return next stage in the Stream
*/
default <R> EagerFutureStream<R> then(final Function<U, R> fn, ExecutorService service){
return (EagerFutureStream<R>)FutureStream.super.then(fn, service);
}

/*
* Non-blocking asyncrhonous application of the supplied function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,16 @@ public interface LazyFutureStream<U> extends FutureStream<U>, LazyToQueue<U> {

LazyFutureStream<U> withLastActive(StreamWrapper streamWrapper);


/*
* React to new events with the supplied function on the supplied ExecutorService
*
* @param fn Apply to incoming events
* @param service Service to execute function on
* @return next stage in the Stream
*/
default <R> LazyFutureStream<R> then(final Function<U, R> fn, ExecutorService service){
return (LazyFutureStream<R>)FutureStream.super.then(fn, service);
}

/**
* Override return type on SimpleReactStream
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

import com.aol.simple.react.generators.Generator;
import com.aol.simple.react.generators.ReactIterator;
import com.aol.simple.react.stream.BaseSimpleReact;
import com.aol.simple.react.stream.BaseLazySimpleReact;
import com.aol.simple.react.stream.ThreadPools;
import com.aol.simple.react.stream.eager.EagerFutureStream;
import com.aol.simple.react.stream.traits.SimpleReactStream;
import com.nurkiewicz.asyncretry.RetryExecutor;

/**
Expand All @@ -33,7 +31,7 @@
@Builder
@Wither
@AllArgsConstructor
public class LazyReact extends BaseSimpleReact {
public class LazyReact extends BaseLazySimpleReact {

@Getter
private final ExecutorService executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.aol.simple.react.generators.ParallelGenerator;
import com.aol.simple.react.generators.ReactIterator;
import com.aol.simple.react.generators.SequentialIterator;
import com.aol.simple.react.stream.BaseSimpleReact;
import com.aol.simple.react.stream.BaseLazySimpleReact;
import com.aol.simple.react.stream.InfiniteProcessingException;
import com.aol.simple.react.stream.MissingValue;
import com.aol.simple.react.stream.ThreadPools;
Expand All @@ -39,7 +39,7 @@

@Builder
@Wither
public class SimpleReact extends BaseSimpleReact{
public class SimpleReact extends BaseLazySimpleReact{

@Getter
private final ExecutorService executor;
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/com/aol/simple/react/async/QueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Ignore;
import org.junit.Test;

import com.aol.simple.react.stream.lazy.LazyFutureStream;
import com.aol.simple.react.stream.simple.SimpleReact;
import com.aol.simple.react.stream.traits.SimpleReactStream;

Expand Down Expand Up @@ -228,9 +229,9 @@ public void mergingTestLazyIndividualMerge() {
count1 = 100000;

Queue<Integer> q = new Queue(new LinkedBlockingQueue());
parallelBuilder().reactInfinitely(() -> count++)
LazyFutureStream.parallelBuilder().reactInfinitely(() -> count++)
.then(it -> q.offer(it)).run(new ForkJoinPool(1));
parallelBuilder().reactInfinitely(() -> count1++)
LazyFutureStream.parallelBuilder().reactInfinitely(() -> count1++)
.then(it -> q.offer(it)).run(new ForkJoinPool(1));

List<Integer> result = q.stream().limit(1000)
Expand Down

0 comments on commit e685e69

Please sign in to comment.