Skip to content

Commit

Permalink
Merge pull request #14 from aol/performance-reliability
Browse files Browse the repository at this point in the history
performance and reliability enhancements
  • Loading branch information
johnmcclean committed Mar 23, 2015
2 parents 85fa919 + f25a1a2 commit 7347e13
Show file tree
Hide file tree
Showing 40 changed files with 4,066 additions and 903 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ apply plugin: 'com.github.johnrengelman.shadow'


sourceCompatibility = 1.8
version = '0.70'
version = '0.80'
jar {
manifest {
attributes 'Implementation-Title': 'Simple React', 'Implementation-Version': version
Expand Down Expand Up @@ -57,7 +57,7 @@ modifyPom {

groupId 'com.aol.simplereact'
artifactId 'simple-react'
version '0.70'
version '0.80'

scm {
url 'scm:[email protected]:aol/simple-react.git'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ public boolean tryAdvance(Consumer<? super T> action) {


try{
subscription.closeQueueIfFinishedStateless(queue);

action.accept(s.get());
subscription.closeQueueIfFinished(queue);
return true;
}catch(ClosedQueueException e){
if(e.isDataPresent())
action.accept((T)e.getCurrentData());
return false;
}catch(Exception e){

Expand Down
97 changes: 83 additions & 14 deletions src/main/java/com/aol/simple/react/async/Queue.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.aol.simple.react.async;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -20,6 +22,7 @@
import com.aol.simple.react.exceptions.ExceptionSoftener;
import com.aol.simple.react.exceptions.SimpleReactProcessingException;
import com.aol.simple.react.util.SimpleTimer;
import com.google.common.collect.Lists;

/**
* Inspired by scalaz-streams async.Queue (functionally similar, but Blocking)
Expand Down Expand Up @@ -95,6 +98,24 @@ public Seq<T> stream(Continueable s) {
return Seq.seq(closingStream(this::ensureOpen,s));
}

public Seq<Collection<T>> streamBatch(Continueable s,Function<Supplier<T>,Supplier<Collection<T>>> batcher) {

listeningStreams.incrementAndGet(); //assumes all Streams that ever connected, remain connected
return Seq.seq(closingStreamBatch(batcher.apply(this::ensureOpen),s));
}
public Seq<T> streamControl(Continueable s,Function<Supplier<T>,Supplier<T>> batcher) {

listeningStreams.incrementAndGet(); //assumes all Streams that ever connected, remain connected
return Seq.seq(closingStream(batcher.apply(this::ensureOpen),s));
}

private Stream<Collection<T>> closingStreamBatch(Supplier<Collection<T>> s, Continueable sub){

Stream<Collection<T>> st = StreamSupport.stream(
new ClosingSpliterator(Long.MAX_VALUE, s,sub,this), false);

return st;
}
private Stream<T> closingStream(Supplier<T> s, Continueable sub){

Stream<T> st = StreamSupport.stream(
Expand Down Expand Up @@ -160,9 +181,21 @@ private T ensureOpen() {
* @author johnmcclean
*
*/
@AllArgsConstructor

public static class ClosedQueueException extends
SimpleReactProcessingException {
private static final long serialVersionUID = 1L;
@Getter
private final Object currentData;
private final Object NOT_PRESENT = new Object();
public ClosedQueueException() {
currentData = NOT_PRESENT;
}

public boolean isDataPresent(){
return currentData != NOT_PRESENT;
}
}

/**
Expand Down Expand Up @@ -213,17 +246,10 @@ public boolean add(T data){
@Override
public boolean offer(T data) {


if(!open)
throw new ClosedQueueException();
try {

boolean result = false;
SimpleTimer timer = new SimpleTimer();
do{

if(!open)
throw new ClosedQueueException();
result = this.queue.offer((T)nullSafe(data),1l,TimeUnit.MICROSECONDS);
}while(!result && !timeout(timer));
boolean result = this.queue.offer((T)nullSafe(data),this.offerTimeout,this.offerTimeUnit);

if(sizeSignal!=null)
this.sizeSignal.set(queue.size());
Expand Down Expand Up @@ -265,8 +291,11 @@ private Object nullSafe(T data) {
@Override
public boolean close() {
this.open = false;
for(int i=0;i<Math.min(maxPoisonPills, listeningStreams.get());i++){
queue.add((T)POISON_PILL);

if(this.queue.remainingCapacity()>0){
for(int i=0;i<Math.min(maxPoisonPills, listeningStreams.get());i++){
queue.add((T)POISON_PILL);
}
}

return true;
Expand All @@ -277,6 +306,46 @@ public void closeAndClear(){
queue.clear();
}

private final NIL NILL = new NIL();
private static class NIL {}
public static final NIL NILL = new NIL();
public static class NIL {}

@AllArgsConstructor
public static class QueueReader<T>{
@Getter
Queue<T> queue;
public boolean notEmpty() {
return queue.queue.size()!=0;
}

@Getter
private volatile T last = null;
private int size(){
return queue.queue.size();
}
public T next(){
last = queue.ensureOpen();
return last;
}
public boolean isOpen() {
return queue.open;
}
public Collection<T> drainToOrBlock() {
Collection<T> result = Lists.newArrayList();
if(size()>0)
queue.queue.drainTo(result);
else{
try{
result.add(queue.ensureOpen());
}catch(ClosedQueueException e){
queue.open=false;
throw e;
}
}

return result.stream().filter(it -> it!=POISON_PILL).collect(Collectors.toList());
}
}



}
1 change: 1 addition & 0 deletions src/main/java/com/aol/simple/react/async/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private int findQueue(Queue queue){
}
@Override
public void closeAll() {

closed.set(true);
queues.stream().forEach(Queue::closeAndClear);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public <U> SimpleReactStream< U> reactInfinitely(final Supplier<U> s) {
Subscription sub = new Subscription();
SimpleReactStream stream = construct(StreamSupport.stream(
new InfiniteClosingSpliterator(Long.MAX_VALUE, () -> CompletableFuture.completedFuture(s.get()),sub), false),
this.getExecutor(),getRetrier(),false).withSubscription(sub);
this.getExecutor(),getRetrier(),false,null).withSubscription(sub);

return stream;

Expand All @@ -50,7 +50,7 @@ public <U> SimpleReactStream< U> reactInfinitelyAsync(final Supplier<U> s) {
Subscription sub = new Subscription();
SimpleReactStream stream = construct(StreamSupport.stream(
new InfiniteClosingSpliterator(Long.MAX_VALUE, () -> CompletableFuture.supplyAsync(s),sub), false),
this.getExecutor(),getRetrier(),false).withSubscription(sub);
this.getExecutor(),getRetrier(),false,null).withSubscription(sub);

return stream;

Expand Down Expand Up @@ -83,7 +83,7 @@ public CompletableFuture<U> next() {
}
};
return construct(StreamSupport.stream( new InfiniteClosingSpliteratorFromIterator(Long.MAX_VALUE,iterator,sub),false),
this.getExecutor(),getRetrier(),false);
this.getExecutor(),getRetrier(),false,null);

}
}
55 changes: 45 additions & 10 deletions src/main/java/com/aol/simple/react/stream/BaseSimpleReact.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.aol.simple.react.async.ClosingSpliterator;
import com.aol.simple.react.async.Subscription;
import com.aol.simple.react.generators.Generator;
import com.aol.simple.react.generators.ParallelGenerator;
import com.aol.simple.react.generators.ReactIterator;
Expand All @@ -29,7 +28,8 @@ public abstract class BaseSimpleReact {
protected abstract boolean isEager();
protected abstract RetryExecutor getRetrier();

public abstract <U> SimpleReactStream<U> construct(Stream s, ExecutorService e, RetryExecutor r, boolean eager);
public abstract <U> SimpleReactStream<U> construct(Stream s, ExecutorService e, RetryExecutor r, boolean eager,
List<CompletableFuture> org);



Expand All @@ -43,7 +43,7 @@ public abstract class BaseSimpleReact {
public <U> SimpleReactStream<U> fromStream(final Stream<CompletableFuture<U>> stream) {

Stream s = stream;
return construct( s,getExecutor(),getRetrier(), isEager());
return construct( s,getExecutor(),getRetrier(), isEager(),null);
}
/**
* Start a reactive dataflow from a stream.
Expand All @@ -54,8 +54,43 @@ public <U> SimpleReactStream<U> fromStream(final Stream<CompletableFuture<U>> st
public <U> SimpleReactStream<U> fromStreamWithoutFutures(final Stream<U> stream) {

Stream s = stream.map(it -> CompletableFuture.completedFuture(it));
return construct( s,this.getExecutor(), getRetrier(),isEager());
return construct( s,this.getExecutor(), getRetrier(),isEager(),null);
}
/**
* Start a reactive dataflow from a stream.
*
* @param stream that will be used to drive the reactive dataflow
* @return Next stage in the reactive flow
*/
public <U> SimpleReactStream<U> fromPrimitiveStream(final IntStream stream) {

return (SimpleReactStream<U>)fromStreamWithoutFutures(stream.boxed());

}
/**
* Start a reactive dataflow from a stream.
*
* @param stream that will be used to drive the reactive dataflow
* @return Next stage in the reactive flow
*/
public <U> SimpleReactStream<U> fromPrimitiveStream(final DoubleStream stream) {

return (SimpleReactStream<U>)fromStreamWithoutFutures(stream.boxed());

}
/**
* Start a reactive dataflow from a stream.
*
* @param stream that will be used to drive the reactive dataflow
* @return Next stage in the reactive flow
*/
public <U> SimpleReactStream<U> fromPrimitiveStream(final LongStream stream) {

return (SimpleReactStream<U>)fromStreamWithoutFutures(stream.boxed());

}



public <U> SimpleReactStream<U> of(U...array){
return fromStreamWithoutFutures(Stream.of(array));
Expand Down Expand Up @@ -136,7 +171,7 @@ public <R> SimpleReactStream<R> reactToCollection(final Collection<R> collection
public <U> SimpleReactStream< U> react(final Supplier<U> s, Generator t) {

return construct(t.generate(s),
this.getExecutor(),getRetrier(),isEager());
this.getExecutor(),getRetrier(),isEager(),null);

}

Expand Down Expand Up @@ -182,7 +217,7 @@ public static ParallelGenerator timesInSequence(int times){
public <U> SimpleReactStream<U> react(final Function<U,U> f,ReactIterator<U> t) {

Stream s = t.iterate(f);
return construct(s,this.getExecutor(),getRetrier(),isEager());
return construct(s,this.getExecutor(),getRetrier(),isEager(),null);

}
/**
Expand Down Expand Up @@ -224,7 +259,7 @@ protected <U> SimpleReactStream<U> reactI(final Supplier<U>... actions) {

return construct(Stream.of(actions).map(
next -> CompletableFuture.supplyAsync(next, this.getExecutor())),
this.getExecutor(),getRetrier(),isEager());
this.getExecutor(),getRetrier(),isEager(),null);


}
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/aol/simple/react/stream/CloseableIterator.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.aol.simple.react.stream;

import java.util.Iterator;
import java.util.function.Consumer;

import lombok.AllArgsConstructor;
import lombok.experimental.Delegate;
Expand All @@ -10,11 +11,20 @@
@AllArgsConstructor
public class CloseableIterator<T> implements Iterator<T>{

@Delegate

private final Iterator<T> iterator;
private final Continueable subscription;

public boolean hasNext(){
if(!iterator.hasNext())
close();
return iterator.hasNext();
}
public void close(){
subscription.closeAll();
}
public T next() {
return iterator.next();
}

}
2 changes: 1 addition & 1 deletion src/main/java/com/aol/simple/react/stream/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public boolean run(StreamWrapper lastActive,EmptyCollector collector) {
}catch(java.util.concurrent.CompletionException e){

}catch(Throwable e){
e.printStackTrace();

}

runnable.run();
Expand Down
Loading

0 comments on commit 7347e13

Please sign in to comment.