From 0c7c689d3b619c448fda1731ac9e2c1f367314df Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Wed, 26 Jun 2019 22:07:16 +0200 Subject: [PATCH] Add parallel-collectors samples (#7187) * Add ParallelCollectorsTest * Add dependency * Fix naming --- libraries-2/pom.xml | 5 + .../ParallelCollectorsUnitTest.java | 168 ++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 libraries-2/src/test/java/com/baeldung/parallel_collectors/ParallelCollectorsUnitTest.java diff --git a/libraries-2/pom.xml b/libraries-2/pom.xml index c18b4aae64ac..f7be463f1a3e 100644 --- a/libraries-2/pom.xml +++ b/libraries-2/pom.xml @@ -30,6 +30,11 @@ + + com.pivovarit + parallel-collectors + 1.1.0 + org.assertj assertj-core diff --git a/libraries-2/src/test/java/com/baeldung/parallel_collectors/ParallelCollectorsUnitTest.java b/libraries-2/src/test/java/com/baeldung/parallel_collectors/ParallelCollectorsUnitTest.java new file mode 100644 index 000000000000..adc753a8ada7 --- /dev/null +++ b/libraries-2/src/test/java/com/baeldung/parallel_collectors/ParallelCollectorsUnitTest.java @@ -0,0 +1,168 @@ +package com.baeldung.parallel_collectors; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static com.pivovarit.collectors.ParallelCollectors.parallel; +import static com.pivovarit.collectors.ParallelCollectors.parallelOrdered; +import static com.pivovarit.collectors.ParallelCollectors.parallelToCollection; +import static com.pivovarit.collectors.ParallelCollectors.parallelToList; +import static com.pivovarit.collectors.ParallelCollectors.parallelToMap; +import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; + +public class ParallelCollectorsUnitTest { + + @Test + public void shouldProcessInParallelWithStreams() { + List ids = Arrays.asList(1, 2, 3); + + List results = ids.parallelStream() + .map(i -> fetchById(i)) + .collect(Collectors.toList()); + + System.out.println(results); + } + + @Test + public void shouldProcessInParallelWithParallelCollectors() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + CompletableFuture> results = ids.stream() + .collect(parallelToList(i -> fetchById(i), executor, 4)); + + System.out.println(results.join()); + } + + @Test + public void shouldCollectToList() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + List results = ids.stream() + .collect(parallelToList(i -> fetchById(i), executor, 4)) + .join(); + + System.out.println(results); // [user-1, user-2, user-3] + } + + @Test + public void shouldCollectToCollection() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + List results = ids.stream() + .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4)) + .join(); + + System.out.println(results); // [user-1, user-2, user-3] + } + + @Test + public void shouldCollectToStream() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + Map> results = ids.stream() + .collect(parallelToStream(i -> fetchById(i), executor, 4)) + .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length()))) + .join(); + + System.out.println(results); // [user-1, user-2, user-3] + } + + @Test + public void shouldStreamInCompletionOrder() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + ids.stream() + .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4)) + .forEach(System.out::println); + } + + @Test + public void shouldStreamInOriginalOrder() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + ids.stream() + .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4)) + .forEach(System.out::println); + } + + @Test + public void shouldCollectToMap() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + Map results = ids.stream() + .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4)) + .join(); + + System.out.println(results); // {1=user-1, 2=user-2, 3=user-3} + } + + @Test + public void shouldCollectToTreeMap() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + Map results = ids.stream() + .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4)) + .join(); + + System.out.println(results); // {1=user-1, 2=user-2, 3=user-3} + } + + @Test + public void shouldCollectToTreeMapAndResolveClashes() { + ExecutorService executor = Executors.newFixedThreadPool(10); + + List ids = Arrays.asList(1, 2, 3); + + Map results = ids.stream() + .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4)) + .join(); + + System.out.println(results); // {1=user-1, 2=user-2, 3=user-3} + } + + private static String fetchById(int id) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore shamelessly + } + + return "user-" + id; + } + + private static String fetchByIdWithRandomDelay(int id) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(1000)); + } catch (InterruptedException e) { + // ignore shamelessly + } + + return "user-" + id; + } +}