Skip to content

Commit

Permalink
0.99.2 enforce cyclops as provided dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Sep 11, 2015
1 parent 7c26f92 commit b9328ea
Show file tree
Hide file tree
Showing 19 changed files with 10,236 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ apply plugin: 'jacoco'
apply from: custom('jacoco-version')

sourceCompatibility = 1.8
version = '0.99.1'
version = '0.99.2'
jar {
manifest {
attributes 'Implementation-Title': 'Simple React', 'Implementation-Version': version
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aol/simple/react/async/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import org.jooq.lambda.Seq;

import com.aol.cyclops.lambda.utils.ExceptionSoftener;
import com.aol.simple.react.async.factories.QueueFactories;
import com.aol.simple.react.async.factories.QueueToBlockingQueueWrapper;
import com.aol.simple.react.async.subscription.AlwaysContinue;
import com.aol.simple.react.async.subscription.Continueable;
import com.aol.simple.react.async.wait.DirectWaitStrategy;
import com.aol.simple.react.async.wait.WaitStrategy;
import com.aol.simple.react.exceptions.ExceptionSoftener;
import com.aol.simple.react.exceptions.SimpleReactProcessingException;
import com.aol.simple.react.stream.traits.Continuation;
import com.aol.simple.react.util.SimpleTimer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,19 @@



import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import lombok.AllArgsConstructor;
import lombok.Getter;
import uk.co.real_logic.agrona.concurrent.ManyToOneConcurrentArrayQueue;

import com.aol.cyclops.lambda.utils.ExceptionSoftener;
import com.aol.simple.react.exceptions.ExceptionSoftener;
import com.aol.simple.react.exceptions.SimpleReactCompletionException;
/*
* @author John McClean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import java.util.Collection;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import lombok.AllArgsConstructor;

import com.aol.cyclops.streams.StreamUtils;
import com.aol.simple.react.stream.eager.EagerReact;
import com.aol.simple.react.stream.traits.EagerFutureStream;

Expand Down Expand Up @@ -143,7 +143,8 @@ public <T> EagerFutureStream<T> of(Collection<T> collection) {
* @return EagerFutureStream
*/
public <T> EagerFutureStream<T> ofIterable(Iterable<T> iterable) {
return react(StreamUtils.stream(iterable).map(e -> supplier(e)));
return react(StreamSupport.stream(iterable.spliterator(),
false).map(e -> supplier(e)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import java.util.Collection;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import lombok.AllArgsConstructor;

import com.aol.cyclops.streams.StreamUtils;
import com.aol.simple.react.stream.lazy.LazyReact;
import com.aol.simple.react.stream.traits.LazyFutureStream;

Expand Down Expand Up @@ -143,7 +143,8 @@ public <T> LazyFutureStream<T> of(Collection<T> collection) {
* @return LazyFutureStream
*/
public <T> LazyFutureStream<T> ofIterable(Iterable<T> iterable) {
return react(StreamUtils.stream(iterable).map(e -> supplier(e)));
return react(StreamSupport.stream(iterable.spliterator(),
false).map(e -> supplier(e)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import static com.aol.simple.react.stream.traits.LazyFutureStream.lazyFutureStream;

import java.util.concurrent.Executor;
import java.util.stream.Stream;

import lombok.Value;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import com.aol.simple.react.stream.traits.LazyFutureStream;
import com.aol.simple.react.stream.ThreadPools;
import com.aol.simple.react.stream.lazy.LazyReact;

/**
*
Expand All @@ -30,24 +32,24 @@ public class JDKReactiveStreamsPublisher<T> implements Publisher<T>{

boolean synchronous;
Stream<T> wrappedStream;

Executor exec;
/**
* This creates a synchronous publisher that publishes on the calling thread.
*
* @param stream JDK Stream to turn into a Reactive Streams Publisher
* @return Reactive Streams Publisher
*/
public static <T> JDKReactiveStreamsPublisher<T> ofSync(Stream<T> stream){
return new JDKReactiveStreamsPublisher<T>(true,stream);
return new JDKReactiveStreamsPublisher<T>(true,stream,null);
}
/**
* This creates an asynchronous publisher that publishes on an external thread
*
* @param stream JDK Stream to turn into a Reactive Streams Publisher
* @return Reactive Streams Publisher
*/
public static <T> JDKReactiveStreamsPublisher<T> ofAsync(Stream<T> stream){
return new JDKReactiveStreamsPublisher<T>(false,stream);
public static <T> JDKReactiveStreamsPublisher<T> ofAsync(Stream<T> stream,Executor exec){
return new JDKReactiveStreamsPublisher<T>(false,stream,exec);
}
/*
* @param s Reactive Streams subscriber
Expand All @@ -57,8 +59,10 @@ public static <T> JDKReactiveStreamsPublisher<T> ofAsync(Stream<T> stream){
public void subscribe(Subscriber<? super T> s) {
if(synchronous)
lazyFutureStream(wrappedStream).sync().subscribe(s);
else
lazyFutureStream(wrappedStream).async().subscribe(s);
else{
new LazyReact(ThreadPools.getCurrentThreadExecutor()).withPublisherExecutor(exec).from(wrappedStream).async().subscribe(s);
}


}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.aol.simple.react.reactivestreams.jdk;


import java.util.concurrent.Executors;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
Expand All @@ -21,7 +22,8 @@ public TckAsynchronousPublisherTest(){

@Override
public Publisher<Long> createPublisher(long elements) {
return JDKReactiveStreamsPublisher.ofAsync(Stream.iterate(0l, i->i+1l).limit(elements));
return JDKReactiveStreamsPublisher.ofAsync(Stream.iterate(0l, i->i+1l).limit(elements),
Executors.newFixedThreadPool(1));

}

Expand Down
Loading

0 comments on commit b9328ea

Please sign in to comment.