From 1c2c982ce3d4efe31cb6ff69ad0d83a67b36eb81 Mon Sep 17 00:00:00 2001 From: John McClean Date: Tue, 24 Mar 2015 23:30:20 +0000 Subject: [PATCH 1/3] fix for EagerFutureStream xxxFutures methods should use fromStreamCompletableFutures #17 --- .../react/stream/eager/EagerFutureStream.java | 6 +- .../react/stream/simple/SimpleReact.java | 4 + .../react/stream/traits/FutureStream.java | 11 +- .../stream/traits/SimpleReactStream.java | 6 + .../aol/simple/react/base/BaseSeqTest.java | 13 +- .../com/aol/simple/react/eager/EagerTest.java | 25 +- .../com/aol/simple/react/lazy/LazyTest.java | 90 +++-- .../com/aol/simple/react/lazy/Tutorial.java | 342 ++++++++++++++++++ .../simple/react/threads/ReactPoolTest.java | 15 + 9 files changed, 477 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java index e35c6f1a39..83e4fbc93e 100644 --- a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java @@ -559,7 +559,6 @@ default EagerFutureStream anyOf(Function fn) { */ @Override default EagerFutureStream fromStream(Stream stream) { - return (EagerFutureStream) FutureStream.super.fromStream(stream); } @@ -836,7 +835,8 @@ default EagerFutureStream> zipFuturesWithIndex() { Seq seq = Seq.seq(getLastActive().stream().iterator()).zipWithIndex(); Seq,Long>> withType = (Seq,Long>>)seq; - Stream futureStream = fromStream(withType.map(t -> t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))).map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t -> t.v1.thenApply(v -> + Tuple.tuple(t.v1.join(),t.v2)))); return (EagerFutureStream>)futureStream; } @@ -1203,7 +1203,7 @@ public static EagerReact parallelBuilder(int parallelism) { * * @see ThreadPools#getStandard() see RetryBuilder#getDefaultInstance() */ - public static EagerReact paraellelCommonBuilder() { + public static EagerReact parallelCommonBuilder() { return EagerReact .builder() .executor(ThreadPools.getStandard()) diff --git a/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java b/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java index 327eccce0e..54bb98ae83 100644 --- a/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java +++ b/src/main/java/com/aol/simple/react/stream/simple/SimpleReact.java @@ -24,6 +24,7 @@ import com.aol.simple.react.stream.InfiniteProcessingException; import com.aol.simple.react.stream.MissingValue; import com.aol.simple.react.stream.ThreadPools; +import com.aol.simple.react.stream.traits.FutureStream; import com.aol.simple.react.stream.traits.SimpleReactStream; import com.google.common.annotations.VisibleForTesting; import com.nurkiewicz.asyncretry.RetryExecutor; @@ -316,6 +317,9 @@ private SimpleReact(ExecutorService executor, RetryExecutor retrier, this.retrier = retrier; this.eager = Optional.ofNullable(eager).orElse(true); } + + + } diff --git a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java index 4dbbff234b..7088c7be5f 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java @@ -67,8 +67,8 @@ default FutureStream> zipFutures(Stream other) { return zipFutures((FutureStream)other); Seq seq = Seq.seq(getLastActive().stream()).zip(Seq.seq(other)); Seq,R>> withType = (Seq,R>>)seq; - Stream futureStream = fromStream(withType.map(t ->t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))) - .map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t ->t.v1.thenApply(v -> Tuple.tuple(t.v1.join(),t.v2))) + ); return (FutureStream>)futureStream; @@ -86,8 +86,8 @@ default FutureStream> zipFutures(Stream other) { default FutureStream> zipFutures(FutureStream other) { Seq seq = Seq.seq(getLastActive().stream()).zip(Seq.seq(other.getLastActive().stream())); Seq,CompletableFuture>> withType = (Seq,CompletableFuture>>)seq; - Stream futureStream = fromStream(withType.map(t ->CompletableFuture.allOf(t.v1,t.v2).thenApply(v -> Tuple.tuple(t.v1.join(),t.v2.join()))) - .map(CompletableFuture::join)); + Stream futureStream = fromStreamCompletableFuture((Stream)withType.map(t ->CompletableFuture.allOf(t.v1,t.v2).thenApply(v -> Tuple.tuple(t.v1.join(),t.v2.join()))) + ); return (FutureStream>)futureStream; @@ -373,6 +373,7 @@ default FutureStream control(Function, Supplier> fn){ */ default FutureStream debounce(long time, TimeUnit unit) { Queue queue = toQueue(); + long timeNanos = unit.toNanos(time); Function, Supplier> fn = s -> { return () -> { @@ -383,7 +384,7 @@ default FutureStream debounce(long time, TimeUnit unit) { while(elapsedNanos>0){ result = Optional.of(s.get()); - elapsedNanos= unit.toNanos(time) - timer.getElapsedNanoseconds(); + elapsedNanos= timeNanos - timer.getElapsedNanoseconds(); } diff --git a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java index 4cae5f2567..fb2e6b7e71 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java @@ -159,6 +159,12 @@ default SimpleReactStream fromStream(Stream stream) { default SimpleReactStream fromStreamCompletableFuture( Stream> stream) { Stream noType = stream; + return (SimpleReactStream) this.withLastActive(getLastActive() + .withNewStream(noType)); + } + default SimpleReactStream fromStreamCompletableFutureReplace( + Stream> stream) { + Stream noType = stream; return (SimpleReactStream) this.withLastActive(getLastActive() .withStream(noType)); } diff --git a/src/test/java/com/aol/simple/react/base/BaseSeqTest.java b/src/test/java/com/aol/simple/react/base/BaseSeqTest.java index 423df63e9a..e44db6e37a 100644 --- a/src/test/java/com/aol/simple/react/base/BaseSeqTest.java +++ b/src/test/java/com/aol/simple/react/base/BaseSeqTest.java @@ -91,6 +91,11 @@ public void combine(){ assertThat(of(1,2,3,4,5,6).combineLatest(of(3)).collect(Collectors.toList()).size(),greaterThan(5)); } @Test + public void combineLatest(){ + + assertThat(of(1,2,3,4,5,6).combineLatest(react(()->3,()->value())).collect(Collectors.toList()).size(),greaterThan(5)); + } + @Test public void combineValues(){ assertTrue(of(1,2,3,4,5,6).combineLatest(of(3)).anyMatch(it-> it.v2==null)); //assertTrue(of(1,2,3,4,5,6).combine(of(3)).oneMatch(it-> it.v2==3)); @@ -495,10 +500,10 @@ public void testZipDifferingLength() { @Test public void testZipWithIndex() { - assertEquals(asList(), Seq.of().zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L)), Seq.of("a").zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L), tuple("b", 1L)), Seq.of("a", "b").zipWithIndex().toList()); - assertEquals(asList(tuple("a", 0L), tuple("b", 1L), tuple("c", 2L)), Seq.of("a", "b", "c").zipWithIndex().toList()); + assertEquals(asList(), of().zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L)), of("a").zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L), tuple("b", 1L)), of("a", "b").zipWithIndex().toList()); + assertEquals(asList(tuple("a", 0L), tuple("b", 1L), tuple("c", 2L)), of("a", "b", "c").zipWithIndex().toList()); } diff --git a/src/test/java/com/aol/simple/react/eager/EagerTest.java b/src/test/java/com/aol/simple/react/eager/EagerTest.java index 144263e6d6..8396b00a0b 100644 --- a/src/test/java/com/aol/simple/react/eager/EagerTest.java +++ b/src/test/java/com/aol/simple/react/eager/EagerTest.java @@ -1,15 +1,36 @@ package com.aol.simple.react.eager; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import java.util.stream.IntStream; + +import org.junit.Ignore; import org.junit.Test; import com.aol.simple.react.stream.eager.EagerFutureStream; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; public class EagerTest { + @Test @Ignore + public void jitter(){ + EagerFutureStream.parallelCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it*100) + .jitter(10l) + .peek(System.out::println) + .block(); + } + @Test @Ignore + public void jitterSequential(){ + EagerFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it*100) + .jitter(100000l) + .peek(System.out::println) + .runOnCurrent(); + } @Test public void doOnEach(){ String[] found = {""}; diff --git a/src/test/java/com/aol/simple/react/lazy/LazyTest.java b/src/test/java/com/aol/simple/react/lazy/LazyTest.java index 9b53e3c4f0..40db9f1773 100644 --- a/src/test/java/com/aol/simple/react/lazy/LazyTest.java +++ b/src/test/java/com/aol/simple/react/lazy/LazyTest.java @@ -1,40 +1,88 @@ package com.aol.simple.react.lazy; +import static org.junit.Assert.assertThat; import static java.util.Arrays.asList; -import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.Matchers.*; + import com.aol.simple.react.stream.lazy.LazyFutureStream; +import com.aol.simple.react.stream.traits.FutureStream; public class LazyTest { + + @Test @Ignore + public void debounce() { + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .debounce(100, TimeUnit.MILLISECONDS) + .peek(System.out::println) + .block().size()); + } + + @Test @Ignore + public void skipUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 50).then(this::sleep) + .peek(System.out::println); + assertThat( + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 100000)) + .skipUntil(stoppingStream).peek(System.out::println) + .block().size(), greaterThan(0)); + } + @Test - public void lazyReactStream(){ - LazyFutureStream.sequentialBuilder() - .react( ()-> 1 ) - .map(list -> 1+2) - .block(); + @Ignore + public void takeUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 100).then(this::sleep) + .peek(System.out::println); + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + // .peek(System.out::println) + .takeUntil(stoppingStream).peek(System.out::println).block() + .size()); } + + private boolean sleep(int i) { + + try { + Thread.sleep(i); + } catch (InterruptedException e) { + + e.printStackTrace(); + } + return true; + + } + + @Test + public void lazyReactStream() { + LazyFutureStream.sequentialBuilder().react(() -> 1).map(list -> 1 + 2) + .block(); + } + @Test - public void lazyParallel(){ - LazyFutureStream.parallelBuilder() - .react( ()-> 1 ) - .map(list -> 1+2) - .block(); + public void lazyParallel() { + LazyFutureStream.parallelBuilder().react(() -> 1).map(list -> 1 + 2) + .block(); } + @Test - public void lazyReactStreamList(){ - LazyFutureStream.sequentialBuilder() - .react( asList(()-> 1 )) - .map(list -> 1+2) - .block(); + public void lazyReactStreamList() { + LazyFutureStream.sequentialBuilder().react(asList(() -> 1)) + .map(list -> 1 + 2).block(); } + @Test - public void lazyParallelList(){ - LazyFutureStream.parallelBuilder() - .react( asList(()-> 1 )) - .map(list -> 1+2) - .block(); + public void lazyParallelList() { + LazyFutureStream.parallelBuilder().react(asList(() -> 1)) + .map(list -> 1 + 2).block(); } } diff --git a/src/test/java/com/aol/simple/react/lazy/Tutorial.java b/src/test/java/com/aol/simple/react/lazy/Tutorial.java index a2470cbf02..a4025dbb31 100644 --- a/src/test/java/com/aol/simple/react/lazy/Tutorial.java +++ b/src/test/java/com/aol/simple/react/lazy/Tutorial.java @@ -1,19 +1,361 @@ package com.aol.simple.react.lazy; +import static java.util.Arrays.asList; + +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Stack; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.Getter; + import org.junit.Ignore; import org.junit.Test; import com.aol.simple.react.async.QueueFactories; +import com.aol.simple.react.exceptions.SimpleReactFailedStageException; +import com.aol.simple.react.stream.eager.EagerFutureStream; +import com.aol.simple.react.stream.eager.EagerReact; import com.aol.simple.react.stream.lazy.LazyFutureStream; +import com.aol.simple.react.stream.simple.SimpleReact; +import com.aol.simple.react.stream.traits.FutureStream; +import com.aol.simple.react.threads.ReactPool; import com.aol.simple.react.threads.SequentialElasticPools; +@Ignore public class Tutorial { + + @SuppressWarnings("unchecked") + + + + @Test + public void zipByResults(){ + + LazyFutureStream a = LazyFutureStream.parallelBuilder(3).react(()->slowest(),()->fast(),()->slow()); + LazyFutureStream b = LazyFutureStream.sequentialBuilder().of(1,2,3,4,5,6); + + a.zip(b).forEach(System.out::println); + + + + + } + + + + + + @SuppressWarnings("unchecked") + + + + + + + + @Test + public void zipByFutures(){ + + EagerFutureStream a = EagerFutureStream.parallelBuilder(3).react(()->slowest(),()->fast(),()->slow()); + EagerFutureStream b = EagerFutureStream.sequentialBuilder().of(1,2,3,4,5,6); + + a.zipFutures(b).forEach(System.out::println); + + + + + } + + + + private String slowest(){ + sleep(2500); + return "slowestResult"; + } + private String slow(){ + sleep(100); + return "slowResult"; + } + private String fast(){ + return "fast"; + } + + + + + + + + + + + + + + + + + @Test + public void gettingStarted(){ + + List results = new SimpleReact() + .react(() -> readData("data1"), () -> readData("data2")) + .onFail(RuntimeException.class, this::loadFromDb) + .peek(System.out::println) + .then(this::processData) + .block(); + + } + + private String readData(String name) { + if(name.equals("data1")) + throw new RuntimeException(); + + else + return "hello world from file!"; + + } + private String processData(String data){ + return "processed : " + data; + } + private String loadFromDb(SimpleReactFailedStageException e){ + return "hello world from DB!"; + } + + + + + + + + + @Test + public void skipUntil(){ + FutureStream stoppingStream = LazyFutureStream.sequentialBuilder() + .react(()-> 1000) + .then(this::sleep) + .peek(System.out::println); + System.out.println(LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + //.peek(System.out::println) + .skipUntil(stoppingStream) + .peek(System.out::println) + .block().size()); + } + + + + private boolean sleep(int i) { + + try { + Thread.sleep(i); + } catch (InterruptedException e) { + + e.printStackTrace(); + } + return true; + + } + + + + + + + + + + + + + + @Test + public void jitter(){ + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it*100) + .jitter(10000000l) + .peek(System.out::println) + .runOnCurrent(); + } + + + + + + + @Test + public void fixedDelay(){ + + LazyFutureStream.sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .fixedDelay(1l,TimeUnit.SECONDS) + .peek(System.out::println) + .runOnCurrent(); + } + + + + + + + + @Test + public void elasticPool(){ + + List files = Arrays.asList("/tmp/1.data","/tmp/2.data"); + + List data = SequentialElasticPools.lazyReact.react(er -> er.reactToCollection(files) + .map(this::loadData) + .peek(System.out::println) + .map(this::saveData) + .collect(Collectors.toList())); + + System.out.println("Loaded and saved " + data.size()); + } + + + + + + + + @SuppressWarnings({ "unchecked", "rawtypes" }) + + + + @Test + public void testRoundRobin(){ + + EagerReact react1 = new EagerReact(new ForkJoinPool(4)); + EagerReact react2 = new EagerReact(new ForkJoinPool(4)); + + ReactPool pool = ReactPool.boundedPool(asList(react1,react2)); + + + Supplier[] suppliers = { ()->"hello",()->"world" }; + + pool.react( (er) -> er.react(suppliers) + .peek(it-> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + + pool.react( (er) -> er.react(suppliers) + .peek(it-> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + + + + } + + + + @Test + public void add100(){ + + + new SimpleReact().of(1,2,3,4,5) + .then(num -> num+100) + .then(num -> Thread.currentThread().getId()) + .peek(System.out::println); + + } + + + + + + + + + + + + Stack dataArray = new Stack(){{ + add( "{order:1000,{customer:604}}"); + add( "{order:1001,{customer:605}}"); + }}; + private String loadData(String file){ + sleep(1000); + return dataArray.pop(); + } + private Status saveData(String data){ + return new Status(); + } + + + + + + @Test + public void debounce(){ + LazyFutureStream.sequentialCommonBuilder() + .iterateInfinitely(0, it -> it + 1) + .debounce(100, TimeUnit.MILLISECONDS) + .peek(System.out::println) + .runOnCurrent(); + } + + + + + + + + + + + + + + + @Test + public void onePerSecond(){ + + + + + LazyFutureStream.sequentialCommonBuilder() + .iterateInfinitely(0, it -> it + 1) + .onePer(1, TimeUnit.SECONDS) + .map(seconds -> readStatus()) + .retry(this::saveStatus) + .peek(System.out::println) + .capture(Throwable::printStackTrace) + .block(); + + } + + + private String saveStatus(Status s){ + if(count++%2==0) + throw new RuntimeException(); + + return "Status saved:"+s.getId(); + } + + int count =0; + + + + + private Status readStatus() { + return new Status(); + } + static int nextId=1; + @Getter + class Status{ + long id = nextId++; + } + + + String status="ok"; /** * check status every second, batch every 10 secs diff --git a/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java b/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java index 234da92b95..28bc27335d 100644 --- a/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java +++ b/src/test/java/com/aol/simple/react/threads/ReactPoolTest.java @@ -26,6 +26,11 @@ public void testReact(){ List result = pool.react( (er) -> er.react(()->"hello",()->"world").block() ); assertThat(result.size(),is(2)); } + + + + + @Test public void testRoundRobin(){ EagerReact react1 = mock(EagerReact.class); @@ -40,6 +45,16 @@ public void testRoundRobin(){ verify(react1,times(1)).react(suppliers); verify(react2,times(1)).react(suppliers); } + + + + + + + + + + @Test public void testElastic(){ From f464fc8705c78d54b87db24f935ee43d2d8f639c Mon Sep 17 00:00:00 2001 From: John McClean Date: Wed, 25 Mar 2015 17:55:17 +0000 Subject: [PATCH 2/3] bug fixes and tutorial work --- .../react/stream/lazy/LazyFutureStream.java | 4 +- .../react/stream/traits/LazyStream.java | 3 +- .../react/stream/traits/LazyToQueue.java | 74 +- .../stream/traits/SimpleReactStream.java | 13 +- .../simple/react/base/BaseJDKStreamTest.java | 28 + .../aol/simple/react/lazy/LazySeqTest.java | 2 +- .../com/aol/simple/react/lazy/Tutorial.java | 633 +++++++++++------- .../aol/simple/react/simple/RetryTest.java | 15 +- 8 files changed, 466 insertions(+), 306 deletions(-) diff --git a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java index 19b5fd9055..61b24496bb 100644 --- a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java @@ -121,7 +121,7 @@ default LazyFutureStream> zipFuturesWithIndex() { * @return Two equivalent Streams */ default Tuple2, Seq> duplicateFuturesSeq() { - // unblocking impl + Stream stream = getLastActive().stream(); Tuple2>, Seq>> duplicated = Seq .seq((Stream>) stream).duplicate(); @@ -1096,7 +1096,7 @@ public static LazyReact sequentialBuilder() { .executor(new ForkJoinPool(1)) .retrier( RetryBuilder.getDefaultInstance().withScheduler( - Executors.newScheduledThreadPool(1))).build(); + Executors.newScheduledThreadPool(2))).build(); } /** diff --git a/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java b/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java index 70a7b331c0..c4f974fb1a 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/LazyStream.java @@ -38,7 +38,8 @@ default void run(ExecutorService e) { } default void run(ExecutorService e,Runnable r) { - new SimpleReact(e).react(() -> new Runner(r).run(getLastActive(),new EmptyCollector(getLazyCollector().getMaxActive()))); + new SimpleReact(e).react(() -> new Runner(r).run(getLastActive(), + new EmptyCollector(getLazyCollector().getMaxActive()))); } default void runThread(Runnable r) { diff --git a/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java b/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java index 805acd5133..a2983a1d88 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java +++ b/src/main/java/com/aol/simple/react/stream/traits/LazyToQueue.java @@ -9,17 +9,15 @@ import com.aol.simple.react.stream.BaseSimpleReact; import com.aol.simple.react.stream.lazy.LazyReact; -public interface LazyToQueue extends ToQueue{ +public interface LazyToQueue extends ToQueue { - - - - abstract SimpleReactStream allOf(final Collector collector, + abstract SimpleReactStream allOf(final Collector collector, final Function fn); - - - abstract SimpleReactStream then(final Function fn, ExecutorService exec); - abstract T getPopulator(); + + abstract SimpleReactStream then(final Function fn, + ExecutorService exec); + + abstract T getPopulator(); /** * Convert the current Stream to a SimpleReact Queue @@ -28,42 +26,44 @@ abstract SimpleReactStream allOf(final Collector collector, */ default Queue toQueue() { Queue queue = this.getQueueFactory().build(); - - LazyReact service = getPopulator(); - then(queue::offer,service.getExecutor()).runThread( - () -> {queue.close(); returnPopulator(service); }); - + LazyReact service = getPopulator(); + then(queue::offer, service.getExecutor()).runThread(() -> { + queue.close(); + returnPopulator(service); + }); + return queue; } - - default Queue toQueue(Function fn) { + + default Queue toQueue(Function fn) { Queue queue = fn.apply(this.getQueueFactory().build()); - - LazyReact service = getPopulator(); - then(queue::offer,service.getExecutor()).runThread( - () -> {queue.close(); returnPopulator(service); }); - + LazyReact service = getPopulator(); + then(queue::offer, service.getExecutor()).runThread(() -> { + queue.close(); + returnPopulator(service); + }); + return queue; } - default void toQueue(Map> shards, Function sharder) { - - - LazyReact service = getPopulator(); - then(it-> shards.get(sharder.apply(it)).offer(it),service.getExecutor()) - .capture(Throwable::printStackTrace) - .runThread( - () -> {shards.values().forEach(it->it.close()); returnPopulator(service); }); - - - + + default void toQueue(Map> shards, Function sharder) { + + LazyReact service = getPopulator(); + then(it -> shards.get(sharder.apply(it)).offer(it), + service.getExecutor()) + .runThread(() -> { + shards.values().forEach(it -> it.close()); + returnPopulator(service); + }); + } - - - abstract void returnPopulator(T service); - default U add(U value,Queue queue){ - if(!queue.add(value)) + + abstract void returnPopulator(T service); + + default U add(U value, Queue queue) { + if (!queue.add(value)) throw new RuntimeException(); return value; } diff --git a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java index fb2e6b7e71..6892208fa3 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/SimpleReactStream.java @@ -34,6 +34,7 @@ import com.aol.simple.react.stream.simple.SimpleReact; import com.aol.simple.react.stream.simple.SimpleReactStreamImpl; import com.nurkiewicz.asyncretry.RetryExecutor; +import com.nurkiewicz.asyncretry.policy.AbortRetryException; public interface SimpleReactStream extends LazyStream, @@ -138,13 +139,19 @@ default SimpleReactStream retry(final Function fn) { return (SimpleReactStream) this.withLastActive(getLastActive().permutate( getLastActive().stream().map( - (ft) -> ft.thenApplyAsync((res) -> BlockingStream.getSafe(getRetrier() - .getWithRetry(() -> fn.apply((U) res)),getErrorHandler()), + (ft) -> ft.thenApplyAsync(res -> + getRetrier().getWithRetry(()->SimpleReactStream.handleExceptions(fn).apply((U)res) ).join() + , + //BlockingStream.getSafe(getRetrier() + // .getWithRetry(() -> fn.apply((U) res)) + // ,getErrorHandler()), getTaskExecutor())), Collectors.toList())); } default SimpleReactStream fromStream(Stream stream) { + + return (SimpleReactStream) this.withLastActive(getLastActive() .withNewStream(stream.map(CompletableFuture::completedFuture))); } @@ -241,6 +248,8 @@ static Function handleExceptions(Function fn) { try { return fn.apply(input); } catch (Throwable t) { + if(t instanceof AbortRetryException)//special case for retry + throw t; throw new SimpleReactFailedStageException(input, t); } diff --git a/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java b/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java index 83155a22ae..85a1919849 100644 --- a/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java +++ b/src/test/java/com/aol/simple/react/base/BaseJDKStreamTest.java @@ -69,6 +69,7 @@ public void testFindAny(){ } @Test public void testDistinct(){ + assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()).size(),is(2)); assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()),hasItem(1)); assertThat(of(1,1,1,2,1).distinct().collect(Collectors.toList()),hasItem(2)); } @@ -178,5 +179,32 @@ public void testCount(){ assertThat(of(1,5,3,4,2).count(),is(5L)); } + @Test + public void collectSBB(){ + List list = of(1,2,3,4,5).collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + assertThat(list.size(),is(5)); + } + @Test + public void collect(){ + assertThat(of(1,2,3,4,5).collect(Collectors.toList()).size(),is(5)); + assertThat(of(1,1,1,2).collect(Collectors.toSet()).size(),is(2)); + } + @Test + public void testFilter(){ + assertThat(of(1,1,1,2).filter(it -> it==1).collect(Collectors.toList()).size(),is(3)); + } + @Test + public void testMap(){ + assertThat(of(1).map(it->it+100).collect(Collectors.toList()).get(0),is(101)); + } + Object val; + @Test + public void testPeek(){ + val = null; + of(1).map(it->it+100).peek(it -> val=it).collect(Collectors.toList()); + assertThat(val,is(101)); + } + + } diff --git a/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java b/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java index 6821c35681..92988f0465 100644 --- a/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java +++ b/src/test/java/com/aol/simple/react/lazy/LazySeqTest.java @@ -103,7 +103,7 @@ public void batchSinceLastReadIterator() throws InterruptedException{ } @Test public void batchSinceLastRead() throws InterruptedException{ - List cols = of(1,2,3,4,5,6).chunkSinceLastRead().peek(it->{sleep(50);}).collect(Collectors.toList()); + List cols = of(1,2,3,4,5,6).chunkSinceLastRead().peek(System.out::println).peek(it->{sleep(50);}).collect(Collectors.toList()); System.out.println(cols.get(0)); assertThat(cols.get(0).size(),is(6)); diff --git a/src/test/java/com/aol/simple/react/lazy/Tutorial.java b/src/test/java/com/aol/simple/react/lazy/Tutorial.java index a4025dbb31..63154ec2ac 100644 --- a/src/test/java/com/aol/simple/react/lazy/Tutorial.java +++ b/src/test/java/com/aol/simple/react/lazy/Tutorial.java @@ -1,22 +1,31 @@ package com.aol.simple.react.lazy; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; import java.util.Arrays; import java.util.Collection; +import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Stack; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.ToString; import org.junit.Ignore; import org.junit.Test; +import com.aol.simple.react.async.Queue; import com.aol.simple.react.async.QueueFactories; import com.aol.simple.react.exceptions.SimpleReactFailedStageException; import com.aol.simple.react.stream.eager.EagerFutureStream; @@ -26,76 +35,187 @@ import com.aol.simple.react.stream.traits.FutureStream; import com.aol.simple.react.threads.ReactPool; import com.aol.simple.react.threads.SequentialElasticPools; +import com.google.common.collect.ImmutableMap; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -@Ignore public class Tutorial { - @SuppressWarnings("unchecked") - - - @Test - public void zipByResults(){ - - LazyFutureStream a = LazyFutureStream.parallelBuilder(3).react(()->slowest(),()->fast(),()->slow()); - LazyFutureStream b = LazyFutureStream.sequentialBuilder().of(1,2,3,4,5,6); - + public void zipByResults() { + + LazyFutureStream a = LazyFutureStream.parallelBuilder(3).react( + () -> slowest(), () -> fast(), () -> slow()); + LazyFutureStream b = LazyFutureStream.sequentialBuilder().of( + 1, 2, 3, 4, 5, 6); + a.zip(b).forEach(System.out::println); - - - - + + } + + @SuppressWarnings("unchecked") + @Test + public void zipWithIndex() { + + LazyFutureStream.sequentialBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .zipWithIndex().forEach(System.out::println); + + } + + @SuppressWarnings("unchecked") + @Test + public void zipFuturesWithIndex() { + + EagerFutureStream.parallelBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .zipFuturesWithIndex().forEach(System.out::println); + + } + + @SuppressWarnings("unchecked") + @Test + public void combineLatest() { + LazyFutureStream + .parallelBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .combineLatest( + LazyFutureStream.sequentialBuilder().of(1, 2, 3, 4, 5, + 6)).forEach(System.out::println); + } + + @SuppressWarnings("unchecked") + @Test + public void withLatest() { + LazyFutureStream + .sequentialBuilder() + .react(() -> slowest(), () -> fast(), () -> slow()) + .withLatest( + LazyFutureStream.sequentialBuilder().of(1, 2, 3, 4, 5, + 6)).forEach(System.out::println); + } + + @SuppressWarnings("unchecked") + @Test + public void zipByFutures() { + + LazyFutureStream.parallelBuilder(3) + .react(() -> slowest(), () -> fast(), () -> slow()) + .flatMap(it -> it.chars().boxed()).forEach(System.out::println); + + EagerFutureStream a = EagerFutureStream.parallelBuilder(3) + .react(() -> slowest(), () -> fast(), () -> slow()); + EagerFutureStream b = EagerFutureStream.sequentialBuilder() + .of(1, 2, 3, 4, 5, 6); + + a.zipFutures(b).forEach(System.out::println); + + } + + private String slowest() { + sleep(2500); + return "slowestResult"; + } + + private String slow() { + sleep(100); + return "slowResult"; + } + + private String fast() { + return "fast"; } - @SuppressWarnings("unchecked") - - + @Test + public void errorHandling() { + AsyncRetryExecutor retrier = new AsyncRetryExecutor(Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors())). + retryOn(Throwable.class). + withMaxDelay(1_000). //1 seconds + withUniformJitter(). //add between +/- 100 ms randomly + withMaxRetries(1); + + List results =LazyFutureStream.sequentialBuilder().withRetrier(retrier) + .react(()->"new event1",()->"new event2") + .retry(this::unreliable) + .onFail(e->"default") + .peek(System.out::println) + .capture(Throwable::printStackTrace) + .block(); + + assertThat(results.size(),equalTo(2)); + + } + private String unreliable(Object o) { + throw new RuntimeException(); + + } + + @SuppressWarnings("unchecked") @Test - public void zipByFutures(){ + public void shard(){ + Map> shards = new HashMap<>(); + shards.put(0,new Queue<>()); + shards.put(1,new Queue<>()); + shards.put(2,new Queue<>()); - EagerFutureStream a = EagerFutureStream.parallelBuilder(3).react(()->slowest(),()->fast(),()->slow()); - EagerFutureStream b = EagerFutureStream.sequentialBuilder().of(1,2,3,4,5,6); - a.zipFutures(b).forEach(System.out::println); + Map> sharded = LazyFutureStream.sequentialBuilder() + .react(()->loadUserData()) + .flatMap(Collection::stream) + .shard(shards,user -> user.getUserId()%3); + System.out.println("First shard"); + sharded.get(0).forEach(System.out::println); + System.out.println("Second shard"); + sharded.get(1).forEach(System.out::println); + System.out.println("Third shard"); + sharded.get(2).forEach(System.out::println); } - private String slowest(){ - sleep(2500); - return "slowestResult"; - } - private String slow(){ - sleep(100); - return "slowResult"; - } - private String fast(){ - return "fast"; + @Test + public void filterMapReduceFlatMap(){ + int totalVisits = LazyFutureStream.sequentialBuilder() + .react(()->loadUserData()) + .flatMap(Collection::stream) + .filter(User::hasPurchased) + .map(User::getTotalVisits) + .reduce(0, (acc,next) -> acc+next); + + System.out.println("Total visits is : " + totalVisits); } - - - - - - - + @AllArgsConstructor + @ToString + @Getter + class User{ + boolean purchased; + int totalVisits; + final int userId = count++; + public boolean hasPurchased(){ + return purchased; + } + } + + private Collection loadUserData() { + return Arrays.asList(new User(true,102),new User(false,501),new User(true,14),new User(true,23),new User(false,3),new User(true,531),new User(false,56)); + } + @@ -103,313 +223,312 @@ private String fast(){ @Test - public void gettingStarted(){ - + public void gettingStarted() { + List results = new SimpleReact() .react(() -> readData("data1"), () -> readData("data2")) .onFail(RuntimeException.class, this::loadFromDb) .peek(System.out::println) .then(this::processData) .block(); - + } - + private String readData(String name) { - if(name.equals("data1")) + if (name.equals("data1")) throw new RuntimeException(); - + else return "hello world from file!"; - + } - private String processData(String data){ + + private String processData(String data) { return "processed : " + data; } - private String loadFromDb(SimpleReactFailedStageException e){ + + private String loadFromDb(SimpleReactFailedStageException e) { return "hello world from DB!"; } - - - - - - - @Test - public void skipUntil(){ - FutureStream stoppingStream = LazyFutureStream.sequentialBuilder() - .react(()-> 1000) - .then(this::sleep) - .peek(System.out::println); + public void skipUntil() { + FutureStream stoppingStream = LazyFutureStream + .sequentialBuilder().react(() -> 1000).then(this::sleep) + .peek(System.out::println); System.out.println(LazyFutureStream.sequentialCommonBuilder() - .fromPrimitiveStream(IntStream.range(0, 1000000)) - //.peek(System.out::println) - .skipUntil(stoppingStream) - .peek(System.out::println) - .block().size()); + .fromPrimitiveStream(IntStream.range(0, 1000000)) + // .peek(System.out::println) + .skipUntil(stoppingStream).peek(System.out::println).block() + .size()); } - - - + private boolean sleep(int i) { - + try { Thread.sleep(i); } catch (InterruptedException e) { - + e.printStackTrace(); } return true; - - } + } - - - - - - - - - - - @Test - public void jitter(){ + public void jitter() { LazyFutureStream.sequentialCommonBuilder() - .fromPrimitiveStream(IntStream.range(0, 1000000)) - .map(it -> it*100) - .jitter(10000000l) - .peek(System.out::println) - .runOnCurrent(); + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .map(it -> it * 100).jitter(10000000l) + .peek(System.out::println).runOnCurrent(); } - - - - - - + @Test - public void fixedDelay(){ - + public void fixedDelay() { + LazyFutureStream.sequentialCommonBuilder() - .fromPrimitiveStream(IntStream.range(0, 1000000)) - .fixedDelay(1l,TimeUnit.SECONDS) - .peek(System.out::println) - .runOnCurrent(); + .fromPrimitiveStream(IntStream.range(0, 1000000)) + .fixedDelay(1l, TimeUnit.SECONDS).peek(System.out::println) + .runOnCurrent(); } - - - - - - - + @Test - public void elasticPool(){ - - List files = Arrays.asList("/tmp/1.data","/tmp/2.data"); + public void elasticPool() { + + List files = Arrays.asList("/tmp/1.data", "/tmp/2.data"); + + List data = SequentialElasticPools.lazyReact.react(er -> er + .reactToCollection(files).map(this::loadData) + .peek(System.out::println).map(this::saveData) + .collect(Collectors.toList())); - List data = SequentialElasticPools.lazyReact.react(er -> er.reactToCollection(files) - .map(this::loadData) - .peek(System.out::println) - .map(this::saveData) - .collect(Collectors.toList())); - - System.out.println("Loaded and saved " + data.size()); + System.out.println("Loaded and saved " + data.size()); } - - - - - - - + @SuppressWarnings({ "unchecked", "rawtypes" }) - - - @Test - public void testRoundRobin(){ - + public void testRoundRobin() { + EagerReact react1 = new EagerReact(new ForkJoinPool(4)); - EagerReact react2 = new EagerReact(new ForkJoinPool(4)); - - ReactPool pool = ReactPool.boundedPool(asList(react1,react2)); - - - Supplier[] suppliers = { ()->"hello",()->"world" }; - - pool.react( (er) -> er.react(suppliers) - .peek(it-> System.out.println("Data is : " + it + " - " - + " Reactor is : " + System.identityHashCode(er)))); - - pool.react( (er) -> er.react(suppliers) - .peek(it-> System.out.println("Data is : " + it + " - " - + " Reactor is : " + System.identityHashCode(er)))); - - - + EagerReact react2 = new EagerReact(new ForkJoinPool(4)); + + ReactPool pool = ReactPool.boundedPool(asList(react1, + react2)); + + Supplier[] suppliers = { () -> "hello", () -> "world" }; + + pool.react((er) -> er.react(suppliers).peek( + it -> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + + pool.react((er) -> er.react(suppliers).peek( + it -> System.out.println("Data is : " + it + " - " + + " Reactor is : " + System.identityHashCode(er)))); + } - - - + @Test - public void add100(){ - - - new SimpleReact().of(1,2,3,4,5) - .then(num -> num+100) - .then(num -> Thread.currentThread().getId()) - .peek(System.out::println); - + public void add100() { + + new SimpleReact().of(1, 2, 3, 4, 5).then(num -> num + 100) + .then(num -> Thread.currentThread().getId()) + .peek(System.out::println); + } - - - - - - - - - - - - Stack dataArray = new Stack(){{ - add( "{order:1000,{customer:604}}"); - add( "{order:1001,{customer:605}}"); - }}; - private String loadData(String file){ + + Stack dataArray = new Stack() { + { + add("{order:1000,{customer:604}}"); + add("{order:1001,{customer:605}}"); + } + }; + + private String loadData(String file) { sleep(1000); return dataArray.pop(); } - private Status saveData(String data){ + + private Status saveData(String data) { return new Status(); } - - - - @Test - public void debounce(){ + public void debounce() { LazyFutureStream.sequentialCommonBuilder() - .iterateInfinitely(0, it -> it + 1) - .debounce(100, TimeUnit.MILLISECONDS) - .peek(System.out::println) - .runOnCurrent(); + .iterateInfinitely(0, it -> it + 1) + .debounce(100, TimeUnit.MILLISECONDS).peek(System.out::println) + .runOnCurrent(); } - - - - - - - - - - - - - - + @Test - public void onePerSecond(){ - - - - + public void onePerSecond() { + LazyFutureStream.sequentialCommonBuilder() - .iterateInfinitely(0, it -> it + 1) - .onePer(1, TimeUnit.SECONDS) - .map(seconds -> readStatus()) - .retry(this::saveStatus) - .peek(System.out::println) - .capture(Throwable::printStackTrace) - .block(); - + .iterateInfinitely(0, it -> it + 1).onePer(1, TimeUnit.SECONDS) + .map(seconds -> readStatus()).retry(this::saveStatus) + .peek(System.out::println).capture(Throwable::printStackTrace) + .block(); + } - - - private String saveStatus(Status s){ - if(count++%2==0) + + private String saveStatus(Status s) { + if (count++ % 2 == 0) throw new RuntimeException(); - - return "Status saved:"+s.getId(); + + return "Status saved:" + s.getId(); } - int count =0; - - - - + int count = 0; + private Status readStatus() { return new Status(); } - static int nextId=1; + + static int nextId = 1; + @Getter - class Status{ + class Status { long id = nextId++; } + String status = "ok"; - - String status="ok"; /** * check status every second, batch every 10 secs */ - @Test @Ignore - public void onePerSecondAndBatch(){ - List> collected = LazyFutureStream.sequentialCommonBuilder().reactInfinitely(()->status) - .withQueueFactory(QueueFactories.boundedQueue(1)) - .onePer(1, TimeUnit.SECONDS) - .batchByTime(10, TimeUnit.SECONDS) - .limit(15) - .block(); + @Test + @Ignore + public void onePerSecondAndBatch() { + List> collected = LazyFutureStream + .sequentialCommonBuilder().reactInfinitely(() -> status) + .withQueueFactory(QueueFactories.boundedQueue(1)) + .onePer(1, TimeUnit.SECONDS).batchByTime(10, TimeUnit.SECONDS) + .limit(15).block(); System.out.println(collected); } + /** * create a stream of time intervals in seconds */ - @Test - public void secondsTimeInterval(){ - List> collected = LazyFutureStream.sequentialCommonBuilder().iterateInfinitely(0, it -> it+1) - .withQueueFactory(QueueFactories.boundedQueue(1)) - .onePer(1, TimeUnit.SECONDS) - .peek(System.out::println) - .batchByTime(10, TimeUnit.SECONDS) - .peek(System.out::println) - .limit(15) - .block(); + @Test + public void secondsTimeInterval() { + List> collected = LazyFutureStream + .sequentialCommonBuilder().iterateInfinitely(0, it -> it + 1) + .withQueueFactory(QueueFactories.boundedQueue(1)) + .onePer(1, TimeUnit.SECONDS).peek(System.out::println) + .batchByTime(10, TimeUnit.SECONDS).peek(System.out::println) + .limit(15).block(); System.out.println(collected); } - @Test @Ignore - public void range(){ - List> collected = LazyFutureStream.sequentialCommonBuilder() - .fromPrimitiveStream(IntStream.range(0, 10)) - .batchBySize(5) - .block(); + + @Test + @Ignore + public void range() { + List> collected = LazyFutureStream + .sequentialCommonBuilder() + .fromPrimitiveStream(IntStream.range(0, 10)).batchBySize(5) + .block(); System.out.println(collected); } - - @Test @Ignore - public void executeRestCallInPool(){ - boolean success = SequentialElasticPools.eagerReact.react( er-> er.react(()->restGet()) - .map(Tutorial::transformData) - .then(Tutorial::saveToDb) - .first()); + + @Test + @Ignore + public void executeRestCallInPool() { + boolean success = SequentialElasticPools.eagerReact.react(er -> er + .react(() -> restGet()).map(Tutorial::transformData) + .then(Tutorial::saveToDb).first()); } - private static boolean saveToDb(Object o){ + + private static boolean saveToDb(Object o) { return true; } + private Object restGet() { // TODO Auto-generated method stub return null; } + private static Object transformData(Object o) { // TODO Auto-generated method stub return null; } + + + + @Test + public void batchBySize() { + + LazyFutureStream.parallelCommonBuilder() + .iterateInfinitely("", last->nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .batchBySize(10) + .onePer(1, TimeUnit.SECONDS) + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + individual)) + .forEach(this::save); + + } + + + @Test + public void batchByTime() { + + LazyFutureStream.parallelCommonBuilder() + .iterateInfinitely("", last->nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .batchByTime(1, TimeUnit.SECONDS) + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + individual)) + .forEach(this::save); + + } + + + @Test + public void chunkSinceLastRead() { + + LazyFutureStream.parallelCommonBuilder() + .iterateInfinitely("", last->nextFile()) + .map(this::readFileToString) + .map(this::parseJson) + .chunkSinceLastRead() + .peek(batch -> System.out.println("batched : " + batch)) + .map(this::processOrders) + .flatMap(Collection::stream) + .peek(individual -> System.out.println("Flattened : " + individual)) + .forEach(this::save); + + } + + + + + + private void save(Map map){ + + } + + private Collection processOrders(Collection input){ + sleep(100); + return input.stream().map(m -> ImmutableMap.of("processed",m)).collect(Collectors.toList()); + } + + private Map parseJson(String json){ + return ImmutableMap.of("id",count++,"type","order","date",new Date()); + } + + private String readFileToString(String name){ + return ""; + } + + private String nextFile() { + + return null; + } } diff --git a/src/test/java/com/aol/simple/react/simple/RetryTest.java b/src/test/java/com/aol/simple/react/simple/RetryTest.java index 0d30c0a1db..edb064ca65 100644 --- a/src/test/java/com/aol/simple/react/simple/RetryTest.java +++ b/src/test/java/com/aol/simple/react/simple/RetryTest.java @@ -25,6 +25,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import com.aol.simple.react.exceptions.SimpleReactFailedStageException; import com.aol.simple.react.stream.simple.SimpleReact; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.nurkiewicz.asyncretry.RetryExecutor; @@ -119,7 +120,7 @@ public void shouldRethrowOriginalExceptionFromUserFutureCompletion() assertThat(result.size(), is(0)); - assertThat(error.getMessage(), is("DONT PANIC")); + assertThat(((SimpleReactFailedStageException)error).getCause().getMessage(), is("DONT PANIC")); } @@ -132,12 +133,13 @@ public void shouldAbortWhenTargetFutureWantsToAbort() throws Exception { List result = new SimpleReact().react(() -> 1) - .withRetrier(executor).capture(e -> error = e) + .withRetrier(executor) + .capture(e -> error = e) .retry(serviceMock).block(); assertThat(result.size(), is(0)); - + error.printStackTrace(); assertThat(error, instanceOf(AbortRetryException.class)); } @@ -147,7 +149,7 @@ public void shouldRethrowExceptionThatWasThrownFromUserTaskBeforeReturningFuture error = null; final RetryExecutor executor = new AsyncRetryExecutor(schedulerMock) - .abortOn(IllegalArgumentException.class); + .abortIf(t-> t.getCause().getClass().isAssignableFrom(IllegalArgumentException.class)); given(serviceMock.apply(anyInt())).willThrow( new IllegalArgumentException("DONT PANIC")); @@ -158,8 +160,9 @@ public void shouldRethrowExceptionThatWasThrownFromUserTaskBeforeReturningFuture assertThat(result.size(), is(0)); - assertThat(error, instanceOf(IllegalArgumentException.class)); - assertThat(error.getMessage(), is("DONT PANIC")); + error.printStackTrace(); + assertThat(error.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(error.getCause().getMessage(), is("DONT PANIC")); } From 61fa70e5425a7d95a7db91197f00cc97f034c4ce Mon Sep 17 00:00:00 2001 From: John McClean Date: Thu, 26 Mar 2015 00:19:57 +0000 Subject: [PATCH 3/3] tutorial & bug fixes, fix issue with block for LazyFutureStream, fix Seq apis for Eager and Lazy Future Streams --- .../react/stream/eager/EagerFutureStream.java | 83 ++++++ .../simple/react/stream/eager/EagerReact.java | 3 +- .../react/stream/lazy/LazyFutureStream.java | 83 +++++- .../simple/react/stream/lazy/LazyReact.java | 4 +- .../react/stream/traits/BlockingStream.java | 6 +- .../react/stream/traits/FutureStream.java | 1 + .../com/aol/simple/react/lazy/Tutorial.java | 254 +++++++++++------- .../aol/simple/react/simple/AllOfTest.java | 28 +- 8 files changed, 355 insertions(+), 107 deletions(-) diff --git a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java index 83e4fbc93e..0bde13c8a6 100644 --- a/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/eager/EagerFutureStream.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -21,8 +22,14 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -54,6 +61,11 @@ */ public interface EagerFutureStream extends FutureStream, EagerToQueue { + + default EagerFutureStream map(Function mapper) { + return (EagerFutureStream)FutureStream.super.map(mapper); + } + /** * @return a Stream that batches all completed elements from this stream * since last read attempt into a collection @@ -1326,6 +1338,77 @@ static EagerFutureStream futureStream(Iterator iterator) { spliteratorUnknownSize(iterator, ORDERED), false)); } + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#parallel() + */ + @Override + default EagerFutureStream parallel() { + return this; + } + + @Override + default EagerFutureStream stream() { + return (EagerFutureStream)FutureStream.super.stream(); + } + + + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#unordered() + */ + @Override + default EagerFutureStream unordered() { + return this; + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#onClose(java.lang.Runnable) + */ + @Override + default EagerFutureStream onClose(Runnable closeHandler) { + + return (EagerFutureStream)FutureStream.super.onClose(closeHandler); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted() + */ + @Override + default EagerFutureStream sorted() { + return (EagerFutureStream)fromStream(FutureStream.super.sorted()); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted(java.util.Comparator) + */ + @Override + default EagerFutureStream sorted(Comparator comparator) { + return (EagerFutureStream)fromStream(FutureStream.super.sorted(comparator)); + } + + /** + * Give a function access to the current stage of a SimpleReact Stream + * + * @param consumer + * Consumer that will recieve current stage + * @return Self (current stage) + */ + default EagerFutureStream self(Consumer> consumer) { + return ( EagerFutureStream)FutureStream.super.self(consumer); + } + + + } diff --git a/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java b/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java index dd61e8a83a..728ff60773 100644 --- a/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java +++ b/src/main/java/com/aol/simple/react/stream/eager/EagerReact.java @@ -146,7 +146,8 @@ public EagerFutureStream of(U... array) { return (EagerFutureStream)super.of(array); } - public EagerFutureStream react(final Supplier... actions) { + @SafeVarargs + public final EagerFutureStream react(final Supplier... actions) { return (EagerFutureStream)super.reactI(actions); diff --git a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java index 61b24496bb..d3f2d04f0c 100644 --- a/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/lazy/LazyFutureStream.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,6 +38,7 @@ import com.aol.simple.react.collectors.lazy.LazyResultConsumer; import com.aol.simple.react.exceptions.SimpleReactFailedStageException; import com.aol.simple.react.stream.CloseableIterator; +import com.aol.simple.react.stream.MissingValue; import com.aol.simple.react.stream.StreamWrapper; import com.aol.simple.react.stream.ThreadPools; import com.aol.simple.react.stream.traits.FutureStream; @@ -72,6 +74,10 @@ LazyFutureStream withErrorHandler( LazyFutureStream withLastActive(StreamWrapper streamWrapper); + + default LazyFutureStream map(Function mapper) { + return (LazyFutureStream)FutureStream.super.map(mapper); + } /** * Zip this Stream with an index, but Zip based on the underlying tasks, not completed results. * @@ -188,7 +194,7 @@ default LazyFutureStream> chunkSinceLastRead() { * e.g. * * - * EagerFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new + * LazyFutureStream.of(10,20,25,30,41,43).shard(ImmutableMap.of("even",new * Queue(),"odd",new Queue(),element-> element%2==0? "even" : "odd"); * * @@ -332,7 +338,7 @@ default LazyFutureStream jitter(long jitterInNanos) { * Stream. Note this doesn't neccessarily imply a fixed delay between * element creation (although it may do). e.g. * - * EagerFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours); + * LazyFutureStream.of(1,2,3,4).fixedDelay(1,TimeUnit.hours); * * Will emit 1 on start, then 2 after an hour, 3 after 2 hours and so on. * @@ -456,6 +462,7 @@ default LazyFutureStream> withLatest(FutureStream s) { * @param futureStreams Streams to race * @return First Stream to start emitting values */ + @SafeVarargs static LazyFutureStream firstOf(LazyFutureStream... futureStreams) { return (LazyFutureStream) FutureStream.firstOf(futureStreams); } @@ -1349,5 +1356,77 @@ public T next() { return Seq.seq(new LimitUntil()); } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#parallel() + */ + @Override + default LazyFutureStream parallel() { + return this; + } + + @Override + default LazyFutureStream stream() { + return (LazyFutureStream)FutureStream.super.stream(); + } + + + + + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#unordered() + */ + @Override + default LazyFutureStream unordered() { + return this; + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#onClose(java.lang.Runnable) + */ + @Override + default LazyFutureStream onClose(Runnable closeHandler) { + + return (LazyFutureStream)FutureStream.super.onClose(closeHandler); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted() + */ + @Override + default LazyFutureStream sorted() { + return (LazyFutureStream)fromStream(FutureStream.super.sorted()); + } + + /* + * (non-Javadoc) + * + * @see org.jooq.lambda.Seq#sorted(java.util.Comparator) + */ + @Override + default LazyFutureStream sorted(Comparator comparator) { + return (LazyFutureStream)fromStream(FutureStream.super.sorted(comparator)); + } + + /** + * Give a function access to the current stage of a SimpleReact Stream + * + * @param consumer + * Consumer that will recieve current stage + * @return Self (current stage) + */ + default LazyFutureStream self(Consumer> consumer) { + return ( LazyFutureStream)FutureStream.super.self(consumer); + } + } diff --git a/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java b/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java index 06d59ccfc6..89ad9d5ac6 100644 --- a/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java +++ b/src/main/java/com/aol/simple/react/stream/lazy/LazyReact.java @@ -88,7 +88,8 @@ public LazyFutureStream fromStream( return (LazyFutureStream)super.fromStream(stream); } - public LazyFutureStream react(final Supplier... actions) { + @SafeVarargs + public final LazyFutureStream react(final Supplier... actions) { return (LazyFutureStream)super.reactI(actions); @@ -114,6 +115,7 @@ public LazyFutureStream fromStreamWithoutFutures(Stream stream) { * @return EagerFutureStream * @see com.aol.simple.react.stream.BaseSimpleReact#fromStreamWithoutFutures(java.util.stream.Stream) */ + @SuppressWarnings("unchecked") @Override public LazyFutureStream fromPrimitiveStream(IntStream stream) { diff --git a/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java b/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java index c4b782c3b9..03425bfad2 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/BlockingStream.java @@ -9,6 +9,7 @@ import java.util.function.Predicate; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.aol.simple.react.blockers.Blocker; import com.aol.simple.react.collectors.ReactCollector; @@ -103,7 +104,10 @@ default R block(final Collector collector) { @SuppressWarnings({ "rawtypes", "unchecked" }) default R block(final Collector collector, final StreamWrapper lastActive) { - return (R) lastActive.stream().map((future) -> { + Stream stream = lastActive.stream(); + if(!isEager()) + stream = lastActive.stream().collect(Collectors.toList()).stream(); + return (R) stream.map((future) -> { return (U) getSafe(future,getErrorHandler()); }).filter(v -> v != MissingValue.MISSING_VALUE).collect(collector); } diff --git a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java index 7088c7be5f..5ad77dd117 100644 --- a/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java +++ b/src/main/java/com/aol/simple/react/stream/traits/FutureStream.java @@ -730,6 +730,7 @@ public R next() { return Seq.seq(new Zip()).filter(next->!(next instanceof Optional)); } + static Seq skipUntil(FutureStream left, FutureStream right) { diff --git a/src/test/java/com/aol/simple/react/lazy/Tutorial.java b/src/test/java/com/aol/simple/react/lazy/Tutorial.java index 63154ec2ac..2f74975a62 100644 --- a/src/test/java/com/aol/simple/react/lazy/Tutorial.java +++ b/src/test/java/com/aol/simple/react/lazy/Tutorial.java @@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableMap; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; +@Ignore public class Tutorial { @SuppressWarnings("unchecked") @@ -125,112 +126,168 @@ private String slow() { private String fast() { return "fast"; } - - - - - + @Test - public void errorHandling() { - AsyncRetryExecutor retrier = new AsyncRetryExecutor(Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors())). - retryOn(Throwable.class). - withMaxDelay(1_000). //1 seconds - withUniformJitter(). //add between +/- 100 ms randomly - withMaxRetries(1); - - List results =LazyFutureStream.sequentialBuilder().withRetrier(retrier) - .react(()->"new event1",()->"new event2") - .retry(this::unreliable) - .onFail(e->"default") - .peek(System.out::println) - .capture(Throwable::printStackTrace) + public void errorHandling() { + AsyncRetryExecutor retrier = new AsyncRetryExecutor( + Executors.newScheduledThreadPool(Runtime.getRuntime() + .availableProcessors())).retryOn(Throwable.class) + .withMaxDelay(1_000). // 1 seconds + withUniformJitter(). // add between +/- 100 ms randomly + withMaxRetries(1); + + List results = LazyFutureStream.sequentialBuilder() + .withRetrier(retrier) + .react(() -> "new event1", () -> "new event2") + .retry(this::unreliable).onFail(e -> "default") + .peek(System.out::println).capture(Throwable::printStackTrace) .block(); - - assertThat(results.size(),equalTo(2)); + + assertThat(results.size(), equalTo(2)); } - - - - + private String unreliable(Object o) { - throw new RuntimeException(); + throw new RuntimeException(); } @SuppressWarnings("unchecked") - @Test - public void shard(){ - Map> shards = new HashMap<>(); - shards.put(0,new Queue<>()); - shards.put(1,new Queue<>()); - shards.put(2,new Queue<>()); - - - Map> sharded = LazyFutureStream.sequentialBuilder() - .react(()->loadUserData()) - .flatMap(Collection::stream) - .shard(shards,user -> user.getUserId()%3); - - + public void shard() { + Map> shards = new HashMap<>(); + shards.put(0, new Queue<>()); + shards.put(1, new Queue<>()); + shards.put(2, new Queue<>()); + + Map> sharded = LazyFutureStream + .sequentialBuilder().react(() -> loadUserData()) + .flatMap(Collection::stream) + .shard(shards, user -> user.getUserId() % 3); + System.out.println("First shard"); sharded.get(0).forEach(System.out::println); - + System.out.println("Second shard"); sharded.get(1).forEach(System.out::println); - + System.out.println("Third shard"); sharded.get(2).forEach(System.out::println); } - + + @Test + public void firstOf(){ + + LazyFutureStream stream1 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromDb()) + .map(this::convertToStandardFormat); + + LazyFutureStream stream2 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromService1()) + .map(this::convertToStandardFormat); + + LazyFutureStream stream3 = LazyFutureStream.sequentialBuilder() + .react(() -> loadFromService2()) + .map(this::convertToStandardFormat); + + LazyFutureStream.firstOf(stream1, stream2, stream3) + .peek(System.out::println) + .map(this::saveData) + .runOnCurrent(); + + + } @Test - public void filterMapReduceFlatMap(){ - int totalVisits = LazyFutureStream.sequentialBuilder() - .react(()->loadUserData()) - .flatMap(Collection::stream) - .filter(User::hasPurchased) - .map(User::getTotalVisits) - .reduce(0, (acc,next) -> acc+next); + public void anyOf(){ - System.out.println("Total visits is : " + totalVisits); + + + LazyFutureStream.parallelBuilder(8).react(() -> loadFromDb(),() -> loadFromService1(), + () -> loadFromService2()) + .map(this::convertToStandardFormat) + .peek(System.out::println) + .map(this::saveData) + .block(); + + + } + + private String convertToStandardFormat(String input){ + if(count++%2==0){ + System.out.println("sleeping!" + input); + sleep(1000); + } + return "converted " + input; + } + private String loadFromDb(){ + + return "from db"; + } + private String loadFromService1(){ + + return "from service1"; + } + private String loadFromService2(){ + return "from service2"; } + + + @Test + public void allOf(){ + LazyFutureStream.sequentialBuilder().react(()->1,()->2,()->3) + .map(it->it+100) + .peek(System.out::println) + .allOf(c-> ImmutableMap.of("numbers",c)) + .peek(System.out::println) + .block(); + } + + + + + + + @Test + public void filterMapReduceFlatMap() { + int totalVisits = LazyFutureStream.sequentialBuilder() + .react(() -> loadUserData()).flatMap(Collection::stream) + .filter(User::hasPurchased).map(User::getTotalVisits) + .reduce(0, (acc, next) -> acc + next); + + System.out.println("Total visits is : " + totalVisits); + } + @AllArgsConstructor @ToString @Getter - class User{ + class User { boolean purchased; int totalVisits; final int userId = count++; - public boolean hasPurchased(){ + + public boolean hasPurchased() { return purchased; } } private Collection loadUserData() { - return Arrays.asList(new User(true,102),new User(false,501),new User(true,14),new User(true,23),new User(false,3),new User(true,531),new User(false,56)); + return Arrays.asList(new User(true, 102), new User(false, 501), + new User(true, 14), new User(true, 23), new User(false, 3), + new User(true, 531), new User(false, 56)); } - - - - - - @Test public void gettingStarted() { List results = new SimpleReact() .react(() -> readData("data1"), () -> readData("data2")) .onFail(RuntimeException.class, this::loadFromDb) - .peek(System.out::println) - .then(this::processData) - .block(); + .peek(System.out::println).then(this::processData).block(); } @@ -453,13 +510,12 @@ private static Object transformData(Object o) { return null; } - - @Test public void batchBySize() { - - LazyFutureStream.parallelCommonBuilder() - .iterateInfinitely("", last->nextFile()) + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) .map(this::readFileToString) .map(this::parseJson) .batchBySize(10) @@ -467,68 +523,66 @@ public void batchBySize() { .peek(batch -> System.out.println("batched : " + batch)) .map(this::processOrders) .flatMap(Collection::stream) - .peek(individual -> System.out.println("Flattened : " + individual)) - .forEach(this::save); + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); } - - + @Test public void batchByTime() { - - LazyFutureStream.parallelCommonBuilder() - .iterateInfinitely("", last->nextFile()) + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) .map(this::readFileToString) .map(this::parseJson) .batchByTime(1, TimeUnit.SECONDS) .peek(batch -> System.out.println("batched : " + batch)) .map(this::processOrders) .flatMap(Collection::stream) - .peek(individual -> System.out.println("Flattened : " + individual)) - .forEach(this::save); + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); } - - + @Test - public void chunkSinceLastRead() { - - LazyFutureStream.parallelCommonBuilder() - .iterateInfinitely("", last->nextFile()) + public void chunkSinceLastRead() { + + LazyFutureStream + .parallelCommonBuilder() + .iterateInfinitely("", last -> nextFile()) .map(this::readFileToString) .map(this::parseJson) .chunkSinceLastRead() .peek(batch -> System.out.println("batched : " + batch)) .map(this::processOrders) .flatMap(Collection::stream) - .peek(individual -> System.out.println("Flattened : " + individual)) - .forEach(this::save); + .peek(individual -> System.out.println("Flattened : " + + individual)).forEach(this::save); } - - - - - - private void save(Map map){ - + + private void save(Map map) { + } - - private Collection processOrders(Collection input){ + + private Collection processOrders(Collection input) { sleep(100); - return input.stream().map(m -> ImmutableMap.of("processed",m)).collect(Collectors.toList()); + return input.stream().map(m -> ImmutableMap.of("processed", m)) + .collect(Collectors.toList()); } - - private Map parseJson(String json){ - return ImmutableMap.of("id",count++,"type","order","date",new Date()); + + private Map parseJson(String json) { + return ImmutableMap.of("id", count++, "type", "order", "date", + new Date()); } - private String readFileToString(String name){ + private String readFileToString(String name) { return ""; } - + private String nextFile() { - + return null; } } diff --git a/src/test/java/com/aol/simple/react/simple/AllOfTest.java b/src/test/java/com/aol/simple/react/simple/AllOfTest.java index bf4a093c41..5782e2ffcf 100644 --- a/src/test/java/com/aol/simple/react/simple/AllOfTest.java +++ b/src/test/java/com/aol/simple/react/simple/AllOfTest.java @@ -1,25 +1,49 @@ package com.aol.simple.react.simple; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.Test; import com.aol.simple.react.extractors.Extractors; +import com.aol.simple.react.stream.lazy.LazyFutureStream; import com.aol.simple.react.stream.simple.SimpleReact; +import com.google.common.collect.ImmutableMap; public class AllOfTest { + @Test + public void allOf(){ + List>> result = new ArrayList<>(); + Supplier s = ()->result; + + LazyFutureStream.sequentialBuilder().react(()->1,()->2,()->3) + .map(it->it+100) + .peek(System.out::println) + .allOf(c-> { System.out.println(c);return ImmutableMap.of("numbers",c);}) + .peek(map -> System.out.println(map)) + .run(s); + + assertThat(result.size(),is(1)); + } + @Test public void testAllOfFailure(){ new SimpleReact().react(()-> { throw new RuntimeException();},()->"hello",()->"world")