From 80885d31c717bcbea2246a2cb6a578c9a932a690 Mon Sep 17 00:00:00 2001
From: Michael Gibney
Date: Thu, 31 Oct 2024 12:45:54 -0400
Subject: [PATCH] experimental special executor for nuanced zkCallback
 parallelism

---
 .../solr/common/cloud/SolrZkClient.java            | 181 +++++++++++++++++-
 1 file changed, 179 insertions(+), 2 deletions(-)

diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index b7aeec05b7d..27f80542bc5 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -25,8 +25,15 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
@@ -88,8 +95,7 @@ public MapWriter getMetrics() {
     return metrics::writeMap;
   }

-  private final ExecutorService zkCallbackExecutor =
-      ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("zkCallback"));
+  private final ExecutorService zkCallbackExecutor;
   private final ExecutorService zkConnManagerCallbackExecutor =
       ExecutorUtil.newMDCAwareSingleThreadExecutor(
           new SolrNamedThreadFactory("zkConnectionManagerCallback"));
@@ -134,6 +140,7 @@ private SolrZkClient(
       Compressor compressor,
       SolrClassLoader solrClassLoader) {
+    zkCallbackExecutor = getZkCallbackExecutor();
     if (zkServerAddress == null) {
       // only tests should create one without server address
       return;
     }
@@ -223,6 +230,176 @@ private SolrZkClient(
     }
   }

+  /**
+   * Set the core pool size fairly low -- we set `allowCoreThreadTimeOut=true`, so the
+   * only purpose of the limited core pool size is to determine the threshold at which
+   * tasks start being placed on the queue. The pool size can exceed the number of
+   * available processors, however, because some of the tasks are likely to involve
+   * async waiting.
+   */
+  private static final int ZK_CALLBACK_CORE_POOL_SIZE = 8;
+
+  /**
+   * Queue size should be very large; we know we're not going to deadlock, and beyond a
+   * certain threshold, executing tasks "immediately" simply means more thread scheduling
+   * contention, which actually hurts throughput. With deadlock detection in place, it
+   * might even be possible to make this queue unbounded.
+   */
+  private static final int ZK_CALLBACK_QUEUE_SIZE = 32768;
+
+  /**
+   * Interval (in millis) at which we poll the queue for progress, in order to detect
+   * possible deadlock if insufficient progress was made in the preceding interval.
+   */
+  private static final long DEADLOCK_DETECTION_INTERVAL_MILLIS = 1000L;
+
+  /**
+   * If we've processed fewer than this number of tasks in the deadlock detection
+   * interval, then assume we may be deadlocked, and take action to address the
+   * situation.
+   */
+  private static final long PROGRESS_THRESHOLD = 128;
+
+  /**
+   * Upon detecting possible deadlock, this is the number of tasks that should be sent
+   * from the queue to the `fallback` executor.
+   */
+  private static final int DEADLOCK_BATCH_SIZE = 128;
+
+  private static ExecutorUtil.MDCAwareThreadPoolExecutor getZkCallbackExecutor() {
+    LongAdder removed = new LongAdder();
+    // the `removed` counter tracks the number of tasks that have been pulled off the
+    // work queue for execution
+    BlockingQueue<Runnable> workQueue =
+        new ArrayBlockingQueue<>(ZK_CALLBACK_QUEUE_SIZE) {
+          @Override
+          public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
+            removed.increment();
+            return super.poll(timeout, unit);
+          }
+
+          @Override
+          public Runnable poll() {
+            removed.increment();
+            return super.poll();
+          }
+
+          @Override
+          public Runnable take() throws InterruptedException {
+            removed.increment();
+            return super.take();
+          }
+
+          @Override
+          public boolean remove(Object o) {
+            removed.increment();
+            return super.remove(o);
+          }
+        };
+    ThreadFactory threadFactory = new SolrNamedThreadFactory("zkCallback");
+    // set a much shorter keepAlive time on the overflow `fallback` executor
+    ExecutorService fallback =
+        new ExecutorUtil.MDCAwareThreadPoolExecutor(
+            0,
+            Integer.MAX_VALUE,
+            1L,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            threadFactory,
+            new ThreadPoolExecutor.AbortPolicy());
+    RejectedExecutionHandler handler =
+        (r, executor) -> {
+          // we want tasks executed roughly in order (as far as we can manage it -- at
+          // least ideally we don't want to make the situation _worse_), so pull tasks
+          // for immediate execution from the head of the `workQueue` and add ours to
+          // the end.
+          do {
+            Runnable other = workQueue.poll();
+            if (other != null) {
+              // use `fallback.execute()` here, not `fallback.submit()`, because `other`
+              // should already be a `RunnableFuture`.
+              fallback.execute(other);
+            }
+            // we must offer directly to `workQueue` here instead of calling
+            // `executor.submit()`, because we must loop (rather than recurse) in order
+            // to avoid StackOverflowError. Also, `r` should already be a
+            // `RunnableFuture`, so this is perfectly fine.
+          } while (!workQueue.offer(r));
+        };
+    CountDownLatch shutdownLatch = new CountDownLatch(1);
+    ExecutorUtil.MDCAwareThreadPoolExecutor exec =
+        new ExecutorUtil.MDCAwareThreadPoolExecutor(
+            ZK_CALLBACK_CORE_POOL_SIZE,
+            ZK_CALLBACK_CORE_POOL_SIZE,
+            60L,
+            TimeUnit.SECONDS,
+            workQueue,
+            threadFactory,
+            handler) {
+          @Override
+          public void shutdown() {
+            shutdownLatch.countDown();
+            try {
+              fallback.shutdown();
+            } finally {
+              super.shutdown();
+            }
+          }
+
+          @Override
+          public List<Runnable> shutdownNow() {
+            shutdownLatch.countDown();
+            List<Runnable> runnables = null;
+            try {
+              runnables = super.shutdownNow();
+            } finally {
+              if (runnables == null) {
+                runnables = fallback.shutdownNow();
+              } else {
+                runnables.addAll(fallback.shutdownNow());
+              }
+            }
+            return runnables;
+          }
+
+          @Override
+          public boolean awaitTermination(long timeout, TimeUnit unit)
+              throws InterruptedException {
+            long timeoutNanos = unit.toNanos(timeout);
+            boolean ret = false;
+            long start = System.nanoTime();
+            try {
+              ret = fallback.awaitTermination(timeoutNanos, TimeUnit.NANOSECONDS);
+            } finally {
+              ret &=
+                  super.awaitTermination(
+                      timeoutNanos - (System.nanoTime() - start), TimeUnit.NANOSECONDS);
+            }
+            return ret;
+          }
+        };
+    exec.allowCoreThreadTimeOut(true);
+    fallback.submit(
+        () -> {
+          long lastRemoved = 0;
+          int deadlockBatchSize = DEADLOCK_BATCH_SIZE;
+          while (!shutdownLatch.await(
+              DEADLOCK_DETECTION_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+              long currentRemoved = removed.sum();
+              int size = workQueue.size();
+              if (size == 0) {
+                // back-set the baseline so that we don't detect deadlock on the very
+                // next iteration
+                lastRemoved = currentRemoved - PROGRESS_THRESHOLD;
+                deadlockBatchSize = DEADLOCK_BATCH_SIZE;
+              } else {
+                long diffRemoved = currentRemoved - lastRemoved;
+                if (diffRemoved > PROGRESS_THRESHOLD) {
+                  // we are progressing quickly enough
+                  lastRemoved = currentRemoved;
+                  deadlockBatchSize = DEADLOCK_BATCH_SIZE;
+                } else {
+                  // we are progressing slowly ... find out why
+                  if (diffRemoved > size) {
+                    // we haven't removed many tasks, but we removed more during the last
+                    // interval than the current size of the queue (and thus may expect
+                    // to do the same during the next interval), so no intervention is
+                    // needed. This protects against the case where a slow/steady drip of
+                    // tasks is coming in, and low `diffRemoved` is simply due to not
+                    // having that many tasks.
+                    lastRemoved = currentRemoved;
+                    deadlockBatchSize = DEADLOCK_BATCH_SIZE;
+                  } else {
+                    // possible deadlock detected; delegate a batch of tasks to the
+                    // `fallback` executor to try to unblock things
+                    Runnable head;
+                    for (int i = deadlockBatchSize;
+                        i > 0 && (head = workQueue.poll()) != null;
+                        i--) {
+                      fallback.submit(head);
+                    }
+                    // increase the batch size linearly if we're still blocked
+                    deadlockBatchSize += DEADLOCK_BATCH_SIZE;
+                    lastRemoved = removed.sum();
+                  }
+                }
+              }
+            } catch (Throwable t) {
+              log.warn("error polling workQueue", t);
+            }
+          }
+          return null;
+        });
+    return exec;
+  }
+
   public ConnectionManager getConnectionManager() {
     return connManager;
   }
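
Illustration (not part of the patch): the sketch below reduces the pattern introduced above to plain JDK executors -- no ExecutorUtil, SolrNamedThreadFactory, MDC handling, rejection handler, or batch-size scaling -- just a small fixed pool whose work queue is watched from an unbounded fallback pool, with queued tasks handed to the fallback when the queue is non-empty but no longer draining. The class name, pool sizes, batch size, and intervals are illustrative only, not taken from the patch.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class DeadlockAwarePoolSketch {
  public static void main(String[] args) throws Exception {
    LongAdder removed = new LongAdder();
    // bounded work queue that counts how many tasks the pool's workers pull off it
    BlockingQueue<Runnable> workQueue =
        new ArrayBlockingQueue<>(1024) {
          @Override
          public Runnable take() throws InterruptedException {
            removed.increment();
            return super.take();
          }

          @Override
          public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            removed.increment();
            return super.poll(timeout, unit);
          }
        };
    ExecutorService fallback = Executors.newCachedThreadPool(); // overflow/fallback pool
    ThreadPoolExecutor main = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, workQueue);
    main.allowCoreThreadTimeOut(true);

    // watchdog: if the queue is non-empty but nothing was pulled off it during the last
    // interval, assume the running tasks are blocked on queued ones and hand a batch of
    // queued tasks to the fallback pool
    ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
    long[] lastRemoved = {0L};
    watchdog.scheduleAtFixedRate(
        () -> {
          long current = removed.sum();
          if (!workQueue.isEmpty() && current == lastRemoved[0]) {
            Runnable head;
            for (int i = 0; i < 16 && (head = workQueue.poll()) != null; i++) {
              fallback.execute(head);
            }
          }
          lastRemoved[0] = current;
        },
        200L,
        200L,
        TimeUnit.MILLISECONDS);

    // deadlock-prone workload: both core threads block on a latch that only a
    // later-queued task releases; without the watchdog this would hang forever
    CountDownLatch latch = new CountDownLatch(1);
    for (int i = 0; i < 2; i++) {
      main.execute(
          () -> {
            try {
              latch.await();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
    }
    main.execute(latch::countDown); // queued behind the two blocked tasks

    latch.await(); // returns once the watchdog delegates the queued task to the fallback pool
    System.out.println("unblocked");
    watchdog.shutdownNow();
    main.shutdownNow();
    fallback.shutdownNow();
  }
}

The patch layers several refinements onto this same idea: a LongAdder-instrumented queue for progress tracking, a PROGRESS_THRESHOLD that tolerates slow-but-sufficient drain rates, delegation batches that grow linearly while the pool remains blocked, a rejection handler that spills overflow to the fallback executor while preserving rough task order, and shutdown/awaitTermination wiring that covers both pools.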