Core: Fix possible deadlock in ParallelIterable (#11781)
* Fix ParallelIterable deadlock

It was observed that in a high-concurrency, high-workload scenario the cluster
deadlocks because manifest readers wait for a connection from the S3 pool.

Specifically, ManifestGroup#plan creates a ManifestReader per ParallelIterable.Task.
These readers effectively hold onto an S3 connection from the pool.
When the ParallelIterable queue is full, a Task is set aside to be resumed later.

Consider scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1

ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces a result, the queue becomes full, TaskP1 is put on hold (still holding an S3 connection)
ParallelIterable2: starts TaskP2; TaskP2 is scheduled on the workerPool but blocks waiting for an S3 connection
ParallelIterable1: the result is consumed, TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for the workerPool to become free, but TaskP2 occupies it while waiting for TaskP1 to release its connection; neither can proceed (a minimal simulation of this setup is sketched below)
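
A minimal, self-contained simulation of this scenario, with a Semaphore standing in for the
S3 connection pool and a single-thread executor standing in for the worker pool. The class
name, the pooledIterable helper, and placing the sketch in org.apache.iceberg.util (so it can
reach the package-private queue-size constructor, as the test does) are illustrative
assumptions; the same simulation technique is used by the noDeadlock regression test added
in this commit:

    package org.apache.iceberg.util;

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.stream.IntStream;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.CloseableIterator;

    public class ParallelIterableDeadlockSketch {

      public static void main(String[] args) {
        // The Semaphore stands in for the S3 connection pool (one permit); the
        // single-thread executor stands in for the shared worker pool.
        ExecutorService workerPool = Executors.newSingleThreadExecutor();
        Semaphore connectionPool = new Semaphore(1);

        List<Iterable<Integer>> manifestsA =
            List.of(pooledIterable(connectionPool, IntStream.range(0, 100).iterator()));
        List<Iterable<Integer>> manifestsB =
            List.of(pooledIterable(connectionPool, IntStream.range(200, 300).iterator()));

        // approximateMaxQueueSize = 1, so a producing task fills the queue immediately.
        ParallelIterable<Integer> iterA = new ParallelIterable<>(manifestsA, workerPool, 1);
        ParallelIterable<Integer> iterB = new ParallelIterable<>(manifestsB, workerPool, 1);

        // Before the fix these calls could hang: TaskP1 yields while holding the permit,
        // and TaskP2 occupies the only worker thread waiting for that same permit.
        System.out.println(iterA.iterator().next());
        System.out.println(iterB.iterator().next());

        workerPool.shutdownNow();
      }

      // An iterable that "checks out a connection" (acquires the permit) when its
      // iterator is opened and returns it when the iterable is closed.
      private static <T> CloseableIterable<T> pooledIterable(Semaphore pool, Iterator<T> items) {
        return new CloseableIterable<T>() {
          @Override
          public CloseableIterator<T> iterator() {
            pool.acquireUninterruptibly();
            return CloseableIterator.withClose(items);
          }

          @Override
          public void close() {
            pool.release();
          }
        };
      }
    }

Before this fix, the two next() calls could block each other exactly as in the timeline above;
with the fix, both return.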

The fix makes sure a Task runs to completion once it has started, so that limited resources
such as connection pool slots are never held by a suspended task. The queue size might exceed
the strict limit (roughly by whatever the currently running tasks still have left to produce),
but it remains bounded.

Fixes #11768

* Do not submit a task when there is no space in the queue
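
For context, a short usage sketch of how such a ParallelIterable is consumed. The package,
class name, and integer sources are illustrative assumptions; in planning code the sources
would be manifest readers, and the shared worker pool comes from ThreadPools:

    package org.apache.iceberg.examples; // hypothetical package, for illustration only

    import java.io.IOException;
    import java.util.List;
    import java.util.stream.IntStream;
    import org.apache.iceberg.io.CloseableIterator;
    import org.apache.iceberg.util.ParallelIterable;
    import org.apache.iceberg.util.ThreadPools;

    public class ParallelIterableUsageSketch {

      public static void main(String[] args) {
        // Three independent sources read concurrently on the shared Iceberg worker pool.
        Iterable<Integer> a = () -> IntStream.range(0, 100).iterator();
        Iterable<Integer> b = () -> IntStream.range(100, 200).iterator();
        Iterable<Integer> c = () -> IntStream.range(200, 300).iterator();
        List<Iterable<Integer>> sources = List.of(a, b, c);

        // The public constructor applies a default queue-size limit internally; producers
        // pause (and, after this fix, finish their current input first) when the consumer
        // falls behind, keeping memory roughly bounded.
        ParallelIterable<Integer> parallel =
            new ParallelIterable<>(sources, ThreadPools.getWorkerPool());

        long count = 0;
        try (CloseableIterator<Integer> it = parallel.iterator()) {
          while (it.hasNext()) {
            it.next();
            count++;
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        System.out.println("consumed " + count + " items"); // prints 300
      }
    }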
sopel39 authored Jan 11, 2025
1 parent a100e6a commit c7910bb
Showing 2 changed files with 71 additions and 36 deletions.
27 changes: 20 additions & 7 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -86,6 +86,7 @@ static class ParallelIterator<T> implements CloseableIterator<T> {
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final int maxQueueSize;

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
@@ -97,6 +98,7 @@ private ParallelIterator(
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
this.maxQueueSize = maxQueueSize;
}

@Override
@@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
try {
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
taskFutures[i] = null;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
@@ -165,7 +168,10 @@
}
}

taskFutures[i] = submitNextTask();
// submit a new task if there is space in the queue
if (queue.size() < maxQueueSize) {
taskFutures[i] = submitNextTask();
}
}

if (taskFutures[i] != null) {
@@ -257,17 +263,24 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
@Override
public Optional<Task<T>> get() {
try {
if (queue.size() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
//
// Tasks might hold references (via iterator) to constrained resources
// (e.g. pooled connections). Hence, tasks should yield only when
// iterator is not instantiated. Otherwise, there could be
// a deadlock when yielded tasks are waiting to be executed while
// currently executed tasks are waiting for the resources that are held
// by the yielded tasks.
return Optional.of(this);
}

if (iterator == null) {
iterator = input.iterator();
}

while (iterator.hasNext()) {
if (queue.size() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
}

T next = iterator.next();
if (closed.get()) {
break;
80 changes: 51 additions & 29 deletions core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -39,6 +40,7 @@
import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestParallelIterable {
@Test
@@ -143,7 +145,7 @@ public CloseableIterator<Integer> iterator() {

@Test
public void limitQueueSize() {
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
List<Iterable<Integer>> iterables =
ImmutableList.of(
@@ -167,7 +169,7 @@ public void limitQueueSize() {
while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(maxQueueSize + iterables.size());
.isLessThanOrEqualTo(100);
actualValues.add(iterator.next());
}

@@ -182,41 +184,61 @@
}

@Test
public void queueSizeOne() {
ExecutorService executor = Executors.newCachedThreadPool();
@Timeout(10)
public void noDeadlock() {
// This test simulates a scenario where iterators use a constrained resource
// (e.g. an S3 connection pool that has a limit on the number of connections).
// In this case, the constrained resource shouldn't cause a deadlock when queue
// is full and the iterator is waiting for the queue to be drained.
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
List<Iterable<Integer>> iterables =
ImmutableList.of(
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator());
Semaphore semaphore = new Semaphore(1);

Multiset<Integer> expectedValues =
IntStream.range(0, 100)
.boxed()
.flatMap(i -> Stream.of(i, i, i))
.collect(ImmutableMultiset.toImmutableMultiset());
List<Iterable<Integer>> iterablesA =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
List<Iterable<Integer>> iterablesB =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));

ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Multiset<Integer> actualValues = HashMultiset.create();
parallelIterableA.iterator().next();
parallelIterableB.iterator().next();
} finally {
executor.shutdownNow();
}
}

while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(1 + iterables.size());
actualValues.add(iterator.next());
private <T> CloseableIterable<T> testIterable(
RunnableWithException open, RunnableWithException close, Iterator<T> iterator) {
return new CloseableIterable<T>() {
@Override
public void close() {
try {
close.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

assertThat(actualValues)
.as("multiset of values returned by the iterator")
.isEqualTo(expectedValues);
@Override
public CloseableIterator<T> iterator() {
try {
open.run();
return CloseableIterator.withClose(iterator);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}

iterator.close();
} finally {
executor.shutdown();
}
private interface RunnableWithException {
void run() throws Exception;
}

private void queueHasElements(ParallelIterator<Integer> iterator) {
