Skip to content

Commit

Permalink
Merge pull request #332 from aol/parallel-stream-queue
Browse files Browse the repository at this point in the history
parallel streaming fixes
  • Loading branch information
johnmcclean authored Oct 20, 2016
2 parents 11ba9e2 + d63ca50 commit d7d0eb8
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ test {

javadoc {
configure((CoreJavadocOptions) getOptions()) {
addStringOption('sourcepath', "~/github/jool-0.9.11-sources.jar")
addStringOption('sourcepath', "/Sources/jool-0.9.11-sources.jar")
}
}
modifyPom {
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/com/aol/cyclops/data/async/AdaptersModule.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.aol.cyclops.data.async;



import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -322,14 +324,16 @@ public boolean tryAdvance(final Consumer<? super T> action) {
}

}
/**

@Override
public Spliterator<T> trySplit() {
return new ClosingSpliterator(
estimate >>>= 1, s, subscription, queue);

}
**/



}

}
7 changes: 6 additions & 1 deletion src/main/java/com/aol/cyclops/data/async/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ public ReactiveSeq<T> stream() {
* @return Java 8 Stream connnected to this Queue
*/
public Stream<T> jdkStream() {
listeningStreams.incrementAndGet();
int cores = Runtime.getRuntime().availableProcessors();
String par = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
int connected = par !=null ? Integer.valueOf(par) : cores;
for(int i=0;i<connected*2;i++){
listeningStreams.incrementAndGet();
}
return closingStream(this::get, new AlwaysContinue());
}

Expand Down
63 changes: 57 additions & 6 deletions src/test/java/com/aol/cyclops/data/async/QueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -22,6 +23,7 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.internal.verification.Times;

import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.control.SimpleReact;
Expand All @@ -42,16 +44,18 @@ public void setup() {
public void parallelStreamClose(){
int cores = Runtime.getRuntime().availableProcessors();
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(cores*4));

for(int k=0; k < 10;k++) {

com.aol.cyclops.data.async.Queue<Integer> queue = QueueFactories.<Integer>boundedQueue(5000).build();

new Thread(() -> {
while(!queue.isOpen());
System.err.println(queue.close());
while(!queue.isOpen()){
System.out.println("Queue isn't open yet!");
}
System.err.println("Closing " + queue.close());
}).start();

Stream<Integer> stream = queue.jdkStream();

stream = stream.parallel();
Expand All @@ -63,16 +67,33 @@ public void parallelStreamClose(){
}
}
@Test
public void closedParallelStream(){
Queue<Integer> q = QueueFactories.<Integer>boundedQueue(100).build();
for(int i=0;i<1000;i++){
q.add(i);
}
q.close();
q.jdkStream().parallel().forEach(System.out::println);
}
@Test
public void parallelStream(){


success = false;
AtomicLong threadId = new AtomicLong(Thread.currentThread().getId());
Queue<Integer> q = QueueFactories.<Integer>boundedQueue(100).build();
Queue<Integer> q = QueueFactories.<Integer>boundedQueue(2000).build();
for(int i=0;i<10000;i++){
q.add(i);
}
System.out.println(" queue " + q.size());
System.out.println(threadId.get());
q.jdkStream()
.parallel()
.peek(i-> { if(threadId.get()!= Thread.currentThread().getId()){
.peek(System.out::println)
.peek(i-> {
System.out.println(Thread.currentThread().getId());
if(threadId.get()!= Thread.currentThread().getId()){
System.out.println("closing");
success=true;
q.close();
}})
Expand All @@ -83,6 +104,36 @@ public void parallelStream(){

}
@Test
public void parallelStreamSmallBounds(){

for(int x=0;x<10;x++){
System.out.println("Run " + x);
success = false;
AtomicLong threadId = new AtomicLong(Thread.currentThread().getId());
Queue<Integer> q = QueueFactories.<Integer>boundedQueue(100).build();
for(int i=0;i<10000;i++){
q.add(i);
}
System.out.println(" queue " + q.size());
System.out.println(threadId.get());
q.jdkStream()
.parallel()
.peek(System.out::println)
.peek(i-> {
System.out.println(Thread.currentThread().getId());
if(threadId.get()!= Thread.currentThread().getId()){
System.out.println("closing");
success=true;
q.close();
}})
.peek(i->System.out.println(Thread.currentThread().getId()))
.forEach(System.out::println);

assertTrue(success);
}

}
@Test
public void closeQueue(){
Queue<Integer> q = QueueFactories.<Integer>boundedQueue(100).build();
q.add(1);
Expand Down

0 comments on commit d7d0eb8

Please sign in to comment.