From c7910bb401f7f7fd09010bede0d80f5d2164afd5 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Sat, 11 Jan 2025 17:13:18 +0100 Subject: [PATCH] Core: Fix possible deadlock in ParallelIterable (#11781) * Fix ParallelIterable deadlock It was observed that in high-concurrency/high-workload scenarios the cluster deadlocks because manifest readers are waiting for a connection from the S3 pool. Specifically, ManifestGroup#plan will create a ManifestReader for every ParallelIterable.Task. These readers will effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task will be tabled for later use. Consider this scenario: S3 connection pool size=1 approximateMaxQueueSize=1 workerPoolSize=1 ParallelIterable1: starts TaskP1 ParallelIterable1: TaskP1 produces result, queue gets full, TaskP1 is put on hold (holds S3 connection) ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on S3 connection pool ParallelIterable1: result gets consumed, TaskP1 is scheduled again ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release connection The fix makes sure a Task runs to completion once it has started. This way limited resources like the connection pool are not put on hold. Queue size might exceed strict limits, but it should still be bounded. 
Fixes https://github.com/apache/iceberg/issues/11768 * Do not submit a task when there is no space in queue --- .../apache/iceberg/util/ParallelIterable.java | 27 +++++-- .../iceberg/util/TestParallelIterable.java | 80 ++++++++++++------- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d40f64844797..7acab8762fb8 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -86,6 +86,7 @@ static class ParallelIterator implements CloseableIterator { private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private final int maxQueueSize; private ParallelIterator( Iterable> iterables, ExecutorService workerPool, int maxQueueSize) { @@ -97,6 +98,7 @@ private ParallelIterator( this.workerPool = workerPool; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.maxQueueSize = maxQueueSize; } @Override @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() { try { Optional> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); + taskFutures[i] = null; } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() { } } - taskFutures[i] = submitNextTask(); + // submit a new task if there is space in the queue + if (queue.size() < maxQueueSize) { + taskFutures[i] = submitNextTask(); + } } if (taskFutures[i] != null) { @@ -257,17 +263,24 @@ private static class Task implements Supplier>>, Closeable { @Override public Optional> get() { try { + if (queue.size() >= approximateMaxQueueSize) { 
+ // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work. + // + // Tasks might hold references (via iterator) to constrained resources + // (e.g. pooled connections). Hence, tasks should yield only when + // iterator is not instantiated. Otherwise, there could be + // a deadlock when yielded tasks are waiting to be executed while + // currently executed tasks are waiting for the resources that are held + // by the yielded tasks. + return Optional.of(this); + } + if (iterator == null) { iterator = input.iterator(); } while (iterator.hasNext()) { - if (queue.size() >= approximateMaxQueueSize) { - // Yield when queue is over the size limit. Task will be resubmitted later and continue - // the work. - return Optional.of(this); - } - T next = iterator.next(); if (closed.get()) { break; diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 410e33058d0c..a1e14a22a74d 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -39,6 +40,7 @@ import org.apache.iceberg.util.ParallelIterable.ParallelIterator; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; public class TestParallelIterable { @Test @@ -143,7 +145,7 @@ public CloseableIterator iterator() { @Test public void limitQueueSize() { - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = Executors.newSingleThreadExecutor(); try { List> iterables = ImmutableList.of( @@ -167,7 +169,7 @@ public void 
limitQueueSize() { while (iterator.hasNext()) { assertThat(iterator.queueSize()) .as("iterator internal queue size") - .isLessThanOrEqualTo(maxQueueSize + iterables.size()); + .isLessThanOrEqualTo(100); actualValues.add(iterator.next()); } @@ -182,41 +184,61 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - ExecutorService executor = Executors.newCachedThreadPool(); + @Timeout(10) + public void noDeadlock() { + // This test simulates a scenario where iterators use a constrained resource + // (e.g. an S3 connection pool that has a limit on the number of connections). + // In this case, the constrained resource shouldn't cause a deadlock when queue + // is full and the iterator is waiting for the queue to be drained. + ExecutorService executor = Executors.newFixedThreadPool(1); try { - List> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + Semaphore semaphore = new Semaphore(1); - Multiset expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); + List> iterablesA = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator())); + List> iterablesB = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator())); - ParallelIterable parallelIterable = new ParallelIterable<>(iterables, executor, 1); - ParallelIterator iterator = (ParallelIterator) parallelIterable.iterator(); + ParallelIterable parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1); + ParallelIterable parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1); - Multiset actualValues = HashMultiset.create(); + parallelIterableA.iterator().next(); + parallelIterableB.iterator().next(); + } finally { + executor.shutdownNow(); + } + } - while 
(iterator.hasNext()) { - assertThat(iterator.queueSize()) - .as("iterator internal queue size") - .isLessThanOrEqualTo(1 + iterables.size()); - actualValues.add(iterator.next()); + private CloseableIterable testIterable( + RunnableWithException open, RunnableWithException close, Iterator iterator) { + return new CloseableIterable() { + @Override + public void close() { + try { + close.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } } - assertThat(actualValues) - .as("multiset of values returned by the iterator") - .isEqualTo(expectedValues); + @Override + public CloseableIterator iterator() { + try { + open.run(); + return CloseableIterator.withClose(iterator); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } - iterator.close(); - } finally { - executor.shutdown(); - } + private interface RunnableWithException { + void run() throws Exception; } private void queueHasElements(ParallelIterator iterator) {