
Acting on Futures (actOnFutures operator)

johnmcclean-aol edited this page Oct 13, 2015 · 6 revisions

The actOnFutures operator ensures that the next operation is performed directly on the Stream of underlying Futures. The standard behaviour is to act on the results of those Futures.

Comparing zipWithIndex

The normal behaviour of zipWithIndex is to act on the result of the previous stage.

Where the result is already available

LazyFutureStream.of("a","b","c","d")
                .zipWithIndex()
                .forEach(System.out::println)


["a",1l]
["b",2l]
["c",3l]
["d",4l]

Where the result is realised asynchronously

If the values are not already present, but are computed or loaded asynchronously, then completion order determines the index assigned to each result.

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
                .zipWithIndex()
                .forEach(System.out::println)


["c",1l]   <-- c completes first
["b",2l]   <-- b completes second 
["a",3l]   <-- a completes third
["d",4l]   <-- d completes fourth

Where the result is realised asynchronously and actOnFutures is used

When actOnFutures is used the index represents the index of the future task (and therefore the original order, or the order of the Futures input into that stage).

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
                .actOnFutures()
                .zipWithIndex()
                .forEach(System.out::println)


["c",3l]   <-- c completes first
["b",2l]   <-- b completes second 
["a",1l]   <-- a completes third
["d",4l]   <-- d completes fourth
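The difference between the two indexing modes can be reproduced with plain `java.util.concurrent.CompletableFuture`, independent of simple-react. The sketch below is illustrative only: the class and method names are hypothetical, the futures are completed by hand in the order c, b, a, d to mimic the example above, and the indices are zero-based rather than matching the output shown on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not part of simple-react.
class FutureIndexSketch {

    // Index is assigned as each result arrives, so it follows completion order.
    public static List<String> byCompletionOrder() {
        List<CompletableFuture<String>> futures = newFutures(4);
        List<String> out = new ArrayList<>();
        AtomicLong next = new AtomicLong();
        for (CompletableFuture<String> f : futures) {
            f.thenAccept(v -> out.add(v + ":" + next.getAndIncrement()));
        }
        completeOutOfOrder(futures);
        return out;
    }

    // Index is the position of the future itself, so it follows the original order.
    public static List<String> byFuturePosition() {
        List<CompletableFuture<String>> futures = newFutures(4);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            final int index = i;
            futures.get(i).thenAccept(v -> out.add(v + ":" + index));
        }
        completeOutOfOrder(futures);
        return out;
    }

    private static List<CompletableFuture<String>> newFutures(int n) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(new CompletableFuture<>());
        }
        return futures;
    }

    // Completes on the calling thread: c first, then b, a and d.
    private static void completeOutOfOrder(List<CompletableFuture<String>> futures) {
        futures.get(2).complete("c");
        futures.get(1).complete("b");
        futures.get(0).complete("a");
        futures.get(3).complete("d");
    }

    public static void main(String[] args) {
        System.out.println(byCompletionOrder()); // [c:0, b:1, a:2, d:3]
        System.out.println(byFuturePosition());  // [c:2, b:1, a:0, d:3]
    }
}
```

In both cases the results are emitted in completion order; only the index attached to each result changes.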

Multithreaded reduction

The standard behaviour for reduction (or collection / mutable reduction) in simple-react is that it occurs on a single thread. There is an option to batch results and process them in parallel. An alternative to both mechanisms is to use the CompletableFuture API to involve multiple threads during reduction.

Methods such as thenCombine or allOf can be used to perform reduction operations on the processing threads (or even resubmit a task to the configured executor, if the async api methods are used).

new LazyReact().react(()->load("a"),()->load("b"),()->load("c"),()->load("d"))
               .actOnFutures()
               .reduce((future1,future2)-> future1.thenCombine(future2, (f1,f2)-> f1+","+f2))
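The same reduction can be sketched with the JDK alone, which may help show what the lambda passed to reduce is doing. This is a minimal sketch, not simple-react code: `CombineSketch`, `joinAll` and the `load` stand-in are hypothetical names, and a plain `java.util.stream.Stream` of futures takes the place of the LazyFutureStream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not part of simple-react.
class CombineSketch {

    // Hypothetical stand-in for the load(...) helper used on this page.
    static String load(String key) {
        return key;
    }

    // Folds the futures pairwise with thenCombine. Each combining function runs
    // once both of its inputs are complete, typically on the thread that
    // completed the later of the two, rather than on one dedicated thread.
    public static String joinAll(List<CompletableFuture<String>> futures) {
        return futures.stream()
                .reduce((f1, f2) -> f1.thenCombine(f2, (a, b) -> a + "," + b))
                .map(CompletableFuture::join) // block only for the final result
                .orElse("");
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> load("a")),
                CompletableFuture.supplyAsync(() -> load("b")),
                CompletableFuture.supplyAsync(() -> load("c")),
                CompletableFuture.supplyAsync(() -> load("d")));
        System.out.println(joinAll(futures)); // a,b,c,d
    }
}
```

Because the fold pairs futures by position, the joined string keeps the input order even when the tasks finish out of order. Swapping `thenCombine` for `thenCombineAsync` would resubmit each combining step to an executor instead.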

actOnFutures operators

Reverse

Reverse can (efficiently for some Streams) reverse the order of elements in a Stream. When called in actOnFutures the order of the underlying Futures is simply reversed.

public LazyFutureStream<T> reverse()

LazyFutureStream.of(1, 2, 3)
                .actOnFutures()
                .reverse()
                .toList();

//3,2,1

Cycle

Cycling repeats a Stream a set number of times (or infinitely). When called in actOnFutures, it is the Stream of underlying futures that is cycled (resulting eventually in a Stream of populated results from those futures).

public LazyFutureStream<T> cycle(int times)

LazyFutureStream.of(1,2,2)
                .actOnFutures()
                .cycle(3)
                .collect(Collectors.toList())
//1,2,2,1,2,2,1,2,2

public LazyFutureStream<T> cycle()

LazyFutureStream.of(1,2,2)
                .actOnFutures()
                .cycle()
                .limit(6)
                .collect(Collectors.toList())

//1,2,2,1,2,2
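As a rough illustration of cycling the futures rather than the results, the following JDK-only sketch repeats a list of `CompletableFuture` instances and joins them afterwards. The names `CycleFuturesSketch` and `cycle` are hypothetical, and this is not how simple-react implements the operator; it only shows that the same Future instances are revisited on each pass, so no task is rerun.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not part of simple-react.
class CycleFuturesSketch {

    // Repeats the list of futures indefinitely, keeps the first `limit`
    // futures, and only then resolves each one to its result.
    public static List<Integer> cycle(List<CompletableFuture<Integer>> futures, int limit) {
        if (futures.isEmpty()) {
            return Collections.emptyList(); // an empty cycle would never reach the limit
        }
        return Stream.generate(() -> futures)
                .flatMap(List::stream)
                .limit(limit)
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 2));
        System.out.println(cycle(futures, 6)); // [1, 2, 2, 1, 2, 2]
    }
}
```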

Copying (duplicate, triplicate, quadruplicate) Streams

A Stream can be lazily duplicated using the copy related operators. When used via actOnFutures the underlying Stream of Futures is duplicated, with Future instances being shared across Streams.

Tuple2<LazyFutureStream<T>, LazyFutureStream<T>> duplicate()

Tuple2<LazyFutureStream<Integer>, LazyFutureStream<Integer>> copies = LazyFutureStream.of(1, 2, 3, 4, 5, 6)
                                                                              .actOnFutures()
                                                                              .duplicate();
               

//[1, 2, 3, 4, 5, 6], [1, 2, 3, 4, 5, 6]

Tuple3<LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>> triplicate()

Tuple4<LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>, LazyFutureStream<T>> quadruplicate()
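What sharing Future instances across copies means in practice can be shown with plain JDK collections. The sketch below is an assumption-laden illustration, not simple-react code: `SharedFuturesSketch` and `loadsTriggered` are hypothetical names, and two `List` views stand in for the duplicated Streams. Since both views hold the same futures, each task runs once no matter how many copies consume its result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of simple-react.
class SharedFuturesSketch {

    // Returns how many times the underlying tasks actually ran after two
    // "copies" of the future sequence were both fully consumed.
    public static int loadsTriggered() {
        AtomicInteger calls = new AtomicInteger();
        List<CompletableFuture<Integer>> original = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int value = i;
            original.add(CompletableFuture.supplyAsync(() -> {
                calls.incrementAndGet(); // counts real executions of the task
                return value;
            }));
        }
        // The copy holds the same Future instances, not new tasks.
        List<CompletableFuture<Integer>> copy = new ArrayList<>(original);

        original.forEach(CompletableFuture::join);
        copy.forEach(CompletableFuture::join);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(loadsTriggered()); // 3, not 6
    }
}
```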