Skip to content

Commit

Permalink
Merge pull request #20 from aol/bug-fixes
Browse files Browse the repository at this point in the history
Bug fixes
  • Loading branch information
johnmcclean committed Mar 26, 2015
2 parents 5efdd84 + 61fa70e commit 08635db
Show file tree
Hide file tree
Showing 19 changed files with 975 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -21,8 +22,14 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -54,6 +61,11 @@
*/
public interface EagerFutureStream<U> extends FutureStream<U>, EagerToQueue<U> {


default <R> EagerFutureStream<R> map(Function<? super U, ? extends R> mapper) {
return (EagerFutureStream<R>)FutureStream.super.map(mapper);
}

/**
* @return a Stream that batches all completed elements from this stream
* since last read attempt into a collection
Expand Down Expand Up @@ -559,7 +571,6 @@ default <R> EagerFutureStream<R> anyOf(Function<U, R> fn) {
*/
@Override
default <R> EagerFutureStream<R> fromStream(Stream<R> stream) {

return (EagerFutureStream) FutureStream.super.fromStream(stream);
}

Expand Down Expand Up @@ -836,7 +847,8 @@ default EagerFutureStream<Tuple2<U,Long>> zipFuturesWithIndex() {

Seq seq = Seq.seq(getLastActive().stream().iterator()).zipWithIndex();
Seq<Tuple2<CompletableFuture<U>,Long>> withType = (Seq<Tuple2<CompletableFuture<U>,Long>>)seq;
Stream futureStream = fromStream(withType.map(t -> t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))).map(CompletableFuture::join));
Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t -> t.v1.thenApply(v ->
Tuple.tuple(t.v1.join(),t.v2))));
return (EagerFutureStream<Tuple2<U,Long>>)futureStream;

}
Expand Down Expand Up @@ -1203,7 +1215,7 @@ public static EagerReact parallelBuilder(int parallelism) {
*
* @see ThreadPools#getStandard() see RetryBuilder#getDefaultInstance()
*/
public static EagerReact paraellelCommonBuilder() {
public static EagerReact parallelCommonBuilder() {
return EagerReact
.builder()
.executor(ThreadPools.getStandard())
Expand Down Expand Up @@ -1326,6 +1338,77 @@ static <T> EagerFutureStream<T> futureStream(Iterator<T> iterator) {
spliteratorUnknownSize(iterator, ORDERED), false));
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#parallel()
*/
@Override
default EagerFutureStream<U> parallel() {
return this;
}

@Override
default EagerFutureStream<U> stream() {
return (EagerFutureStream<U>)FutureStream.super.stream();
}





/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#unordered()
*/
@Override
default EagerFutureStream<U> unordered() {
return this;
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#onClose(java.lang.Runnable)
*/
@Override
default EagerFutureStream<U> onClose(Runnable closeHandler) {

return (EagerFutureStream)FutureStream.super.onClose(closeHandler);
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#sorted()
*/
@Override
default EagerFutureStream<U> sorted() {
return (EagerFutureStream<U>)fromStream(FutureStream.super.sorted());
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#sorted(java.util.Comparator)
*/
@Override
default EagerFutureStream<U> sorted(Comparator<? super U> comparator) {
return (EagerFutureStream<U>)fromStream(FutureStream.super.sorted(comparator));
}

/**
* Give a function access to the current stage of a SimpleReact Stream
*
* @param consumer
* Consumer that will recieve current stage
* @return Self (current stage)
*/
default EagerFutureStream<U> self(Consumer<FutureStream<U>> consumer) {
return ( EagerFutureStream<U>)FutureStream.super.self(consumer);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public <U> EagerFutureStream<U> of(U... array) {
return (EagerFutureStream)super.of(array);
}

public <U> EagerFutureStream<U> react(final Supplier<U>... actions) {
@SafeVarargs
public final <U> EagerFutureStream<U> react(final Supplier<U>... actions) {

return (EagerFutureStream)super.reactI(actions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -37,6 +38,7 @@
import com.aol.simple.react.collectors.lazy.LazyResultConsumer;
import com.aol.simple.react.exceptions.SimpleReactFailedStageException;
import com.aol.simple.react.stream.CloseableIterator;
import com.aol.simple.react.stream.MissingValue;
import com.aol.simple.react.stream.StreamWrapper;
import com.aol.simple.react.stream.ThreadPools;
import com.aol.simple.react.stream.traits.FutureStream;
Expand Down Expand Up @@ -72,6 +74,10 @@ LazyFutureStream<U> withErrorHandler(
LazyFutureStream<U> withLastActive(StreamWrapper streamWrapper);



default <R> LazyFutureStream<R> map(Function<? super U, ? extends R> mapper) {
return (LazyFutureStream<R>)FutureStream.super.map(mapper);
}
/**
* Zip this Stream with an index, but Zip based on the underlying tasks, not completed results.
*
Expand Down Expand Up @@ -121,7 +127,7 @@ default LazyFutureStream<Tuple2<U,Long>> zipFuturesWithIndex() {
* @return Two equivalent Streams
*/
default Tuple2<Seq<U>, Seq<U>> duplicateFuturesSeq() {
// unblocking impl

Stream stream = getLastActive().stream();
Tuple2<Seq<CompletableFuture<U>>, Seq<CompletableFuture<U>>> duplicated = Seq
.seq((Stream<CompletableFuture<U>>) stream).duplicate();
Expand Down Expand Up @@ -188,7 +194,7 @@ default LazyFutureStream<Collection<U>> chunkSinceLastRead() {
* e.g.
* <code>
*
* EagerFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new
* LazyFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new
* Queue(),"odd",new Queue(),element-&gt; element%2==0? "even" : "odd");
*
* </code>
Expand Down Expand Up @@ -332,7 +338,7 @@ default LazyFutureStream<U> jitter(long jitterInNanos) {
* Stream. Note this doesn't neccessarily imply a fixed delay between
* element creation (although it may do). e.g.
*
* EagerFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours);
* LazyFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours);
*
* Will emit 1 on start, then 2 after an hour, 3 after 2 hours and so on.
*
Expand Down Expand Up @@ -456,6 +462,7 @@ default <T> LazyFutureStream<Tuple2<U, T>> withLatest(FutureStream<T> s) {
* @param futureStreams Streams to race
* @return First Stream to start emitting values
*/
@SafeVarargs
static <U> LazyFutureStream<U> firstOf(LazyFutureStream<U>... futureStreams) {
return (LazyFutureStream<U>) FutureStream.firstOf(futureStreams);
}
Expand Down Expand Up @@ -1096,7 +1103,7 @@ public static LazyReact sequentialBuilder() {
.executor(new ForkJoinPool(1))
.retrier(
RetryBuilder.getDefaultInstance().withScheduler(
Executors.newScheduledThreadPool(1))).build();
Executors.newScheduledThreadPool(2))).build();
}

/**
Expand Down Expand Up @@ -1349,5 +1356,77 @@ public T next() {

return Seq.seq(new LimitUntil());
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#parallel()
*/
@Override
default LazyFutureStream<U> parallel() {
return this;
}

@Override
default LazyFutureStream<U> stream() {
return (LazyFutureStream<U>)FutureStream.super.stream();
}





/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#unordered()
*/
@Override
default LazyFutureStream<U> unordered() {
return this;
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#onClose(java.lang.Runnable)
*/
@Override
default LazyFutureStream<U> onClose(Runnable closeHandler) {

return (LazyFutureStream)FutureStream.super.onClose(closeHandler);
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#sorted()
*/
@Override
default LazyFutureStream<U> sorted() {
return (LazyFutureStream<U>)fromStream(FutureStream.super.sorted());
}

/*
* (non-Javadoc)
*
* @see org.jooq.lambda.Seq#sorted(java.util.Comparator)
*/
@Override
default LazyFutureStream<U> sorted(Comparator<? super U> comparator) {
return (LazyFutureStream<U>)fromStream(FutureStream.super.sorted(comparator));
}

/**
* Give a function access to the current stage of a SimpleReact Stream
*
* @param consumer
* Consumer that will recieve current stage
* @return Self (current stage)
*/
default LazyFutureStream<U> self(Consumer<FutureStream<U>> consumer) {
return ( LazyFutureStream<U>)FutureStream.super.self(consumer);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public <U> LazyFutureStream<U> fromStream(
return (LazyFutureStream)super.fromStream(stream);
}

public <U> LazyFutureStream<U> react(final Supplier<U>... actions) {
@SafeVarargs
public final <U> LazyFutureStream<U> react(final Supplier<U>... actions) {

return (LazyFutureStream)super.reactI(actions);

Expand All @@ -114,6 +115,7 @@ public <U> LazyFutureStream<U> fromStreamWithoutFutures(Stream<U> stream) {
* @return EagerFutureStream
* @see com.aol.simple.react.stream.BaseSimpleReact#fromStreamWithoutFutures(java.util.stream.Stream)
*/
@SuppressWarnings("unchecked")
@Override
public LazyFutureStream<Integer> fromPrimitiveStream(IntStream stream) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.aol.simple.react.stream.InfiniteProcessingException;
import com.aol.simple.react.stream.MissingValue;
import com.aol.simple.react.stream.ThreadPools;
import com.aol.simple.react.stream.traits.FutureStream;
import com.aol.simple.react.stream.traits.SimpleReactStream;
import com.google.common.annotations.VisibleForTesting;
import com.nurkiewicz.asyncretry.RetryExecutor;
Expand Down Expand Up @@ -316,6 +317,9 @@ private SimpleReact(ExecutorService executor, RetryExecutor retrier,
this.retrier = retrier;
this.eager = Optional.ofNullable(eager).orElse(true);
}





}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.aol.simple.react.blockers.Blocker;
import com.aol.simple.react.collectors.ReactCollector;
Expand Down Expand Up @@ -103,7 +104,10 @@ default <R> R block(final Collector collector) {
@SuppressWarnings({ "rawtypes", "unchecked" })
default <R> R block(final Collector collector,
final StreamWrapper lastActive) {
return (R) lastActive.stream().map((future) -> {
Stream<CompletableFuture> stream = lastActive.stream();
if(!isEager())
stream = lastActive.stream().collect(Collectors.toList()).stream();
return (R) stream.map((future) -> {
return (U) getSafe(future,getErrorHandler());
}).filter(v -> v != MissingValue.MISSING_VALUE).collect(collector);
}
Expand Down
Loading

0 comments on commit 08635db

Please sign in to comment.