Skip to content

Commit

Permalink
Add parallel-collectors samples (eugenp#7187)
Browse files Browse the repository at this point in the history
* Add ParallelCollectorsTest

* Add dependency

* Fix naming
  • Loading branch information
pivovarit authored Jun 26, 2019
1 parent 21cb183 commit 0c7c689
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 0 deletions.
5 changes: 5 additions & 0 deletions libraries-2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
</repositories>

<dependencies>
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.baeldung.parallel_collectors;

import org.junit.Test;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static com.pivovarit.collectors.ParallelCollectors.parallelOrdered;
import static com.pivovarit.collectors.ParallelCollectors.parallelToCollection;
import static com.pivovarit.collectors.ParallelCollectors.parallelToList;
import static com.pivovarit.collectors.ParallelCollectors.parallelToMap;
import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;

public class ParallelCollectorsUnitTest {

@Test
public void shouldProcessInParallelWithStreams() {
List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.parallelStream()
.map(i -> fetchById(i))
.collect(Collectors.toList());

System.out.println(results);
}

@Test
public void shouldProcessInParallelWithParallelCollectors() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
.collect(parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join());
}

@Test
public void shouldCollectToList() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
.collect(parallelToList(i -> fetchById(i), executor, 4))
.join();

System.out.println(results); // [user-1, user-2, user-3]
}

@Test
public void shouldCollectToCollection() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
.collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
.join();

System.out.println(results); // [user-1, user-2, user-3]
}

@Test
public void shouldCollectToStream() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, List<String>> results = ids.stream()
.collect(parallelToStream(i -> fetchById(i), executor, 4))
.thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
.join();

System.out.println(results); // [user-1, user-2, user-3]
}

@Test
public void shouldStreamInCompletionOrder() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

ids.stream()
.collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);
}

@Test
public void shouldStreamInOriginalOrder() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

ids.stream()
.collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
.forEach(System.out::println);
}

@Test
public void shouldCollectToMap() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
.join();

System.out.println(results); // {1=user-1, 2=user-2, 3=user-3}
}

@Test
public void shouldCollectToTreeMap() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
.join();

System.out.println(results); // {1=user-1, 2=user-2, 3=user-3}
}

@Test
public void shouldCollectToTreeMapAndResolveClashes() {
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
.collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
.join();

System.out.println(results); // {1=user-1, 2=user-2, 3=user-3}
}

private static String fetchById(int id) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore shamelessly
}

return "user-" + id;
}

private static String fetchByIdWithRandomDelay(int id) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
} catch (InterruptedException e) {
// ignore shamelessly
}

return "user-" + id;
}
}

0 comments on commit 0c7c689

Please sign in to comment.