experimental special executor for nuanced zkCallback parallelism
magibney committed Oct 31, 2024
1 parent d6a38cf commit 80885d3
Showing 1 changed file with 179 additions and 2 deletions.
@@ -25,8 +25,15 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
@@ -88,8 +95,7 @@ public MapWriter getMetrics() {
return metrics::writeMap;
}

private final ExecutorService zkCallbackExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("zkCallback"));
private final ExecutorService zkCallbackExecutor;
private final ExecutorService zkConnManagerCallbackExecutor =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory("zkConnectionManagerCallback"));
@@ -134,6 +140,7 @@ private SolrZkClient(
Compressor compressor,
SolrClassLoader solrClassLoader) {

zkCallbackExecutor = getZkCallbackExecutor();
if (zkServerAddress == null) {
// only tests should create one without server address
return;
@@ -223,6 +230,176 @@ private SolrZkClient(
}
}

/**
* Set the core pool size fairly low -- we call `allowCoreThreadTimeOut(true)`, so the
* only purpose of the limited core pool size is to set the threshold at which tasks
* begin to be placed on the queue. The pool size may exceed the number of available
* processors, however, because some of the tasks are likely to involve async waiting.
*/
private static final int ZK_CALLBACK_CORE_POOL_SIZE = 8;

/**
* The queue size should be very large: we know we're not going to deadlock (see the
* detection logic below), and beyond a certain threshold, executing tasks
* "immediately" simply means more thread-scheduling contention, which actually hurts
* throughput. With deadlock detection in place, it's possible we could even make
* this queue unbounded...
*/
private static final int ZK_CALLBACK_QUEUE_SIZE = 32768;

/**
* Interval (in milliseconds) at which we check the queue's progress, in order to
* detect possible deadlock when insufficient progress was made in the preceding
* interval.
*/
private static final long DEADLOCK_DETECTION_INTERVAL_MILLIS = 1000L;

/**
* If we've processed fewer than this number of tasks in the deadlock detection interval,
* then assume we may be deadlocked, and take action to address the situation.
*/
private static final long PROGRESS_THRESHOLD = 128;

/**
* Upon detecting possible deadlock, this is the number of tasks that should be sent
* from the queue to the `fallback` executor.
*/
private static final int DEADLOCK_BATCH_SIZE = 128;

private static ExecutorUtil.MDCAwareThreadPoolExecutor getZkCallbackExecutor() {
LongAdder removed = new LongAdder();
// the overridden methods below track (approximately) the number of tasks pulled off
// the queue for execution; polls are counted even when they return null, but the
// overcount is negligible for coarse progress detection
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(ZK_CALLBACK_QUEUE_SIZE) {
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
removed.increment();
return super.poll(timeout, unit);
}
@Override
public Runnable poll() {
removed.increment();
return super.poll();
}
@Override
public Runnable take() throws InterruptedException {
removed.increment();
return super.take();
}
@Override
public boolean remove(Object o) {
removed.increment();
return super.remove(o);
}
};
ThreadFactory threadFactory = new SolrNamedThreadFactory("zkCallback");
// set a much shorter keepAlive time on the overflow `fallback` executor
ExecutorService fallback =
    new ExecutorUtil.MDCAwareThreadPoolExecutor(
        0,
        Integer.MAX_VALUE,
        1L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        threadFactory,
        new ThreadPoolExecutor.AbortPolicy());
RejectedExecutionHandler handler = (r, executor) -> {
// we want tasks executed roughly in order (as far as we can manage it -- at
// least ideally we don't want to make the situation _worse_), so pull tasks
// for immediate execution from the head of the `workQueue` and add ours to
// the end.
do {
Runnable other = workQueue.poll();
if (other != null) {
// use `fallback.execute()` here, not `fallback.submit()`, because `other`
// should already be a `RunnableFuture`.
fallback.execute(other);
}
// we must enqueue directly on `workQueue` here instead of calling
// `executor.submit()`, because we must loop in order to avoid
// StackOverflowError. Note the use of `offer()` rather than `add()`:
// `add()` throws IllegalStateException when the queue is full instead of
// returning `false`. Also, `r` should already be a `RunnableFuture`, so
// enqueueing it directly is perfectly fine.
} while (!workQueue.offer(r));
};
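// counted down in shutdown()/shutdownNow() to stop the watchdog task below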
CountDownLatch shutdownLatch = new CountDownLatch(1);
ExecutorUtil.MDCAwareThreadPoolExecutor exec =
    new ExecutorUtil.MDCAwareThreadPoolExecutor(
        ZK_CALLBACK_CORE_POOL_SIZE,
        ZK_CALLBACK_CORE_POOL_SIZE,
        60L,
        TimeUnit.SECONDS,
        workQueue,
        threadFactory,
        handler) {
@Override
public void shutdown() {
shutdownLatch.countDown();
try {
fallback.shutdown();
} finally {
super.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
shutdownLatch.countDown();
List<Runnable> runnables = null;
try {
runnables = super.shutdownNow();
} finally {
if (runnables == null) {
runnables = fallback.shutdownNow();
} else {
runnables.addAll(fallback.shutdownNow());
}
}
return runnables;
}
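// split the timeout across both executors: wait on `fallback` first, then on
// this pool for whatever time remains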
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long timeoutNanos = unit.toNanos(timeout);
boolean ret = false;
long start = System.nanoTime();
try {
ret = fallback.awaitTermination(timeoutNanos, TimeUnit.NANOSECONDS);
} finally {
ret &= super.awaitTermination(timeoutNanos - (System.nanoTime() - start), TimeUnit.NANOSECONDS);
}
return ret;
}
};
exec.allowCoreThreadTimeOut(true);
fallback.submit(() -> {
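// n.b.: this watchdog task occupies one `fallback` thread for the life of the executor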
long lastRemoved = 0;
int deadlockBatchSize = DEADLOCK_BATCH_SIZE;
while (!shutdownLatch.await(DEADLOCK_DETECTION_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
try {
long currentRemoved = removed.sum();
int size = workQueue.size();
if (size == 0) {
// backdate this so that a trailing trickle of tasks can't spuriously
// trigger deadlock detection on the very next iteration
lastRemoved = currentRemoved - PROGRESS_THRESHOLD;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
long diffRemoved = currentRemoved - lastRemoved;
if (diffRemoved > PROGRESS_THRESHOLD) {
// we are progressing quickly enough
lastRemoved = currentRemoved;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
// we are progressing slowly ... find out why
if (diffRemoved > size) {
// we haven't removed many, but we removed more during the last interval
// than the current size of the queue (thus may expect to do the same
// during the next interval), so no intervention is needed.
// This protects against the case where a slow/steady drip of tasks is
// coming in, and low `diffRemoved` is simply due to not having that many
// tasks.
lastRemoved = currentRemoved;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
// possible deadlock detected; delegate a batch of tasks to the `fallback`
// executor to try to unblock things
Runnable head;
for (int i = deadlockBatchSize; i > 0 && (head = workQueue.poll()) != null; i--) {
fallback.submit(head);
}
deadlockBatchSize += DEADLOCK_BATCH_SIZE; // increase linearly if we're still blocked
lastRemoved = removed.sum();
}
}
}
} catch (Throwable t) {
log.warn("error polling workQueue", t);
}
}
return null;
});
return exec;
}

public ConnectionManager getConnectionManager() {
return connManager;
}
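The overflow mechanism above is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern -- a small fixed pool fronting a bounded queue, whose rejection handler makes room by shunting the queue head to an unbounded fallback pool before re-enqueueing the rejected task at the tail. Class and variable names here are illustrative only; this is not part of the commit.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OverflowPoolSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(4);
    // unbounded cached pool that absorbs overflow when the bounded queue fills
    ExecutorService fallback =
        new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, new SynchronousQueue<>());
    // on rejection: free a slot by sending the queue head to `fallback`, then
    // retry placing the rejected task at the tail, preserving rough FIFO order
    RejectedExecutionHandler handler =
        (r, executor) -> {
          do {
            Runnable head = workQueue.poll();
            if (head != null) {
              fallback.execute(head);
            }
          } while (!workQueue.offer(r));
        };
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, workQueue, handler);
    pool.allowCoreThreadTimeOut(true);
    for (int i = 0; i < 16; i++) {
      int n = i;
      pool.execute(
          () -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    fallback.shutdown();
  }
}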

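Likewise, the watchdog's decision logic can be read as a single if/else-if chain. The sketch below mirrors the loop in the commit under the same assumptions (a LongAdder incremented by every poll of the work queue); names are again illustrative, not the committed code.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class WatchdogSketch {
  static final long INTERVAL_MILLIS = 1000L;
  static final long PROGRESS_THRESHOLD = 128;
  static final int BATCH = 128;

  static Callable<Void> watchdog(
      BlockingQueue<Runnable> workQueue,
      LongAdder removed, // incremented on every poll/take of workQueue
      ExecutorService fallback,
      CountDownLatch shutdownLatch) {
    return () -> {
      long lastRemoved = 0;
      int batch = BATCH;
      while (!shutdownLatch.await(INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
        long current = removed.sum();
        int size = workQueue.size();
        long diff = current - lastRemoved;
        if (size == 0) {
          // empty queue: backdate so a trailing trickle of tasks can't trigger detection
          lastRemoved = current - PROGRESS_THRESHOLD;
          batch = BATCH;
        } else if (diff > PROGRESS_THRESHOLD || diff > size) {
          // progressing quickly enough, or faster than the current backlog requires
          lastRemoved = current;
          batch = BATCH;
        } else {
          // possible deadlock: offload a batch of queued tasks to the fallback pool
          Runnable head;
          for (int i = batch; i > 0 && (head = workQueue.poll()) != null; i--) {
            fallback.submit(head);
          }
          batch += BATCH; // escalate if still stalled at the next check
          lastRemoved = removed.sum();
        }
      }
      return null;
    };
  }
}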