A simple API, and a Rich API

johnmcclean-aol edited this page Aug 29, 2015 · 9 revisions

SimpleReact Provides 3 Streams

EagerFutureStream and LazyFutureStream provide all the functionality described on this page.

SimpleReactStream is a simpler, more focused API that offers only the SimpleReact Core API features.

SimpleReact Core API

The core SimpleReact API remains very simple. It has expanded slightly since the initial release and today consists of:

  • with
  • then
  • doOnEach
  • retry
  • onFail
  • capture
  • block
  • allOf
  • anyOf
  • run
  • toQueue
  • flatMap
  • peek
  • filter
  • merge

These are the concurrent, non-blocking operations (with the exception of block, which blocks) that form the core of the API.
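
The split between non-blocking steps and a single blocking step can be illustrated with plain JDK CompletableFutures. This is only an analogy and not SimpleReact code: thenApply stands in roughly for then, exceptionally for onFail and join for block, and the class name CoreApiAnalogy is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only analogy; none of these calls are part of the SimpleReact API.
final class CoreApiAnalogy {

    // Starts one asynchronous task per input, transforms each result without
    // blocking, recovers from failures, and blocks only at the very end.
    static List<Integer> run() {
        List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3)
                .map(i -> CompletableFuture.supplyAsync(() -> i)
                        .thenApply(v -> {                 // non-blocking transformation
                            if (v == 2) {
                                throw new IllegalStateException("boom");
                            }
                            return v * 10;
                        })
                        .exceptionally(t -> -1))          // non-blocking recovery
                .collect(Collectors.toList());

        // The only blocking step: wait for every future in submission order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [10, -1, 30]
    }
}
```

Everything before the final join is scheduled without waiting for results, which is the property the core operations above are described as sharing.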

java.util.stream.Stream

With SimpleReact v0.3 we have also added all the methods of the Stream API:

  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • forEach(Consumer<? super T>)
  • forEachOrdered(Consumer<? super T>)
  • toArray()
  • toArray(IntFunction<A[]>)
  • reduce(T, BinaryOperator)
  • reduce(BinaryOperator)
  • reduce(U, BiFunction<U, ? super T, U>, BinaryOperator)
  • collect(Supplier, BiConsumer<R, ? super T>, BiConsumer<R, R>)
  • collect(Collector<? super T, A, R>)
  • min(Comparator<? super T>)
  • max(Comparator<? super T>)
  • count()
  • anyMatch(Predicate<? super T>)
  • allMatch(Predicate<? super T>)
  • noneMatch(Predicate<? super T>)
  • findFirst()
  • findAny()
  • builder()
  • empty()
  • of(T)
  • of(T...)
  • iterate(T, UnaryOperator)
  • generate(Supplier)
  • concat(Stream<? extends T>, Stream<? extends T>)

org.jooq.lambda.Seq

And we have also implemented Seq, which adds the following functions:

  • stream()
  • concat(Stream)
  • concat(T)
  • concat(T...)
  • cycle()
  • zip(Seq)
  • zip(Seq, BiFunction<T, U, R>)
  • zipWithIndex()
  • foldLeft(U, BiFunction<U, ? super T, U>)
  • foldRight(U, BiFunction<? super T, U, U>)
  • scanLeft(U, BiFunction<U, ? super T, U>)
  • scanRight(U, BiFunction<? super T, U, U>)
  • reverse()
  • shuffle()
  • shuffle(Random)
  • skipWhile(Predicate<? super T>)
  • skipUntil(Predicate<? super T>)
  • limitWhile(Predicate<? super T>)
  • limitUntil(Predicate<? super T>)
  • intersperse(T)
  • duplicate()
  • partition(Predicate<? super T>)
  • splitAt(long)
  • splitAtHead()
  • slice(long, long)
  • toCollection(Supplier)
  • toList()
  • toSet()
  • toMap(Function<T, K>, Function<T, V>)
  • toString(String)
  • minBy(Function<T, U>)
  • minBy(Function<T, U>, Comparator<? super U>)
  • maxBy(Function<T, U>)
  • maxBy(Function<T, U>, Comparator<? super U>)
  • ofType(Class)
  • cast(Class)
  • groupBy(Function<? super T, ? extends K>)
  • groupBy(Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join()
  • join(CharSequence)
  • join(CharSequence, CharSequence, CharSequence)
  • of(T)
  • of(T...)
  • empty()
  • iterate(T, UnaryOperator)
  • generate()
  • generate(T)
  • generate(Supplier)
  • seq(Stream)
  • seq(Iterable)
  • seq(Iterator)
  • seq(Map<K, V>)
  • seq(Optional)
  • cycle(Stream)
  • unzip(Stream<Tuple2<T1, T2>>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<T1, U1>, Function<T2, U2>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<Tuple2<T1, T2>, Tuple2<U1, U2>>)
  • unzip(Stream<Tuple2<T1, T2>>, BiFunction<T1, T2, Tuple2<U1, U2>>)
  • zip(Stream, Stream)
  • zip(Stream, Stream, BiFunction<T1, T2, R>)
  • zipWithIndex(Stream)
  • foldLeft(Stream, U, BiFunction<U, ? super T, U>)
  • foldRight(Stream, U, BiFunction<? super T, U, U>)
  • scanLeft(Stream, U, BiFunction<U, ? super T, U>)
  • scanRight(Stream, U, BiFunction<? super T, U, U>)
  • unfold(U, Function<U, Optional<Tuple2<T, U>>>)
  • reverse(Stream)
  • shuffle(Stream)
  • shuffle(Stream, Random)
  • concat(Stream...)
  • duplicate(Stream)
  • toString(Stream<?>)
  • toString(Stream<?>, String)
  • toCollection(Stream, Supplier)
  • toList(Stream)
  • toSet(Stream)
  • toMap(Stream<Tuple2<K, V>>)
  • toMap(Stream, Function<T, K>, Function<T, V>)
  • slice(Stream, long, long)
  • skip(Stream, long)
  • skipWhile(Stream, Predicate<? super T>)
  • skipUntil(Stream, Predicate<? super T>)
  • limit(Stream, long)
  • limitWhile(Stream, Predicate<? super T>)
  • limitUntil(Stream, Predicate<? super T>)
  • intersperse(Stream, T)
  • partition(Stream, Predicate<? super T>)
  • splitAt(Stream, long)
  • splitAtHead(Stream)
  • ofType(Stream, Class)
  • cast(Stream, Class)
  • groupBy(Stream, Function<? super T, ? extends K>)
  • groupBy(Stream, Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Stream, Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join(Stream<?>)
  • join(Stream<?>, CharSequence)
  • join(Stream<?>, CharSequence, CharSequence, CharSequence)
  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • onClose(Runnable)
  • close()
  • sequential()
  • parallel()
  • unordered()
  • spliterator()
  • forEach(Consumer<? super T>)
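
To give a feel for two of the Seq-style functions listed above, here is a minimal JDK-only sketch of foldLeft and scanLeft over a List. The class FoldScanSketch and its method signatures are hypothetical, and the choice to include the seed as the first scanned value is an assumption of this sketch, not a statement about the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper; operates on Lists rather than on Seq or a future stream.
final class FoldScanSketch {

    // Combines the elements from left to right into a single value.
    static <T, U> U foldLeft(List<T> source, U seed, BiFunction<U, ? super T, U> fn) {
        U acc = seed;
        for (T next : source) {
            acc = fn.apply(acc, next);
        }
        return acc;
    }

    // Like foldLeft, but keeps every intermediate value, starting with the seed.
    static <T, U> List<U> scanLeft(List<T> source, U seed, BiFunction<U, ? super T, U> fn) {
        List<U> out = new ArrayList<>();
        U acc = seed;
        out.add(acc);
        for (T next : source) {
            acc = fn.apply(acc, next);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c");
        System.out.println(foldLeft(letters, "", (u, t) -> u + t)); // abc
        System.out.println(scanLeft(letters, "", (u, t) -> u + t)); // [, a, ab, abc]
    }
}
```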

Additional operators (RxJava inspired)

Zipping operators

  • combineLatest
  • withLatest

Sharding operators

  • shard (map, fn)

Control operators

  • debounce
  • onePer
  • xPer
  • control (fn)
  • skipUntil (stream)
  • takeUntil (stream)
  • jitter
  • fixedDelay

Batching operators

  • batchBySize
  • batchByTime
  • batch (fn)

Chunking operators

  • chunkSinceLastRead
  • chunkSinceLastReadIterator

Futures operators

  • limitFutures
  • skipFutures
  • sliceFutures
  • duplicateFutures
  • partitionFutures
  • splitAtFutures
  • zipFutures
  • zipFuturesWithIndex
  • firstOf

batchBySizeAndTime : batches results until either the size limit or the time limit is reached

e.g. batch in groups of 10, or whatever has been returned within 5 seconds

    lazyReact.from(urls)
             .map(this::load)
             .batchBySizeAndTime(10, 5, TimeUnit.SECONDS)
             .toList();
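
The "size or time, whichever comes first" rule can be sketched with a JDK BlockingQueue. This is an illustration of the batching idea under stated assumptions, not the library's implementation; BatchSketch and nextBatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing one way to batch by size and time.
final class BatchSketch {

    // Collects up to maxSize elements, but gives up once the time budget is spent.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long time, TimeUnit unit) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + unit.toNanos(time);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time limit reached
                }
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what we have
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 12; i++) {
            queue.add(i);
        }
        System.out.println(nextBatch(queue, 10, 5, TimeUnit.SECONDS));       // full batch of 10
        System.out.println(nextBatch(queue, 10, 50, TimeUnit.MILLISECONDS)); // [11, 12] after the timeout
    }
}
```

The first call returns as soon as ten elements are available; the second returns a short batch because the time limit expires first.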

switchOnNextValue : creates a new stream that takes the latest value from a number of streams

    LazyFutureStream<Integer> fast = ... // [1,2,3,4,5,6,7..]
    LazyFutureStream<Integer> slow = ... // [100,200,300,400,500,600..]

    LazyFutureStream<Integer> merged = fast.switchOnNextValue(Stream.of(slow));
    // [1,2,3,4,5,6,7,8,100,9,10,11,12,13,14,15,16,200..]

copy : copies a Stream the specified number of times

    LazyFutureStream.of(1,2,3,4,5,6)
                    .map(i -> i+2)
                    .copy(5)
                    .forEach(s -> System.out.println(s.toList()));

toLazyCollection : creates a Collection placeholder without blocking. EagerFutureStreams and SimpleReactStreams can start populating the Collection asynchronously straight away, whereas LazyFutureStreams won't populate it until a method on the Collection is invoked

    Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
                                              .map(i -> i+2)
                                              .toLazyCollection();

toConcurrentLazyCollection : creates a lazy collection that can be shared across threads

    Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
                                              .map(i -> i+2)
                                              .toConcurrentLazyCollection();

firstValue : returns the first value in a stream. The value must be present; no Optional is returned

    int first = LazyFutureStream.of(1,2,3,4)
                                .firstValue();

    // first is 1

single : returns the single entry in a stream; throws an exception if there are no entries or more than one

    int num = LazyFutureStream.of(1)
                              .single();

    // num is 1
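
The contracts of firstValue and single can be sketched over a plain java.util.stream.Stream. The class SingleSketch is hypothetical, and the exception types used here (NoSuchElementException, IllegalStateException) are assumptions of this sketch; the source does not say which exceptions the library throws.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

// Hypothetical helper; exception types are assumptions, not the library's.
final class SingleSketch {

    // Returns the first element, failing loudly instead of returning an Optional.
    static <T> T firstValue(Stream<T> stream) {
        return stream.findFirst()
                     .orElseThrow(() -> new NoSuchElementException("stream is empty"));
    }

    // Returns the only element; fails if there are zero or several.
    static <T> T single(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("stream is empty");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IllegalStateException("stream has more than one element");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(firstValue(Stream.of(1, 2, 3, 4))); // 1
        System.out.println(single(Stream.of(1)));              // 1
    }
}
```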

futureOperations() & futureOperations(executor) : access terminal operations that return a future and execute asynchronously

    CompletableFuture<Integer> sum = LazyFutureStream.of(1,2,3,4,5)
                                                     .map(it -> it*100)
                                                     .futureOperations()
                                                     .reduce(50, (acc,next) -> acc+next);

    // sum is CompletableFuture[1550]
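
For comparison, the same "terminal operation that returns a future" idea can be expressed with the JDK alone by wrapping an ordinary reduce in CompletableFuture.supplyAsync. This is a rough equivalent for illustration, not how futureOperations is implemented; AsyncReduceSketch and sumAsync are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

// JDK-only illustration of an asynchronous terminal operation.
final class AsyncReduceSketch {

    // Runs the reduce on the supplied executor and returns immediately with a future.
    static CompletableFuture<Integer> sumAsync(ExecutorService executor) {
        return CompletableFuture.supplyAsync(
                () -> Stream.of(1, 2, 3, 4, 5)
                            .map(it -> it * 100)
                            .reduce(50, (acc, next) -> acc + next),
                executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(sumAsync(executor).join()); // 1550
        } finally {
            executor.shutdown();
        }
    }
}
```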

sliding(size) : creates a sliding window over the data in the stream

    // futureStream of [1,2,3,4,5,6]

    List<List<Integer>> list = futureStream.sliding(2)
                                           .collect(Collectors.toList());

    assertThat(list.get(0), hasItems(1,2));
    assertThat(list.get(1), hasItems(2,3));

sliding(size,increment) : creates a sliding window over the data in the stream, advancing by the given increment between windows

    // futureStream of [1,2,3,4,5,6,7,8]

    List<List<Integer>> list = futureStream.sliding(3,2)
                                           .collect(Collectors.toList());

    assertThat(list.get(0), hasItems(1,2,3));
    assertThat(list.get(1), hasItems(3,4,5));
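
The windowing arithmetic behind sliding can be sketched over a List with the JDK only. SlidingSketch is a hypothetical helper, and it makes one assumption the source does not confirm: only full windows are emitted, so a trailing partial window is dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; emits full windows only (an assumption of this sketch).
final class SlidingSketch {

    // Window k covers indices [k * increment, k * increment + size).
    static <T> List<List<T>> sliding(List<T> source, int size, int increment) {
        if (size <= 0 || increment <= 0) {
            throw new IllegalArgumentException("size and increment must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + size <= source.size(); start += increment) {
            windows.add(new ArrayList<>(source.subList(start, start + size)));
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(sliding(Arrays.asList(1, 2, 3, 4, 5, 6), 2, 1));
        // [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]
        System.out.println(sliding(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3, 2));
        // [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
    }
}
```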

cycle : repeat a Stream infinitely

    LazyFutureStream.of(1,2,2).cycle().limit(6)
                    .collect(Collectors.toList());
    // [1,2,2,1,2,2]

cycle(times) : repeat a Stream a set number of times

    LazyFutureStream.of(1,2,2).cycle(3)
                    .collect(Collectors.toList());
    // [1,2,2,1,2,2,1,2,2]

cycleUntil : repeat a Stream until the condition holds

    // count is an int field, initially 0
    LazyFutureStream.of(1,2,2,3).cycleUntil(next -> count++ > 10)
                    .collect(Collectors.toList());
    // [1, 2, 2, 3, 1, 2, 2, 3, 1, 2, 2]

cycleWhile : repeat a Stream for as long as the condition holds

    // count is an int field, initially 0
    LazyFutureStream.of(1,2,2).cycleWhile(next -> count++ < 6)
                    .collect(Collectors.toList());
    // [1,2,2,1,2,2]
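
The cycle family can be approximated over a List with the JDK alone, which makes the stopping rules easy to see. CycleSketch is a hypothetical helper, and it uses an AtomicInteger for the counter because a lambda cannot increment a local int.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper; works on Lists rather than on a future stream.
final class CycleSketch {

    // Repeats the whole list a fixed number of times.
    static <T> List<T> cycle(List<T> source, int times) {
        return Collections.nCopies(times, source).stream()
                          .flatMap(List::stream)
                          .collect(Collectors.toList());
    }

    // Keeps cycling while the condition holds for the next element.
    static <T> List<T> cycleWhile(List<T> source, Predicate<? super T> condition) {
        List<T> out = new ArrayList<>();
        if (source.isEmpty()) {
            return out;
        }
        for (int i = 0; ; i = (i + 1) % source.size()) {
            T next = source.get(i);
            if (!condition.test(next)) {
                return out;
            }
            out.add(next);
        }
    }

    // Keeps cycling until the condition first holds.
    static <T> List<T> cycleUntil(List<T> source, Predicate<? super T> condition) {
        return cycleWhile(source, condition.negate());
    }

    public static void main(String[] args) {
        System.out.println(cycle(Arrays.asList(1, 2, 2), 3)); // [1, 2, 2, 1, 2, 2, 1, 2, 2]

        AtomicInteger count = new AtomicInteger();
        System.out.println(cycleWhile(Arrays.asList(1, 2, 2), next -> count.getAndIncrement() < 6));
        // [1, 2, 2, 1, 2, 2]
    }
}
```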

New operators from jOOλ 0.97

crossJoin : SQL-style cross join of two Streams

    LazyFutureStream.of(1, 2)
                    .crossJoin(LazyFutureStream.of("a", "b"))
    // (tuple(1, "a"), tuple(1, "b"), tuple(2, "a"), tuple(2, "b"))

leftOuterJoin : SQL-style left outer join of two Streams

    LazyFutureStream.of(1, 2, 3)
                    .leftOuterJoin(Seq.of(1, 2), t -> Objects.equals(t.v1, t.v2))
    // (tuple(1, 1), tuple(2, 2), tuple(3, null))

rightOuterJoin : SQL-style right outer join of two Streams

    LazyFutureStream.of(1, 2)
                    .rightOuterJoin(Seq.of(1, 2, 3), t -> Objects.equals(t.v1, t.v2))
    // (tuple(1, 1), tuple(2, 2), tuple(null, 3))

innerJoin : SQL-style inner join of two Streams

    LazyFutureStream.of(1, 2, 3)
                    .innerJoin(Stream.of(1, 2), t -> Objects.equals(t.v1, t.v2))
    // (tuple(1, 1), tuple(2, 2))
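
The join semantics above can be spelled out with nested loops over Lists. This is a JDK-only sketch for illustration: JoinSketch is a hypothetical class, pairs are represented with Map.Entry instead of jOOλ tuples, and the join condition is a BiPredicate rather than a predicate over a tuple.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical helper; Map.Entry stands in for a two-element tuple.
final class JoinSketch {

    // Every left element paired with every right element.
    static <A, B> List<Map.Entry<A, B>> crossJoin(List<A> left, List<B> right) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (A a : left) {
            for (B b : right) {
                out.add(new SimpleImmutableEntry<>(a, b));
            }
        }
        return out;
    }

    // Only the pairs for which the condition holds.
    static <A, B> List<Map.Entry<A, B>> innerJoin(List<A> left, List<B> right, BiPredicate<A, B> on) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (Map.Entry<A, B> pair : crossJoin(left, right)) {
            if (on.test(pair.getKey(), pair.getValue())) {
                out.add(pair);
            }
        }
        return out;
    }

    // Matching pairs, plus (left, null) for left elements without any match.
    static <A, B> List<Map.Entry<A, B>> leftOuterJoin(List<A> left, List<B> right, BiPredicate<A, B> on) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (A a : left) {
            boolean matched = false;
            for (B b : right) {
                if (on.test(a, b)) {
                    out.add(new SimpleImmutableEntry<>(a, b));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new SimpleImmutableEntry<>(a, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(crossJoin(Arrays.asList(1, 2), Arrays.asList("a", "b")));
        // [1=a, 1=b, 2=a, 2=b]
        System.out.println(leftOuterJoin(Arrays.asList(1, 2, 3), Arrays.asList(1, 2), Objects::equals));
        // [1=1, 2=2, 3=null]
    }
}
```

A right outer join is the mirror image: swap the roles of the two lists and put the null on the left side of the pair.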

onEmpty : specify a default value for an empty Stream

    LazyFutureStream.of()
                    .onEmpty(1)
                    .toList();
    // [1]

onEmptyGet : specify a lazy default value for an empty Stream

    LazyFutureStream.of()
                    .onEmptyGet(() -> 1)
                    .toList();
    // [1]

onEmptyThrow : throw an exception when the Stream is empty

    LazyFutureStream.of()
                    .onEmptyThrow(() -> new RuntimeException())
                    .toList();
    // RuntimeException
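
The empty-stream fallbacks can be sketched over a java.util.stream.Stream by peeking at its iterator. OnEmptySketch is a hypothetical helper, and unlike a truly lazy operator it inspects the first element as soon as it is called, which is a simplification made by this sketch.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper; checks for emptiness eagerly (a simplification).
final class OnEmptySketch {

    // Returns the original elements, or the supplied default if there are none.
    static <T> Stream<T> onEmptyGet(Stream<T> source, Supplier<? extends T> fallback) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            return Stream.of(fallback.get());
        }
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    // Returns the original elements, or throws the supplied exception if there are none.
    static <T> Stream<T> onEmptyThrow(Stream<T> source, Supplier<? extends RuntimeException> error) {
        return onEmptyGet(source, () -> {
            throw error.get();
        });
    }

    public static void main(String[] args) {
        List<Integer> defaulted = onEmptyGet(Stream.<Integer>empty(), () -> 1)
                .collect(Collectors.toList());
        System.out.println(defaulted); // [1]

        List<Integer> untouched = onEmptyGet(Stream.of(7, 8), () -> 1)
                .collect(Collectors.toList());
        System.out.println(untouched); // [7, 8]
    }
}
```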