diff --git a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java index e35c6f1a39..0bde13c8a6 100644 --- a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -21,8 +22,14 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -54,6 +61,11 @@ */ public interface EagerFutureStream extends FutureStream, EagerToQueue { + + default EagerFutureStream map(Function mapper) { + return (EagerFutureStream)FutureStream.super.map(mapper); + } + /** * @return a Stream that batches all completed elements from this stream * since last read attempt into a collection @@ -559,7 +571,6 @@ default EagerFutureStream anyOf(Function fn) { */ @Override default EagerFutureStream fromStream(Stream stream) { - return (EagerFutureStream) FutureStream.super.fromStream(stream); } @@ -836,7 +847,8 @@ default EagerFutureStream> zipFuturesWithIndex() { Seq seq = Seq.seq(getLastActive().stream().iterator()).zipWithIndex(); Seq,Long>> withType = (Seq,Long>>)seq; - Stream futureStream = fromStream(withType.map(t -> t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))).map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t -> t.v1.thenApply(v -> + Tuple.tuple(t.v1.join(),t.v2)))); return (EagerFutureStream>)futureStream; } @@ -1203,7 +1215,7 @@ public static EagerReact parallelBuilder(int parallelism) { * * @see ThreadPools#getStandard() see RetryBuilder#getDefaultInstance() */ - public static EagerReact paraellelCommonBuilder() { + public static EagerReact parallelCommonBuilder() { return EagerReact .builder() .executor(ThreadPools.getStandard()) @@ -1326,6 +1338,77 @@ static EagerFutureStream futureStream(Iterator iterator) { spliteratorUnknownSize(iterator, ORDERED), false)); } + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#parallel() + */ + @Override + default EagerFutureStream parallel() { + return this; + } + + @Override + default EagerFutureStream stream() { + return (EagerFutureStream)FutureStream.super.stream(); + } + + + + + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#unordered() + */ + @Override + default EagerFutureStream unordered() { + return this; + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#onClose(java.lang.Runnable) + */ + @Override + default EagerFutureStream onClose(Runnable closeHandler) { + + return (EagerFutureStream)FutureStream.super.onClose(closeHandler); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted() + */ + @Override + default EagerFutureStream sorted() { + return (EagerFutureStream)fromStream(FutureStream.super.sorted()); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted(java.util.Comparator) + */ + @Override + default EagerFutureStream sorted(Comparator comparator) { + return (EagerFutureStream)fromStream(FutureStream.super.sorted(comparator)); + } + /** + * Give a function access to the current stage of a SimpleReact Stream + * + * @param consumer + * Consumer that will recieve current stage + * @return Self (current stage) + */ + default EagerFutureStream self(Consumer> consumer) { + return ( EagerFutureStream)FutureStream.super.self(consumer); + } + + } diff --git a/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java b/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java index dd61e8a83a..728ff60773 100644 --- a/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java +++ b/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java @@ -146,7 +146,8 @@ public EagerFutureStream of(U... array) { return (EagerFutureStream)super.of(array); } - public EagerFutureStream react(final Supplier... actions) { + @SafeVarargs + public final EagerFutureStream react(final Supplier... actions) { return (EagerFutureStream)super.reactI(actions); diff --git a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java index 19b5fd9055..d3f2d04f0c 100644 --- a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,6 +38,7 @@ import com.aol.simple.react.collectors.lazy.LazyResultConsumer; import com.aol.simple.react.exceptions.SimpleReactFailedStageException; import com.aol.simple.react.stream.CloseableIterator; +import com.aol.simple.react.stream.MissingValue; import com.aol.simple.react.stream.StreamWrapper; import com.aol.simple.react.stream.ThreadPools; import com.aol.simple.react.stream.traits.FutureStream; @@ -72,6 +74,10 @@ LazyFutureStream withErrorHandler( LazyFutureStream withLastActive(StreamWrapper streamWrapper); + + default LazyFutureStream map(Function mapper) { + return (LazyFutureStream)FutureStream.super.map(mapper); + } /** * Zip this Stream with an index, but Zip based on the underlying tasks, not completed results. * @@ -121,7 +127,7 @@ default LazyFutureStream> zipFuturesWithIndex() { * @return Two equivalent Streams */ default Tuple2, Seq> duplicateFuturesSeq() { - // unblocking impl + Stream stream = getLastActive().stream(); Tuple2>, Seq>> duplicated = Seq .seq((Stream>) stream).duplicate(); @@ -188,7 +194,7 @@ default LazyFutureStream> chunkSinceLastRead() { * e.g. * * - * EagerFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new + * LazyFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new * Queue(),"odd",new Queue(),element-> element%2==0? "even" : "odd"); * * @@ -332,7 +338,7 @@ default LazyFutureStream jitter(long jitterInNanos) { * Stream. Note this doesn't neccessarily imply a fixed delay between * element creation (although it may do). e.g. * - * EagerFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours); + * LazyFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours); * * Will emit 1 on start, then 2 after an hour, 3 after 2 hours and so on. * @@ -456,6 +462,7 @@ default LazyFutureStream> withLatest(FutureStream s) { * @param futureStreams Streams to race * @return First Stream to start emitting values */ + @SafeVarargs static LazyFutureStream firstOf(LazyFutureStream... futureStreams) { return (LazyFutureStream) FutureStream.firstOf(futureStreams); } @@ -1096,7 +1103,7 @@ public static LazyReact sequentialBuilder() { .executor(new ForkJoinPool(1)) .retrier( RetryBuilder.getDefaultInstance().withScheduler( - Executors.newScheduledThreadPool(1))).build(); + Executors.newScheduledThreadPool(2))).build(); } /** @@ -1349,5 +1356,77 @@ public T next() { return Seq.seq(new LimitUntil()); } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#parallel() + */ + @Override + default LazyFutureStream parallel() { + return this; + } + + @Override + default LazyFutureStream stream() { + return (LazyFutureStream)FutureStream.super.stream(); + } + + + + + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#unordered() + */ + @Override + default LazyFutureStream unordered() { + return this; + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#onClose(java.lang.Runnable) + */ + @Override + default LazyFutureStream onClose(Runnable closeHandler) { + + return (LazyFutureStream)FutureStream.super.onClose(closeHandler); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted() + */ + @Override + default LazyFutureStream sorted() { + return (LazyFutureStream)fromStream(FutureStream.super.sorted()); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted(java.util.Comparator) + */ + @Override + default LazyFutureStream sorted(Comparator comparator) { + return (LazyFutureStream)fromStream(FutureStream.super.sorted(comparator)); + } + + /** + * Give a function access to the current stage of a SimpleReact Stream + * + * @param consumer + * Consumer that will recieve current stage + * @return Self (current stage) + */ + default LazyFutureStream self(Consumer> consumer) { + return ( LazyFutureStream)FutureStream.super.self(consumer); + } + } diff --git a/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java b/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java index 06d59ccfc6..89ad9d5ac6 100644 --- a/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java +++ b/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java @@ -88,7 +88,8 @@ public LazyFutureStream fromStream( return (LazyFutureStream)super.fromStream(stream); } - public LazyFutureStream react(final Supplier... actions) { + @SafeVarargs + public final LazyFutureStream react(final Supplier... actions) { return (LazyFutureStream)super.reactI(actions); @@ -114,6 +115,7 @@ public LazyFutureStream fromStreamWithoutFutures(Stream stream) { * @return EagerFutureStream * @see com.aol.simple.react.stream.BaseSimpleReact#fromStreamWithoutFutures(java.util.stream.Stream) */ + @SuppressWarnings("unchecked") @Override public LazyFutureStream fromPrimitiveStream(IntStream stream) { diff --git a/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java b/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java index 327eccce0e..54bb98ae83 100644 --- a/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java +++ b/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java @@ -24,6 +24,7 @@ import com.aol.simple.react.stream.InfiniteProcessingException; import com.aol.simple.react.stream.MissingValue; import com.aol.simple.react.stream.ThreadPools; +import com.aol.simple.react.stream.traits.FutureStream; import com.aol.simple.react.stream.traits.SimpleReactStream; import com.google.common.annotations.VisibleForTesting; import com.nurkiewicz.asyncretry.RetryExecutor; @@ -316,6 +317,9 @@ private SimpleReact(ExecutorService executor, RetryExecutor retrier, this.retrier = retrier; this.eager = Optional.ofNullable(eager).orElse(true); } + + + } diff --git a/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java b/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java index c4b782c3b9..03425bfad2 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java @@ -9,6 +9,7 @@ import java.util.function.Predicate; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.aol.simple.react.blockers.Blocker; import com.aol.simple.react.collectors.ReactCollector; @@ -103,7 +104,10 @@ default R block(final Collector collector) { @SuppressWarnings({ "rawtypes", "unchecked" }) default R block(final Collector collector, final StreamWrapper lastActive) { - return (R) lastActive.stream().map((future) -> { + Stream stream = lastActive.stream(); + if(!isEager()) + stream = lastActive.stream().collect(Collectors.toList()).stream(); + return (R) stream.map((future) -> { return (U) getSafe(future,getErrorHandler()); }).filter(v -> v != MissingValue.MISSING_VALUE).collect(collector); } diff --git a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java index 4dbbff234b..5ad77dd117 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java @@ -67,8 +67,8 @@ default FutureStream> zipFutures(Stream other) { return zipFutures((FutureStream)other); Seq seq = Seq.seq(getLastActive().stream()).zip(Seq.seq(other)); Seq,R>> withType = (Seq,R>>)seq; - Stream futureStream = fromStream(withType.map(t ->t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))) - .map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t ->t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))) + ); return (FutureStream>)futureStream; @@ -86,8 +86,8 @@ default FutureStream> zipFutures(Stream other) { default FutureStream> zipFutures(FutureStream other) { Seq seq = Seq.seq(getLastActive().stream()).zip(Seq.seq(other.getLastActive().stream())); Seq,CompletableFuture>> withType = (Seq,CompletableFuture>>)seq; - Stream futureStream = fromStream(withType.map(t ->CompletableFuture.allOf(t.v1,t.v2).thenApply(v -> Tuple.tuple(t.v1.join(),t.v2.join()))) - .map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t ->CompletableFuture.allOf(t.v1,t.v2).thenApply(v -> Tuple.tuple(t.v1.join(),t.v2.join()))) + ); return (FutureStream>)futureStream; @@ -373,6 +373,7 @@ default FutureStream control(Function, Supplier> fn){ */ default FutureStream debounce(long time, TimeUnit unit) { Queue queue = toQueue(); + long timeNanos = unit.toNanos(time); Function, Supplier> fn = s -> { return () -> { @@ -383,7 +384,7 @@ default FutureStream debounce(long time, TimeUnit unit) { while(elapsedNanos>0){ result = Optional.of(s.get()); - elapsedNanos= unit.toNanos(time) - timer.getElapsedNanoseconds(); + elapsedNanos= timeNanos - timer.getElapsedNanoseconds(); } @@ -729,6 +730,7 @@ public R next() { return Seq.seq(new Zip()).filter(next->!(next instanceof Optional)); } + static Seq skipUntil(FutureStream left, FutureStream right) { diff --git a/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java b/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java index 70a7b331c0..c4f974fb1a 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java @@ -38,7 +38,8 @@ default void run(ExecutorService e) { } default void run(ExecutorService e,Runnable r) { - new SimpleReact(e).react(() -> new Runner(r).run(getLastActive(),new EmptyCollector(getLazyCollector().getMaxActive()))); + new SimpleReact(e).react(() -> new Runner(r).run(getLastActive(), + new EmptyCollector(getLazyCollector().getMaxActive()))); } default void runThread(Runnable r) { diff --git a/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java b/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java index 805acd5133..a2983a1d88 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java +++ b/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java @@ -9,17 +9,15 @@ import com.aol.simple.react.stream.BaseSimpleReact; import com.aol.simple.react.stream.lazy.LazyReact; -public interface LazyToQueue extends ToQueue{ +public interface LazyToQueue extends ToQueue { - - - - abstract SimpleReactStream allOf(final Collector collector, + abstract SimpleReactStream allOf(final Collector collector, final Function fn); - - - abstract SimpleReactStream then(final Function fn, ExecutorService exec); - abstract T getPopulator(); + + abstract SimpleReactStream then(final Function fn, + ExecutorService exec); + + abstract T getPopulator(); /** * Convert the current Stream to a SimpleReact Queue @@ -28,42 +26,44 @@ abstract SimpleReactStream allOf(final Collector collector, */ default Queue toQueue() { Queue queue = this.getQueueFactory().build(); - - LazyReact service = getPopulator(); - then(queue::offer,service.getExecutor()).runThread( - () -> {queue.close(); returnPopulator(service); }); - + LazyReact service = getPopulator(); + then(queue::offer, service.getExecutor()).runThread(() -> { + queue.close(); + returnPopulator(service); + }); + return queue; } - - default Queue toQueue(Function fn) { + + default Queue toQueue(Function fn) { Queue queue = fn.apply(this.getQueueFactory().build()); - - LazyReact service = getPopulator(); - then(queue::offer,service.getExecutor()).runThread( - () -> {queue.close(); returnPopulator(service); }); - + LazyReact service = getPopulator(); + then(queue::offer, service.getExecutor()).runThread(() -> { + queue.close(); + returnPopulator(service); + }); + return queue; } - default void toQueue(Map> shards, Function sharder) { - - - LazyReact service = getPopulator(); - then(it-> shards.get(sharder.apply(it)).offer(it),service.getExecutor()) - .capture(Throwable::printStackTrace) - .runThread( - () -> {shards.values().forEach(it->it.close()); returnPopulator(service); }); - - - + + default void toQueue(Map> shards, Function sharder) { + + LazyReact service = getPopulator(); + then(it -> shards.get(sharder.apply(it)).offer(it), + service.getExecutor()) + .runThread(() -> { + shards.values().forEach(it -> it.close()); + returnPopulator(service); + }); + } - - - abstract void returnPopulator(T service); - default U add(U value,Queue queue){ - if(!queue.add(value)) + + abstract void returnPopulator(T service); + + default U add(U value, Queue queue) { + if (!queue.add(value)) throw new RuntimeException(); return value; } diff --git a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java index 4cae5f2567..6892208fa3 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java @@ -34,6 +34,7 @@ import com.aol.simple.react.stream.simple.SimpleReact; import com.aol.simple.react.stream.simple.SimpleReactStreamImpl; import com.nurkiewicz.asyncretry.RetryExecutor; +import com.nurkiewicz.asyncretry.policy.AbortRetryException; public interface SimpleReactStream extends LazyStream, @@ -138,13 +139,19 @@ default SimpleReactStream retry(final Function fn) { return (SimpleReactStream) this.withLastActive(getLastActive().permutate( getLastActive().stream().map( - (ft) -> ft.thenApplyAsync((res) -> BlockingStream.getSafe(getRetrier() - .getWithRetry(() -> fn.apply((U) res)),getErrorHandler()), + (ft) -> ft.thenApplyAsync(res -> + getRetrier().getWithRetry(()->SimpleReactStream.handleExceptions(fn).apply((U)res) ).join() + , + //BlockingStream.getSafe(getRetrier() + // .getWithRetry(() -> fn.apply((U) res)) + // ,getErrorHandler()), getTaskExecutor())), Collectors.toList())); } default SimpleReactStream fromStream(Stream stream) { + + return (SimpleReactStream) this.withLastActive(getLastActive() .withNewStream(stream.map(CompletableFuture::completedFuture))); } @@ -159,6 +166,12 @@ default SimpleReactStream fromStream(Stream stream) { default SimpleReactStream fromStreamCompletableFuture( Stream> stream) { Stream noType = stream; + return (SimpleReactStream) this.withLastActive(getLastActive() + .withNewStream(noType)); + } + default SimpleReactStream fromStreamCompletableFutureReplace( + Stream> stream) { + Stream noType = stream; return (SimpleReactStream) this.withLastActive(getLastActive() .withStream(noType)); } @@ -235,6 +248,8 @@ static Function handleExceptions(Function fn) { try { return fn.apply(input); } catch (Throwable t) { + if(t instanceof AbortRetryException)//special case for retry + throw t; throw new SimpleReactFailedStageException(input, t); } diff --git a/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java b/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java index 83155a22ae..85a1919849 100644 --- a/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java +++ b/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java @@ -69,6 +69,7 @@ public void testFindAny(){ } @Test public void testDistinct(){ + assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()).size(),is(2)); assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()),hasItem(1)); assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()),hasItem(2)); } @@ -178,5 +179,32 @@ public void testCount(){ assertThat(of(1,5,3,4,2).count(),is(5L)); } + @Test + public void collectSBB(){ + List list = of(1,2,3,4,5).collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + assertThat(list.size(),is(5)); + } + @Test + public void collect(){ + assertThat(of(1,2,3,4,5).collect(Collectors.toList()).size(),is(5)); + assertThat(of(1,1,1,2).collect(Collectors.toSet()).size(),is(2)); + } + @Test + public void testFilter(){ + assertThat(of(1,1,1,2).filter(it -> it==1).collect(Collectors.toList()).size(),is(3)); + } + @Test + public void testMap(){ + assertThat(of(1).map(it->it+100).collect(Collectors.toList()).get(0),is(101)); + } + Object val; + @Test + public void testPeek(){ + val = null; + of(1).map(it->it+100).peek(it -> val=it).collect(Collectors.toList()); + assertThat(val,is(101)); + } + + } diff --git a/src/test/java/com/aol/simple/react/base/BaseSeqTest.java b/src/test/java/com/aol/simple/react/base/BaseSeqTest.java index 423df63e9a..e44db6e37a 100644 --- a/src/test/java/com/aol/simple/react/base/BaseSeqTest.java +++ b/src/test/java/com/aol/simple/react/base/BaseSeqTest.java @@ -91,6 +91,11 @@ public void combine(){ assertThat(of(1,2,3,4,5,6).combineLatest(of(3)).collect(Collectors.toList()).size(),greaterThan(5)); } @Test + public void combineLatest(){ + + assertThat(of(1,2,3,4,5,6).combineLatest(react(()->3,()->value())).collect(Collectors.toList()).size(),greaterThan(5)); + } + @Test public void combineValues(){ assertTrue(of(1,2,3,4,5,6).combineLatest(of(3)).anyMatch(it-> it.v2==null)); //assertTrue(of(1,2,3,4,5,6).combine(of(3)).oneMatch(it-> it.v2==3)); @@ -495,10 +500,10 @@ public void testZipDifferingLength() { @Test public void testZipWithIndex() { - assertEquals(asList(), Seq.of().zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L)), Seq.of("a").zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L), tuple("b", 1L)), Seq.of("a", "b").zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L), tuple("b", 1L), tuple("c", 2L)), Seq.of("a", "b", "c").zipWithIndex().toList()); + assertEquals(asList(), of().zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L)), of("a").zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L), tuple("b", 1L)), of("a", "b").zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L), tuple("b", 1L), tuple("c", 2L)), of("a", "b", "c").zipWithIndex().toList()); } diff --git a/src/test/java/com/aol/simple/react/eager/EagerTest.java b/src/test/java/com/aol/simple/react/eager/EagerTest.java index 144263e6d6..8396b00a0b 100644 --- a/src/test/java/com/aol/simple/react/eager/EagerTest.java +++ b/src/test/java/com/aol/simple/react/eager/EagerTest.java @@ -1,15 +1,36 @@ package com.aol.simple.react.eager; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import java.util.stream.IntStream; + +import org.junit.Ignore; import org.junit.Test; import com.aol.simple.react.stream.eager.EagerFutureStream; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; public class EagerTest { + @Test @Ignore + public void jitter(){ + EagerFutureStream.parallelCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it*100) + .jitter(10l) + .peek(System.out::println) + .block(); + } + @Test @Ignore + public void jitterSequential(){ + EagerFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it*100) + .jitter(100000l) + .peek(System.out::println) + .runOnCurrent(); + } @Test public void doOnEach(){ String[] found = {""}; diff --git a/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java b/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java index 6821c35681..92988f0465 100644 --- a/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java +++ b/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java @@ -103,7 +103,7 @@ public void batchSinceLastReadIterator() throws InterruptedException{ } @Test public void batchSinceLastRead() throws InterruptedException{ - List cols = of(1,2,3,4,5,6).chunkSinceLastRead().peek(it->{sleep(50);}).collect(Collectors.toList()); + List cols = of(1,2,3,4,5,6).chunkSinceLastRead().peek(System.out::println).peek(it->{sleep(50);}).collect(Collectors.toList()); System.out.println(cols.get(0)); assertThat(cols.get(0).size(),is(6)); diff --git a/src/test/java/com/aol/simple/react/lazy/LazyTest.java b/src/test/java/com/aol/simple/react/lazy/LazyTest.java index 9b53e3c4f0..40db9f1773 100644 --- a/src/test/java/com/aol/simple/react/lazy/LazyTest.java +++ b/src/test/java/com/aol/simple/react/lazy/LazyTest.java @@ -1,40 +1,88 @@ package com.aol.simple.react.lazy; +import static org.junit.Assert.assertThat; import static java.util.Arrays.asList; -import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.Matchers.*; + import com.aol.simple.react.stream.lazy.LazyFutureStream; +import com.aol.simple.react.stream.traits.FutureStream; public class LazyTest { + + @Test @Ignore + public void debounce() { + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .debounce(100, TimeUnit.MILLISECONDS) + .peek(System.out::println) + .block().size()); + } + + @Test @Ignore + public void skipUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 50).then(this::sleep) + .peek(System.out::println); + assertThat( + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 100000)) + .skipUntil(stoppingStream).peek(System.out::println) + .block().size(), greaterThan(0)); + } + @Test - public void lazyReactStream(){ - LazyFutureStream.sequentialBuilder() - .react( ()-> 1 ) - .map(list -> 1+2) - .block(); + @Ignore + public void takeUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 100).then(this::sleep) + .peek(System.out::println); + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + // .peek(System.out::println) + .takeUntil(stoppingStream).peek(System.out::println).block() + .size()); } + + private boolean sleep(int i) { + + try { + Thread.sleep(i); + } catch (InterruptedException e) { + + e.printStackTrace(); + } + return true; + + } + + @Test + public void lazyReactStream() { + LazyFutureStream.sequentialBuilder().react(() -> 1).map(list -> 1 + 2) + .block(); + } + @Test - public void lazyParallel(){ - LazyFutureStream.parallelBuilder() - .react( ()-> 1 ) - .map(list -> 1+2) - .block(); + public void lazyParallel() { + LazyFutureStream.parallelBuilder().react(() -> 1).map(list -> 1 + 2) + .block(); } + @Test - public void lazyReactStreamList(){ - LazyFutureStream.sequentialBuilder() - .react( asList(()-> 1 )) - .map(list -> 1+2) - .block(); + public void lazyReactStreamList() { + LazyFutureStream.sequentialBuilder().react(asList(() -> 1)) + .map(list -> 1 + 2).block(); } + @Test - public void lazyParallelList(){ - LazyFutureStream.parallelBuilder() - .react( asList(()-> 1 )) - .map(list -> 1+2) - .block(); + public void lazyParallelList() { + LazyFutureStream.parallelBuilder().react(asList(() -> 1)) + .map(list -> 1 + 2).block(); } } diff --git a/src/test/java/com/aol/simple/react/lazy/Tutorial.java b/src/test/java/com/aol/simple/react/lazy/Tutorial.java index a2470cbf02..2f74975a62 100644 --- a/src/test/java/com/aol/simple/react/lazy/Tutorial.java +++ b/src/test/java/com/aol/simple/react/lazy/Tutorial.java @@ -1,73 +1,588 @@ package com.aol.simple.react.lazy; +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; import java.util.Collection; +import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + import org.junit.Ignore; import org.junit.Test; +import com.aol.simple.react.async.Queue; import com.aol.simple.react.async.QueueFactories; +import com.aol.simple.react.exceptions.SimpleReactFailedStageException; +import com.aol.simple.react.stream.eager.EagerFutureStream; +import com.aol.simple.react.stream.eager.EagerReact; import com.aol.simple.react.stream.lazy.LazyFutureStream; +import com.aol.simple.react.stream.simple.SimpleReact; +import com.aol.simple.react.stream.traits.FutureStream; +import com.aol.simple.react.threads.ReactPool; import com.aol.simple.react.threads.SequentialElasticPools; +import com.google.common.collect.ImmutableMap; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; +@Ignore public class Tutorial { - String status="ok"; + @SuppressWarnings("unchecked") + @Test + public void zipByResults() { + + LazyFutureStream a = LazyFutureStream.parallelBuilder(3).react( + () -> slowest(), () -> fast(), () -> slow()); + LazyFutureStream b = LazyFutureStream.sequentialBuilder().of( + 1, 2, 3, 4, 5, 6); + + a.zip(b).forEach(System.out::println); + + } + + @SuppressWarnings("unchecked") + @Test + public void zipWithIndex() { + + LazyFutureStream.sequentialBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .zipWithIndex().forEach(System.out::println); + + } + + @SuppressWarnings("unchecked") + @Test + public void zipFuturesWithIndex() { + + EagerFutureStream.parallelBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .zipFuturesWithIndex().forEach(System.out::println); + + } + + @SuppressWarnings("unchecked") + @Test + public void combineLatest() { + LazyFutureStream + .parallelBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .combineLatest( + LazyFutureStream.sequentialBuilder().of(1, 2, 3, 4, 5, + 6)).forEach(System.out::println); + } + + @SuppressWarnings("unchecked") + @Test + public void withLatest() { + LazyFutureStream + .sequentialBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .withLatest( + LazyFutureStream.sequentialBuilder().of(1, 2, 3, 4, 5, + 6)).forEach(System.out::println); + } + + @SuppressWarnings("unchecked") + @Test + public void zipByFutures() { + + LazyFutureStream.parallelBuilder(3) + .react(() -> slowest(), () -> fast(), () -> slow()) + .flatMap(it -> it.chars().boxed()).forEach(System.out::println); + + EagerFutureStream a = EagerFutureStream.parallelBuilder(3) + .react(() -> slowest(), () -> fast(), () -> slow()); + EagerFutureStream b = EagerFutureStream.sequentialBuilder() + .of(1, 2, 3, 4, 5, 6); + + a.zipFutures(b).forEach(System.out::println); + + } + + private String slowest() { + sleep(2500); + return "slowestResult"; + } + + private String slow() { + sleep(100); + return "slowResult"; + } + + private String fast() { + return "fast"; + } + + @Test + public void errorHandling() { + AsyncRetryExecutor retrier = new AsyncRetryExecutor( + Executors.newScheduledThreadPool(Runtime.getRuntime() + .availableProcessors())).retryOn(Throwable.class) + .withMaxDelay(1_000). // 1 seconds + withUniformJitter(). // add between +/- 100 ms randomly + withMaxRetries(1); + + List results = LazyFutureStream.sequentialBuilder() + .withRetrier(retrier) + .react(() -> "new event1", () -> "new event2") + .retry(this::unreliable).onFail(e -> "default") + .peek(System.out::println).capture(Throwable::printStackTrace) + .block(); + + assertThat(results.size(), equalTo(2)); + + } + + private String unreliable(Object o) { + throw new RuntimeException(); + + } + + @SuppressWarnings("unchecked") + @Test + public void shard() { + Map> shards = new HashMap<>(); + shards.put(0, new Queue<>()); + shards.put(1, new Queue<>()); + shards.put(2, new Queue<>()); + + Map> sharded = LazyFutureStream + .sequentialBuilder().react(() -> loadUserData()) + .flatMap(Collection::stream) + .shard(shards, user -> user.getUserId() % 3); + + System.out.println("First shard"); + sharded.get(0).forEach(System.out::println); + + System.out.println("Second shard"); + sharded.get(1).forEach(System.out::println); + + System.out.println("Third shard"); + sharded.get(2).forEach(System.out::println); + } + + @Test + public void firstOf(){ + + LazyFutureStream stream1 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromDb()) + .map(this::convertToStandardFormat); + + LazyFutureStream stream2 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromService1()) + .map(this::convertToStandardFormat); + + LazyFutureStream stream3 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromService2()) + .map(this::convertToStandardFormat); + + LazyFutureStream.firstOf(stream1, stream2, stream3) + .peek(System.out::println) + .map(this::saveData) + .runOnCurrent(); + + + } + + + @Test + public void anyOf(){ + + + + LazyFutureStream.parallelBuilder(8).react(() -> loadFromDb(),() -> loadFromService1(), + () -> loadFromService2()) + .map(this::convertToStandardFormat) + .peek(System.out::println) + .map(this::saveData) + .block(); + + + } + + private String convertToStandardFormat(String input){ + if(count++%2==0){ + System.out.println("sleeping!" + input); + sleep(1000); + } + return "converted " + input; + } + private String loadFromDb(){ + + return "from db"; + } + private String loadFromService1(){ + + return "from service1"; + } + private String loadFromService2(){ + return "from service2"; + } + + + @Test + public void allOf(){ + LazyFutureStream.sequentialBuilder().react(()->1,()->2,()->3) + .map(it->it+100) + .peek(System.out::println) + .allOf(c-> ImmutableMap.of("numbers",c)) + .peek(System.out::println) + .block(); + } + + + + + + + + + + @Test + public void filterMapReduceFlatMap() { + int totalVisits = LazyFutureStream.sequentialBuilder() + .react(() -> loadUserData()).flatMap(Collection::stream) + .filter(User::hasPurchased).map(User::getTotalVisits) + .reduce(0, (acc, next) -> acc + next); + + System.out.println("Total visits is : " + totalVisits); + } + + @AllArgsConstructor + @ToString + @Getter + class User { + boolean purchased; + int totalVisits; + final int userId = count++; + + public boolean hasPurchased() { + return purchased; + } + } + + private Collection loadUserData() { + return Arrays.asList(new User(true, 102), new User(false, 501), + new User(true, 14), new User(true, 23), new User(false, 3), + new User(true, 531), new User(false, 56)); + } + + @Test + public void gettingStarted() { + + List results = new SimpleReact() + .react(() -> readData("data1"), () -> readData("data2")) + .onFail(RuntimeException.class, this::loadFromDb) + .peek(System.out::println).then(this::processData).block(); + + } + + private String readData(String name) { + if (name.equals("data1")) + throw new RuntimeException(); + + else + return "hello world from file!"; + + } + + private String processData(String data) { + return "processed : " + data; + } + + private String loadFromDb(SimpleReactFailedStageException e) { + return "hello world from DB!"; + } + + @Test + public void skipUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 1000).then(this::sleep) + .peek(System.out::println); + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + // .peek(System.out::println) + .skipUntil(stoppingStream).peek(System.out::println).block() + .size()); + } + + private boolean sleep(int i) { + + try { + Thread.sleep(i); + } catch (InterruptedException e) { + + e.printStackTrace(); + } + return true; + + } + + @Test + public void jitter() { + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it * 100).jitter(10000000l) + .peek(System.out::println).runOnCurrent(); + } + + @Test + public void fixedDelay() { + + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .fixedDelay(1l, TimeUnit.SECONDS).peek(System.out::println) + .runOnCurrent(); + } + + @Test + public void elasticPool() { + + List files = Arrays.asList("/tmp/1.data", "/tmp/2.data"); + + List data = SequentialElasticPools.lazyReact.react(er -> er + .reactToCollection(files).map(this::loadData) + .peek(System.out::println).map(this::saveData) + .collect(Collectors.toList())); + + System.out.println("Loaded and saved " + data.size()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testRoundRobin() { + + EagerReact react1 = new EagerReact(new ForkJoinPool(4)); + EagerReact react2 = new EagerReact(new ForkJoinPool(4)); + + ReactPool pool = ReactPool.boundedPool(asList(react1, + react2)); + + Supplier[] suppliers = { () -> "hello", () -> "world" }; + + pool.react((er) -> er.react(suppliers).peek( + it -> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + + pool.react((er) -> er.react(suppliers).peek( + it -> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + + } + + @Test + public void add100() { + + new SimpleReact().of(1, 2, 3, 4, 5).then(num -> num + 100) + .then(num -> Thread.currentThread().getId()) + .peek(System.out::println); + + } + + Stack dataArray = new Stack() { + { + add("{order:1000,{customer:604}}"); + add("{order:1001,{customer:605}}"); + } + }; + + private String loadData(String file) { + sleep(1000); + return dataArray.pop(); + } + + private Status saveData(String data) { + return new Status(); + } + + @Test + public void debounce() { + LazyFutureStream.sequentialCommonBuilder() + .iterateInfinitely(0, it -> it + 1) + .debounce(100, TimeUnit.MILLISECONDS).peek(System.out::println) + .runOnCurrent(); + } + + @Test + public void onePerSecond() { + + LazyFutureStream.sequentialCommonBuilder() + .iterateInfinitely(0, it -> it + 1).onePer(1, TimeUnit.SECONDS) + .map(seconds -> readStatus()).retry(this::saveStatus) + .peek(System.out::println).capture(Throwable::printStackTrace) + .block(); + + } + + private String saveStatus(Status s) { + if (count++ % 2 == 0) + throw new RuntimeException(); + + return "Status saved:" + s.getId(); + } + + int count = 0; + + private Status readStatus() { + return new Status(); + } + + static int nextId = 1; + + @Getter + class Status { + long id = nextId++; + } + + String status = "ok"; + /** * check status every second, batch every 10 secs */ - @Test @Ignore - public void onePerSecondAndBatch(){ - List> collected = LazyFutureStream.sequentialCommonBuilder().reactInfinitely(()->status) - .withQueueFactory(QueueFactories.boundedQueue(1)) - .onePer(1, TimeUnit.SECONDS) - .batchByTime(10, TimeUnit.SECONDS) - .limit(15) - .block(); + @Test + @Ignore + public void onePerSecondAndBatch() { + List> collected = LazyFutureStream + .sequentialCommonBuilder().reactInfinitely(() -> status) + .withQueueFactory(QueueFactories.boundedQueue(1)) + .onePer(1, TimeUnit.SECONDS).batchByTime(10, TimeUnit.SECONDS) + .limit(15).block(); System.out.println(collected); } + /** * create a stream of time intervals in seconds */ - @Test - public void secondsTimeInterval(){ - List> collected = LazyFutureStream.sequentialCommonBuilder().iterateInfinitely(0, it -> it+1) - .withQueueFactory(QueueFactories.boundedQueue(1)) - .onePer(1, TimeUnit.SECONDS) - .peek(System.out::println) - .batchByTime(10, TimeUnit.SECONDS) - .peek(System.out::println) - .limit(15) - .block(); + @Test + public void secondsTimeInterval() { + List> collected = LazyFutureStream + .sequentialCommonBuilder().iterateInfinitely(0, it -> it + 1) + .withQueueFactory(QueueFactories.boundedQueue(1)) + .onePer(1, TimeUnit.SECONDS).peek(System.out::println) + .batchByTime(10, TimeUnit.SECONDS).peek(System.out::println) + .limit(15).block(); System.out.println(collected); } - @Test @Ignore - public void range(){ - List> collected = LazyFutureStream.sequentialCommonBuilder() - .fromPrimitiveStream(IntStream.range(0, 10)) - .batchBySize(5) - .block(); + + @Test + @Ignore + public void range() { + List> collected = LazyFutureStream + .sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 10)).batchBySize(5) + .block(); System.out.println(collected); } - - @Test @Ignore - public void executeRestCallInPool(){ - boolean success = SequentialElasticPools.eagerReact.react( er-> er.react(()->restGet()) - .map(Tutorial::transformData) - .then(Tutorial::saveToDb) - .first()); - } - private static boolean saveToDb(Object o){ + + @Test + @Ignore + public void executeRestCallInPool() { + boolean success = SequentialElasticPools.eagerReact.react(er -> er + .react(() -> restGet()).map(Tutorial::transformData) + .then(Tutorial::saveToDb).first()); + } + + private static boolean saveToDb(Object o) { return true; } + private Object restGet() { // TODO Auto-generated method stub return null; } + private static Object transformData(Object o) { // TODO Auto-generated method stub return null; } + + @Test + public void batchBySize() { + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .batchBySize(10) + .onePer(1, TimeUnit.SECONDS) + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); + + } + + @Test + public void batchByTime() { + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .batchByTime(1, TimeUnit.SECONDS) + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); + + } + + @Test + public void chunkSinceLastRead() { + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .chunkSinceLastRead() + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); + + } + + private void save(Map map) { + + } + + private Collection processOrders(Collection input) { + sleep(100); + return input.stream().map(m -> ImmutableMap.of("processed", m)) + .collect(Collectors.toList()); + } + + private Map parseJson(String json) { + return ImmutableMap.of("id", count++, "type", "order", "date", + new Date()); + } + + private String readFileToString(String name) { + return ""; + } + + private String nextFile() { + + return null; + } } diff --git a/src/test/java/com/aol/simple/react/simple/AllOfTest.java b/src/test/java/com/aol/simple/react/simple/AllOfTest.java index bf4a093c41..5782e2ffcf 100644 --- a/src/test/java/com/aol/simple/react/simple/AllOfTest.java +++ b/src/test/java/com/aol/simple/react/simple/AllOfTest.java @@ -1,25 +1,49 @@ package com.aol.simple.react.simple; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.Test; import com.aol.simple.react.extractors.Extractors; +import com.aol.simple.react.stream.lazy.LazyFutureStream; import com.aol.simple.react.stream.simple.SimpleReact; +import com.google.common.collect.ImmutableMap; public class AllOfTest { + @Test + public void allOf(){ + List>> result = new ArrayList<>(); + Supplier s = ()->result; + + LazyFutureStream.sequentialBuilder().react(()->1,()->2,()->3) + .map(it->it+100) + .peek(System.out::println) + .allOf(c-> { System.out.println(c);return ImmutableMap.of("numbers",c);}) + .peek(map -> System.out.println(map)) + .run(s); + + assertThat(result.size(),is(1)); + } + @Test public void testAllOfFailure(){ new SimpleReact().react(()-> { throw new RuntimeException();},()->"hello",()->"world") diff --git a/src/test/java/com/aol/simple/react/simple/RetryTest.java b/src/test/java/com/aol/simple/react/simple/RetryTest.java index 0d30c0a1db..edb064ca65 100644 --- a/src/test/java/com/aol/simple/react/simple/RetryTest.java +++ b/src/test/java/com/aol/simple/react/simple/RetryTest.java @@ -25,6 +25,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import com.aol.simple.react.exceptions.SimpleReactFailedStageException; import com.aol.simple.react.stream.simple.SimpleReact; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.nurkiewicz.asyncretry.RetryExecutor; @@ -119,7 +120,7 @@ public void shouldRethrowOriginalExceptionFromUserFutureCompletion() assertThat(result.size(), is(0)); - assertThat(error.getMessage(), is("DONT PANIC")); + assertThat(((SimpleReactFailedStageException)error).getCause().getMessage(), is("DONT PANIC")); } @@ -132,12 +133,13 @@ public void shouldAbortWhenTargetFutureWantsToAbort() throws Exception { List result = new SimpleReact().react(() -> 1) - .withRetrier(executor).capture(e -> error = e) + .withRetrier(executor) + .capture(e -> error = e) .retry(serviceMock).block(); assertThat(result.size(), is(0)); - + error.printStackTrace(); assertThat(error, instanceOf(AbortRetryException.class)); } @@ -147,7 +149,7 @@ public void shouldRethrowExceptionThatWasThrownFromUserTaskBeforeReturningFuture error = null; final RetryExecutor executor = new AsyncRetryExecutor(schedulerMock) - .abortOn(IllegalArgumentException.class); + .abortIf(t-> t.getCause().getClass().isAssignableFrom(IllegalArgumentException.class)); given(serviceMock.apply(anyInt())).willThrow( new IllegalArgumentException("DONT PANIC")); @@ -158,8 +160,9 @@ public void shouldRethrowExceptionThatWasThrownFromUserTaskBeforeReturningFuture assertThat(result.size(), is(0)); - assertThat(error, instanceOf(IllegalArgumentException.class)); - assertThat(error.getMessage(), is("DONT PANIC")); + error.printStackTrace(); + assertThat(error.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(error.getCause().getMessage(), is("DONT PANIC")); } diff --git a/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java b/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java index 234da92b95..28bc27335d 100644 --- a/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java +++ b/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java @@ -26,6 +26,11 @@ public void testReact(){ List result = pool.react( (er) -> er.react(()->"hello",()->"world").block() ); assertThat(result.size(),is(2)); } + + + + + @Test public void testRoundRobin(){ EagerReact react1 = mock(EagerReact.class); @@ -40,6 +45,16 @@ public void testRoundRobin(){ verify(react1,times(1)).react(suppliers); verify(react2,times(1)).react(suppliers); } + + + + + + + + + + @Test public void testElastic(){