Releases: aol/cyclops
v0.99.7 of simple-react
New Feature Overview
Scheduling!
cron, fixedDelay or fixedRate
LazyFutureStream.generate( () -> "new job")
.map(this::process)
.schedule("0 * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.forEach(this::logToDb);
Pattern Matching!
Apply decision making seamlessly within a Stream
List<String> result = LazyFutureStream.of(1,2,3,4)
.capture(e->e.printStackTrace())
.patternMatch("",
c->c.hasValuesWhere( (Integer i)->i%2==0 ).then(i->"even"),
c->c.hasValuesWhere( (Integer i)->i%2!=0).then(i->"odd")
)
.toList();
assertThat(result,equalTo(Arrays.asList("odd","even","odd","even")));
For Comprehensions
Nested iteration over multiple Stream types.
LazyFutureStream.of(1,2,3)
.forEach2(a->IntStream.range(0,10),
a->b-> a+b)
.toList()
//List[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8,
9, 10, 11, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)]
singleOptional operator
//Optional[1]
LazyFutureStream.of(1).singleOptional();
//Optional.empty
LazyFutureStream.of().singleOpional();
//Optional.empty
LazyFutureStream.of(1,2,3).singleOptional();
The 0.99.7 release updates the version of the following dependencies
- cyclops to 7.2.0
- Agrona to 0.49
and fixes the following issues
- SimpleReact fromIterable should return SimpleReactStream
- Queue leaks memory through sizeSignal bug #92 opened 2 days ago by lisa-lionheart 0.99.7
- Add onEmptySwitch operator to LazyFutureStream enhancement #90 opened 2 days ago by johnmcclean-aol 0.99.7
- Add elapsed operator to LazyFutureStream enhancement #89 opened 2 days ago by johnmcclean-aol 0.99.7
- SimpleReact creation methods return the Base class rather than SimpleReactStream bug #88 opened 5 days ago by
- dependency upgrade : upgrade to agron 0.4.9 enhancement #86 opened 14 days ago by johnmcclean-aol 0.99.7
- New operators : schedule enhancement #78 opened 25 days ago by johnmcclean-aol 0.99.7
- New operators : for-comprehensions enhancement #77 opened 27 days ago by johnmcclean-aol 0.99.7
- New operators : subStream operator enhancement #76 opened 27 days ago by johnmcclean-aol 0.99.7
- New operators : pattern matching operators enhancement #75 opened 27 days ago by johnmcclean-aol 0.99.7
- New operators : permutations & combinations enhancement #74 opened 27 days ago by johnmcclean-aol 0.99.7
- Issue constructing a LazyFutureStream from a single CompletableFuture bug #72 opened on Dec 2, 2015 by johnmcclean-aol
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.7’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.7</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.7
v0.99.6 of simple-react
The 0.99.6 release updates the version of the following dependencies
- cyclops to 6.2.3
and fixes the following issues
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.6’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.5</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.6
v0.99.5 of simple-react
The 0.99.5 release updates the version of the following dependencies
- cyclops to 6.2.0
- agrona to 0.47
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.5’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.5</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.5
0.99.4 of simple-react : depedency version updates
The 0.99.4 release updates the version of the following dependencies
- cyclops to 6.0.3
- agrona to 0.45
The 0.99.4 release also fixes the following bugs
slice, scanLeft, scanRight creating a nested LazyFutureStream
LazyFutureStream#shuffle has the wrong method signature
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.4’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.4</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.4
v0.99.3 of simple-react : Cyclops integration
The 0.99.3 release integrates simple-react with cyclops-streams.
- simple-react provides a reactive-streams implementation for SequenceM from cyclops-streams
- EagerFutureStream has been removed
- SimpleReactStream takes over some of the old EagerFutureStream functionality
- LazyFutureStream extends SequenceM
The addition of SequenceM to the Stream hierarchy adds a raft of new operators (particularly for Stream manipulation and windowing / batching).
The 3 Stream types
- SimpleReactStream - eager stream of futures
- SequenceM - a fast, powerful, sequential reactive-streams implementation
- LazyFutureStream - powerful, lazy Stream of futures.
New features
auto-memoization (caching)
LazyFutureStreams can be configured to cache the results of operations at each stage. Caches can be pluggable so LRU or TTL based caches can be used.
Map cache = new ConcurrentHashMap<>();
LazyReact react = new LazyReact().autoMemoizeOn((key,fn)-> cache.computeIfAbsent(key,fn));
List result = react.of("data1","data1","data2","data2")
.map(i->calc(i))
.toList();
In the above example only 2 calculations will be performed, the cached results will be reused.
actOnFutures
Operations in LazyFutureStream that work directly on the underlying Futures have been moved to their own interface and are accessible via actOnFutures. A host of new Future based operations have been added, including batching, appending, zipping, combining, collection and reduction.
Operators on LazyFutureStream not invoked via actOnFutures, now always act on the results of the previous stage. Operations invoked via actOnFutures operate on the underlying Futures and are invoked in order of input.
Operations via actOnFutures are also likely to perform better.
Simple reduction example
CompletableFuture<Integer> sum = LazyFutureStream.of(1, 2, 3)
.actOnFutures()
.reduce(CompletableFuture.completedFuture(0),(cf1,cf2)-> cf1.thenCombine(cf2, (a,b)->a+b));
assertThat(sum.join(),equalTo(6));
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.3’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.3</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.3
v0.99.2 of simple-react
The 0.99.2 release fixes a number of minor issues.
- An undeclared runtime dependency on cyclops has been removed
- A bug fix for async reactive streams publisher support JDK 8 Stream
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.2’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.2</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.2
v0.99.1 of simple-react
The 0.99.1 release of simple-react is a staged release on the road to v1.0.0 that focused on performance enhancements.
Performance enhancements in simple-react v0.99.1
FastFuture
We've replaced JDK CompletableFutures in LazyFutureStreams with a custom Future implementation that is highly optimized for our use case, in particularly this Future predefines an immutable execution pipeline for a Stream once and shares it across Futures. This can increase throughput in a LazyFutureStream by over 2.5 times compared with previous versions. Our simple benchmark for throughput increased from 330 million identity functions applied to just over 800 million identity functions applied.
FastFuture pooling
For large long running Streams it is possible to configure LazyFutureStreams to reuse Future instances. This in turn decreases GC impact for an application, whereas in previous versions of simple-react throughput fell significantly from ~330 million identity functions per second to less than half that, for continuous processing with v0.99.1, throughput remains remarkably healthy at ~750 million identity functions applied per second (compared with short-burst capacity of just over 800 million per second).
LazyReact lazy = LazyReact.parallel(10)
.objectPoolingOn()
.autoOptimize();
lazy.range(0,2_000_000_000)
.map(this::loadById)
.map(this::process)
.forEach(this::save);
autoOptimize
The autoOptimize facility introduced in simple-react v0.99.1 helps to efficiently distribute your tasks over multiple threads. simple-react LazyFutureStreams are especially well suited for handling tasks related to blocking I/O, and autoOptmize makes it especially easy to, for example, make multiple rest calls (or load multiple files) and efficiently process the results.
fan out across threads with autoOptimize
Sequential execution enhancements
Wrapping objects in Futures is excellent for handling tasks that deliver data asynchronously, it imposes a penalty when processing data sequentially in a single thread. We've made some optimizations here to lower this overhead (the introduction of FastFuture and the shared pipeline helps, and Object pooling helps for large Streams). Performance reasonably similar to that of JDK Streams is achievable for small Streams performing operations that don't require writing to a Queue.
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.99.1’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.99.1</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.99.1
v.099 of simple-react
New in simple-react v0.99
- 20+ new core Operators
- Ability to run all terminal operations asynchronously (recieve a CompletableFuture)
- Current Thread based Executor - default in many situations
- Creational operator reorganisation
- Pluggable wait strategies for working with bounded Wait Free Queues
- Bug fixes
- Enhancements to EagerFutureStream behaviour on many operators
- Enhanced Reactive Streams support with the ability to publish and subcribe to simple-react Queues, Topics and Signals
- Reactive plugin migrated from Microserver to make using simple-react even easier
Details below, followed by how to get simple-react instructions.
New core operators
batchBySizeAndTime : batches results until a time or size limit is reached
e.g. batch in 10's or whatever has returned within 5 seconds
lazyReact.from(urls)
.map(this::load)
.batchBySizeAndTime(10,5,TimeUnit.SECONDS)
.toList();
switchOnNextValue : creates a new stream that takes the lasted value from a number of streams
LazyFutureStream<Integer> fast = ... // [1,2,3,4,5,6,7..]
LazyFutureStream<Integer> slow = ... // [100,200,300,400,500,600..]
LazyFutureStream<Integer> merged = fast.switchOnNextValue(Stream.of(slow));
//[1,2,3,4,5,6,7,8,100,9,10,11,12,13,14,15,16,200..]
copy : copies a Stream the specified number of times
LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.copy(5)
.forEach(s -> System.out.println(s.toList()));
toLazyCollection : creates a Collection placeholder but doesn't block. EagerFutureStreams and SimpleReactStreams can populate the Collection asynchronously immediately and LazyFutureStreams won't populate it until a method is invoked
Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.toLazyCollection();
toConcurrentLazyCollection : creates a lazy collection that can be shared across threads
Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.toConcurrentLazyCollection();
firstValue : return the first value in a stream, must be present - no optional
int first = LazyFutureStream.of(1,2,3,4)
.firstValue();
//first is 1
single : return a single entry, exception if no entries or multiple
int num = LazyFutureStream.of(1)
.single();
//num is 1
futureOperations() & futureOperations(executor) - access terminal operations that return a future and execute asyncrhonously
CompletableFuture<Integer> sum = LazyFutureStream.of(1,2,3,4,5)
.map(it -> it*100)
.futureOperations()
.reduce( 50,(acc,next) -> acc+next);
//sum is CompletableFuture[1550]
sliding(size) : creates a sliding window over the data in the stream
//futureStream of [1,2,3,4,5,6]
List<List<Integer>> list = futureStream.sliding(2)
.collect(Collectors.toList());
assertThat(list.get(0),hasItems(1,2));
assertThat(list.get(1),hasItems(2,3));
sliding(size,increment) creates a sliding window over the data in the stream
//futureStream of [1,2,3,4,5,6,7,8]
List<List<Integer>> list = futureStream.sliding(3,2)
.collect(Collectors.toList());
assertThat(list.get(0),hasItems(1,2,3));
assertThat(list.get(1),hasItems(3,4,5));
cycle : repeat a Stream infinitely
LazyFutureStream.of(1,2,2).cycle().limit(6)
.collect(Collectors.toList()),
//[1,2,2,1,2,2]
cycle(times) : repeat a Stream set number of times
LazyFutureStream.of(1,2,2).cycle(3)
.collect(Collectors.toList());
//[1,2,2,1,2,2,1,2,2]
cycleUntil : repeat Stream until condition holds
LazyFutureStream.of(1,2,2,3).cycleUntil(next -> count++>10 )
.collect(Collectors.toList());
//[1, 2, 2, 3, 1, 2, 2, 3, 1, 2, 2]
cycleWhile : repeat Stream until condition doesn't hold
LazyFutureStream.of(1,2,2).cycleWhile(next -> count++<6 )
.collect(Collectors.toList()),
//[1,2,2,1,2,2]
New operators from jooλ 0.97
crossJoin : SQL Style cross join of two Streams
LazyFutureStream.of(1, 2)
.crossJoin(LazyFutureStream.of("a", "b"))
//(tuple(1, "a"), tuple(1, "b"), tuple(2, "a"), tuple(2, "b"))
leftOuterJoin : SQL Style left outer join of two Streams
LazyFutureStream.of(1, 2, 3)
.leftOuterJoin(Seq.of(1, 2), t -> Objects.equals(t.v1, t.v2))
// (tuple(1, 1), tuple(2, 2), tuple(3, null))
rightOuterJoin : SQL Style right outer join of two Streams
LazyFutureStream.of(1, 2)
.rightOuterJoin(Seq.of(1, 2, 3), t -> Objects.equals(t.v1, t.v2))
// (tuple(1, 1), tuple(2, 2), tuple(null, 3))
innerJoin : SQL Style inner join of two Streams
LazyFutureStream.of(1, 2, 3)
.innerJoin(Stream.of(1, 2), t -> Objects.equals(t.v1, t.v2))
//// (tuple(1, 1), tuple(2, 2))
onEmpty : specify a default value for an empty Stream
LazyFutureStream.of()
.onEmpty(1)
.toList();
//[1]
onEmptyGet : specify a lazy default value for an empty Stream
LazyFutureStream.of()
.onEmptyGet(()->1)
.toList();
//[1]
onEmptyThrow : throw an exception when Stream is empty
LazyFutureStream.of()
.onEmptyThrow(()->new RuntimeTimeException())
.toList();
//RuntimeException
FutureOperations
Terminal operations can now all be called asynchronously (with and without parallel reduction if necessary). e.g.
CompletableFuture<Integer> size = LazyFutureStream.of(1,2,3,4).futureOperations().count();
Available operations
- public CompletableFuture<List> toList()
Asynchronously perform a mutable reduction to a JDK List
CompletableFuture<List<Data>> myList = EagerFutureStream.of(1,2,3,4)
.map(this::loadFromDb)
.withTaskExecutor(parallelBuilder().getExecutor())
.map(this::processOnDifferentExecutor)
.toList();
- public CompletableFuture<Set> toSet()
Asynchronously perform a mutable reduction to a JDK Set
CompletableFuture<Set<Data>> myList = LazyFutureStream.of(1,2,3,4)
.map(this::loadFromDb)
.withTaskExecutor(parallelBuilder().getExecutor())
.map(this::processOnDifferentExecutor)
.toSet();
- public <U extends Comparable> CompletableFuture<Optional> minBy(Function<T, U> function)
Asynchronously capture the minimum value in this stream using the provided function
CompletableFuture<Optional<Integer>> min = LazyFutureStream.of(1, 2, 3, 4, 5, 6)
.futureOperations()
.minBy(t -> Math.abs(t - 5));
//min CompletableFuture[Optional[5]] //5-5 =0
- public <U extends Comparable> CompletableFuture<Optional> maxBy(Function<T, U> function)
Asynchronously capture the maximum value in this stream using the provided function
CompletableFuture<Optional<Integer>> max = LazyFutureStream.of(1, 2, 3, 4, 5, 6)
.futureOperations()
.maxBy(t -> Math.abs(t - 5));
//min CompletableFuture[Optional[1]] //Math.abs(1-5) =4
- public <R, A> CompletableFuture collect(Collector<? super T, A, R> collector)
Asynchronously perform a Stream collection
CompletableFuture<List<Integer>> list = LazyFutureStream.of(1,2,3,4,5)
.futureOperations()
.collect(Collectors.toList());
//CompletableFuture[1,2,3,4,5]
- public CompletableFuture<Optional> reduce(BinaryOperator accumulator)
CompletableFuture<Optional<Integer>> sum = LazyFutureStream.of(1,2,3,4,5)
.map(it -> it*100).futureOperations()
.reduce( (acc,next) -> acc+next)
- public CompletableFuture<A[]> toArray(IntFunction<A[]> generator)
CompletableFuture<Integer[]> array = LazyFutureStream.of(1,5,3,4,2).futureOperations()
.toArray(it->new Integer[it]);
- public CompletableFuture<Object[]> toArray()
CompletableFuture<Integer[]> array = LazyFutureStream.of(1,5,3,4,2).futureOperations()
.toArray()
- public CompletableFuture<Map<K, List>> groupBy(Function<? super T, ? exten...
simple-react v0.98
New in simple-react v0.98
- Reactive Streams support for JDK 8 Streams (and by extension Jooλ.Seq and Cyclops.SequenceM)
- Syncrhonous Reactive Stream Publisher for LazyFutureStream
- Upgrade to Agrona 0.4.2, upgrade to Asyncretry 0.0.7
Reactive Streams Publisher
simple-react v0.98 includes a synchronous Reactive Streams Publisher for both LazyFutureStreams and JDK 8 Streams. The synchronous publisher, publishes Reactive Streams events on the calling thread.
Example creating a synchronous Reactive Streams publisher from a JDK 8 Stream
Publisher p = JDKReactiveStreamsPublisher.ofSync(Stream.iterate(0l, i->i+1l).limit(elements))
Example subscribe to a synchronous LazyFutureStream publisher
LazyFutureStream.react(this::load1,this::load2).map(this::process).sync().subscribe(subscriber);
Example creating a asynchronous Reactive Streams publisher from a JDK 8 Stream
Publisher p = JDKReactiveStreamsPublisher.ofAsync(Stream.iterate(0l, i->i+1l).limit(elements))
Example subscribe to a asynchronous LazyFutureStream publisher
LazyFutureStream.react(this::load1,this::load2).map(this::process).async().subscribe(subscriber);
Reactive Streams Subscriber
To have a JDK 8 Stream subscribe to another Reactive Stream use the FutureStreamSubscriber class.
Simply create a new instance
JDKReactiveStreamsSubscriber<Integer> subscriber = new JDKReactiveStreamsSubscriber<>();
pass it to a Publisher
Publisher publisher;
publisher.subscribe(subscriber);
Then access the Stream via the stream method
Stream<Integer> stream = subscriber.stream();
Data will be pulled into the stream when it requests it from the publisher (after a terminal operation - such as forEach as has been invoked).
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.98’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.98</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.98
v0.97 of simple-react
New in simple-react v0.97
Reactive Streams support
LazyFutureStream provides Reactive Streams Publisher and Subscriber implementations
0.97 of simple-react adds support for the Reactive Streams api. This allows for interoperability between different Reactive Stream implementations (other implementations include Pivotal Reactor, The Reactive Streams reference implementation, Akka Streams, Rx Java and Quasar).
Publisher
LazyFutureStream implements org.reactivestreams.Publisher interface in two ways Asyncrhonously for this release, with a Syncrhonous option coming in simple-react v0.98. With the Asynchronous publisher calls to onNext are made asyncrhonously after request has been called. The Syncrhonous publisher, will by contrast, synchronously call onNext after request has been called.
To subscribe to a LazyFutureStream, simply call the subscribe method on the LazyFutureStream itself.
LazyFutureStream.react(()->load(1),()->load(2),()->load(3))
.subscribe(subscriber);
Subscriber
To have a LazyFutureStream subscribe to another Reactive Stream use the FutureStreamSubscriber class.
Simply create a new instance
FutureStreamSubscriber<Integer> subscriber = new FutureStreamSubscriber<>();
pass it to a Publisher
Publisher publisher;
publisher.subscribe(subscriber);
Then access the LazyFutureStream via the stream method
LazyFutureStream<Integer> stream = subscriber.stream();
Data will be pulled into the stream when it requests it from the publisher (after a terminal operation - such as forEach as has been invoked).
Adding simple-react as a Dependency
Gradle
compile group: 'com.aol.simplereact', name:'simple-react', version:'0.97’
Maven
<dependency>
<groupId>com.aol.simplereact</groupId>
<artifactId>simple-react</artifactId>
<version>0.97</version>
<scope>compile</scope>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.simplereact/simple-react/0.97