Skip to content

Commit

Permalink
Merge pull request #11 from aol/auto-closing-experimental
Browse files Browse the repository at this point in the history
auto-closing queues inside LazyFutureStreams
  • Loading branch information
johnmcclean committed Mar 20, 2015
2 parents 9010d78 + e685e69 commit 675f601
Show file tree
Hide file tree
Showing 32 changed files with 820 additions and 159 deletions.
38 changes: 38 additions & 0 deletions src/main/java/com/aol/simple/react/async/AlwaysContinue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.aol.simple.react.async;

public class AlwaysContinue implements Continueable{

public void closeQueueIfFinished(Queue queue){

}

@Override
public void addQueue(Queue queue) {


}
public void registerSkip(long skip){

}
public void registerLimit(long limit){

}

@Override
public void closeAll() {


}

@Override
public boolean closed() {

return false;
}

@Override
public void closeQueueIfFinishedStateless(Queue queue) {


}
}
22 changes: 20 additions & 2 deletions src/main/java/com/aol/simple/react/async/ClosingSpliterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,24 @@
public class ClosingSpliterator<T> implements Spliterator<T> {
private long estimate;
final Supplier<T> s;
private final Continueable subscription;
private final Queue queue;

protected ClosingSpliterator(long estimate,Supplier<T> s) {
protected ClosingSpliterator(long estimate,Supplier<T> s,
Continueable subscription,
Queue queue) {
this.estimate = estimate;
this.s = s;
this.subscription = subscription;
this.queue = queue;
this.subscription.addQueue(queue);
}
public ClosingSpliterator(long estimate,Supplier<T> s,
Continueable subscription) {
this.estimate = estimate;
this.s = s;
this.subscription = subscription;
this.queue = null;
}

@Override
Expand All @@ -34,8 +48,12 @@ public int characteristics() {
@Override
public boolean tryAdvance(Consumer<? super T> action) {
Objects.requireNonNull(action);


try{
subscription.closeQueueIfFinishedStateless(queue);
action.accept(s.get());
subscription.closeQueueIfFinished(queue);
return true;
}catch(ClosedQueueException e){
return false;
Expand All @@ -49,7 +67,7 @@ public boolean tryAdvance(Consumer<? super T> action) {
@Override
public Spliterator<T> trySplit() {

return new ClosingSpliterator(estimate >>>= 1, s);
return new ClosingSpliterator(estimate >>>= 1, s,subscription,queue);
}


Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/aol/simple/react/async/Continueable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.aol.simple.react.async;

public interface Continueable {

public void closeQueueIfFinished(Queue queue);

public void addQueue(Queue queue);
public void registerSkip(long skip);
public void registerLimit(long limit);
public void closeAll();

public boolean closed();

public void closeQueueIfFinishedStateless(Queue queue);
}
46 changes: 37 additions & 9 deletions src/main/java/com/aol/simple/react/async/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.aol.simple.react.exceptions.ExceptionSoftener;
import com.aol.simple.react.exceptions.SimpleReactProcessingException;
import com.aol.simple.react.util.SimpleTimer;

/**
* Inspired by scalaz-streams async.Queue (functionally similar, but Blocking)
Expand Down Expand Up @@ -53,6 +54,7 @@ public class Queue<T> implements Adapter<T> {
@Getter
private final Signal<Integer> sizeSignal;


/**
* Construct a Queue backed by a LinkedBlockingQueue
*/
Expand Down Expand Up @@ -86,14 +88,17 @@ public Queue(BlockingQueue<T> queue) {
*/
public Seq<T> stream() {
listeningStreams.incrementAndGet(); //assumes all Streams that ever connected, remain connected
return Seq.seq(closingStream(this::ensureOpen));
return Seq.seq(closingStream(this::ensureOpen,new AlwaysContinue()));
}
public Seq<T> stream(Continueable s) {
listeningStreams.incrementAndGet(); //assumes all Streams that ever connected, remain connected
return Seq.seq(closingStream(this::ensureOpen,s));
}

private Stream<T> closingStream(Supplier<T> s){

private Stream<T> closingStream(Supplier<T> s, Continueable sub){

Stream<T> st = StreamSupport.stream(
new ClosingSpliterator(Long.MAX_VALUE, s), false);
new ClosingSpliterator(Long.MAX_VALUE, s,sub,this), false);

return st;
}
Expand Down Expand Up @@ -185,8 +190,10 @@ private static class PoisonPill { }
public boolean add(T data){
try{
boolean result = queue.add((T)nullSafe(data));
if(sizeSignal!=null)
this.sizeSignal.set(queue.size());
if(true){
if(sizeSignal!=null)
this.sizeSignal.set(queue.size());
}
return result;

}catch(IllegalStateException e){
Expand All @@ -205,10 +212,19 @@ public boolean add(T data){
*/
@Override
public boolean offer(T data) {
if(!open)
throw new ClosedQueueException();


try {
boolean result = this.queue.offer((T)nullSafe(data),this.offerTimeout,this.offerTimeUnit);

boolean result = false;
SimpleTimer timer = new SimpleTimer();
do{

if(!open)
throw new ClosedQueueException();
result = this.queue.offer((T)nullSafe(data),1l,TimeUnit.MICROSECONDS);
}while(!result && !timeout(timer));

if(sizeSignal!=null)
this.sizeSignal.set(queue.size());
return result;
Expand All @@ -221,6 +237,13 @@ public boolean offer(T data) {
}


private boolean timeout(SimpleTimer timer) {

if(timer.getElapsedNanoseconds()>=offerTimeUnit.toNanos(this.offerTimeout))
return true;
return false;
}

private Object nillSafe(T data) {
if(NILL==data)
return null;
Expand Down Expand Up @@ -249,6 +272,11 @@ public boolean close() {
return true;
}

public void closeAndClear(){
this.open = false;
queue.clear();
}

private final NIL NILL = new NIL();
private static class NIL {}
}
120 changes: 120 additions & 0 deletions src/main/java/com/aol/simple/react/async/Subscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.aol.simple.react.async;


import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.Setter;

import org.jooq.lambda.Seq;

@Getter
@Setter
public class Subscription implements Continueable{
private final Map<Queue,AtomicLong> limits = new HashMap<>();

private final Map<Queue,AtomicBoolean> unlimited = new HashMap<>();
private final Map<Queue,AtomicLong> count = new HashMap<>();
private final List<Queue> queues = new LinkedList<>();

private final AtomicBoolean closed= new AtomicBoolean(false);

public void registerSkip(long skip){
if(queues.size()>0)
limits.get(currentQueue()).addAndGet(skip);
}
public void registerLimit(long limit){

if(queues.size()>0){
if(unlimited.get(currentQueue()).get())
limits.get(currentQueue()).set(0);

limits.get(currentQueue()).addAndGet(limit);
unlimited.get(currentQueue()).set(false);

queues.stream().forEach(this::closeQueueIfFinishedStateless);

}
}
private Queue currentQueue() {
return queues.get(queues.size()-1);
}
public void addQueue(Queue q){

queues.add(q);
limits.put(q, new AtomicLong(Long.MAX_VALUE-1));
unlimited.put(q, new AtomicBoolean(true));
count.put(q, new AtomicLong(0l));

}


public void closeQueueIfFinished(Queue queue){

closeQueueIfFinished(queue,AtomicLong::incrementAndGet);

}
private void closeQueueIfFinished(Queue queue, Function<AtomicLong,Long> fn){

if(queues.size()==0)
return;

long queueCount = fn.apply(count.get(queue));
long limit = valuesToRight(queue).stream().reduce((acc,next)-> Math.min(acc, next)).get();




if(queueCount>=limit){ //last entry - close THIS queue only!

queue.closeAndClear();
closed.set(true);
}


}
public void closeQueueIfFinishedStateless(Queue queue){

closeQueueIfFinished(queue,AtomicLong::get);

}
private List<Long> valuesToRight(Queue queue) {
return Seq.seq(queues.stream()).splitAt(findQueue(queue)).v2.map(limits::get).map(AtomicLong::get).collect(Collectors.toList());

}

private int findQueue(Queue queue){
for(int i=0;i< queues.size();i++){
if(queues.get(i) == queue)
return i;
}
return -1;
}
@Override
public void closeAll() {
closed.set(true);
queues.stream().forEach(Queue::closeAndClear);

}
@Override
public boolean closed() {
return closed.get();
}
}
/**
stream.map().iterator().limit(4).flatMap(..).limit(2).map(..).limit(8)
subscription
stream no limit
q1:limit (4)
q2:limit (2)
q3:limit (8)
**/

42 changes: 42 additions & 0 deletions src/main/java/com/aol/simple/react/stream/BaseLazySimpleReact.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.aol.simple.react.stream;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.aol.simple.react.async.Subscription;
import com.aol.simple.react.stream.traits.SimpleReactStream;

public abstract class BaseLazySimpleReact extends BaseSimpleReact{
/**
* Generate an infinite reactive flow. Requires a lazy flow.
*
* The flow will run indefinitely unless / until the provided Supplier throws an Exception
*
* @see com.aol.simple.react.async.Queue SimpleReact Queue for a way to create a more managable infinit flow
*
* @param s Supplier to generate the infinite flow
* @return Next stage in the flow
*/
public <U> SimpleReactStream< U> reactInfinitely(final Supplier<U> s) {
if(isEager())
throw new InfiniteProcessingException("To reactInfinitely use a lazy stream");
Subscription sub = new Subscription();
SimpleReactStream stream = construct(StreamSupport.stream(
new InfiniteClosingSpliterator(Long.MAX_VALUE, () -> CompletableFuture.completedFuture(s.get()),sub), false),
this.getExecutor(),getRetrier(),false).withSubscription(sub);

return stream;


}

public <U> SimpleReactStream<U> iterateInfinitely(final U seed, final UnaryOperator<U> f){
if(isEager())
throw new InfiniteProcessingException("To iterateInfinitely use a lazy stream");
return construct(Stream.iterate(seed, it -> f.apply(it)).map(it -> CompletableFuture.completedFuture(it)),
this.getExecutor(),getRetrier(),false);
}
}
Loading

0 comments on commit 675f601

Please sign in to comment.