From d63ca5016d603d78b12e255a5dbc88b447a96337 Mon Sep 17 00:00:00 2001 From: John McClean Date: Thu, 20 Oct 2016 11:26:53 +0100 Subject: [PATCH] parallel streaming fixes --- build.gradle | 2 +- .../cyclops/data/async/AdaptersModule.java | 10 ++- .../com/aol/cyclops/data/async/Queue.java | 7 ++- .../com/aol/cyclops/data/async/QueueTest.java | 63 +++++++++++++++++-- 4 files changed, 71 insertions(+), 11 deletions(-) diff --git a/build.gradle b/build.gradle index eb032e65ae..8dc4923555 100644 --- a/build.gradle +++ b/build.gradle @@ -127,7 +127,7 @@ test { javadoc { configure((CoreJavadocOptions) getOptions()) { - addStringOption('sourcepath', "~/github/jool-0.9.11-sources.jar") + addStringOption('sourcepath', "/Sources/jool-0.9.11-sources.jar") } } modifyPom { diff --git a/src/main/java/com/aol/cyclops/data/async/AdaptersModule.java b/src/main/java/com/aol/cyclops/data/async/AdaptersModule.java index ecdbd7e437..197d98d524 100644 --- a/src/main/java/com/aol/cyclops/data/async/AdaptersModule.java +++ b/src/main/java/com/aol/cyclops/data/async/AdaptersModule.java @@ -1,5 +1,7 @@ package com.aol.cyclops.data.async; + + import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -322,14 +324,16 @@ public boolean tryAdvance(final Consumer action) { } } -/** + @Override public Spliterator trySplit() { - return new ClosingSpliterator( estimate >>>= 1, s, subscription, queue); + } - **/ + + } + } diff --git a/src/main/java/com/aol/cyclops/data/async/Queue.java b/src/main/java/com/aol/cyclops/data/async/Queue.java index 69214eae9a..c66990911e 100644 --- a/src/main/java/com/aol/cyclops/data/async/Queue.java +++ b/src/main/java/com/aol/cyclops/data/async/Queue.java @@ -162,7 +162,12 @@ public ReactiveSeq stream() { * @return Java 8 Stream connnected to this Queue */ public Stream jdkStream() { - listeningStreams.incrementAndGet(); + int cores = Runtime.getRuntime().availableProcessors(); + String par = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"); + int connected = par !=null ? Integer.valueOf(par) : cores; + for(int i=0;i queue = QueueFactories.boundedQueue(5000).build(); new Thread(() -> { - while(!queue.isOpen()); - System.err.println(queue.close()); + while(!queue.isOpen()){ + System.out.println("Queue isn't open yet!"); + } + System.err.println("Closing " + queue.close()); }).start(); - + Stream stream = queue.jdkStream(); stream = stream.parallel(); @@ -63,16 +67,33 @@ public void parallelStreamClose(){ } } @Test + public void closedParallelStream(){ + Queue q = QueueFactories.boundedQueue(100).build(); + for(int i=0;i<1000;i++){ + q.add(i); + } + q.close(); + q.jdkStream().parallel().forEach(System.out::println); + } + @Test public void parallelStream(){ + + success = false; AtomicLong threadId = new AtomicLong(Thread.currentThread().getId()); - Queue q = QueueFactories.boundedQueue(100).build(); + Queue q = QueueFactories.boundedQueue(2000).build(); for(int i=0;i<10000;i++){ q.add(i); } + System.out.println(" queue " + q.size()); + System.out.println(threadId.get()); q.jdkStream() .parallel() - .peek(i-> { if(threadId.get()!= Thread.currentThread().getId()){ + .peek(System.out::println) + .peek(i-> { + System.out.println(Thread.currentThread().getId()); + if(threadId.get()!= Thread.currentThread().getId()){ + System.out.println("closing"); success=true; q.close(); }}) @@ -83,6 +104,36 @@ public void parallelStream(){ } @Test + public void parallelStreamSmallBounds(){ + + for(int x=0;x<10;x++){ + System.out.println("Run " + x); + success = false; + AtomicLong threadId = new AtomicLong(Thread.currentThread().getId()); + Queue q = QueueFactories.boundedQueue(100).build(); + for(int i=0;i<10000;i++){ + q.add(i); + } + System.out.println(" queue " + q.size()); + System.out.println(threadId.get()); + q.jdkStream() + .parallel() + .peek(System.out::println) + .peek(i-> { + System.out.println(Thread.currentThread().getId()); + if(threadId.get()!= Thread.currentThread().getId()){ + System.out.println("closing"); + success=true; + q.close(); + }}) + .peek(i->System.out.println(Thread.currentThread().getId())) + .forEach(System.out::println); + + assertTrue(success); + } + + } + @Test public void closeQueue(){ Queue q = QueueFactories.boundedQueue(100).build(); q.add(1);