Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAI-5086: experimental special executor for nuanced zkCallback parallelism #237

Draft
wants to merge 1 commit into
base: fs/branch_9_3
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -88,8 +95,7 @@ public MapWriter getMetrics() {
return metrics::writeMap;
}

private final ExecutorService zkCallbackExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("zkCallback"));
private final ExecutorService zkCallbackExecutor;
private final ExecutorService zkConnManagerCallbackExecutor =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory("zkConnectionManagerCallback"));
Expand Down Expand Up @@ -134,6 +140,7 @@ private SolrZkClient(
Compressor compressor,
SolrClassLoader solrClassLoader) {

zkCallbackExecutor = getZkCallbackExecutor();
if (zkServerAddress == null) {
// only tests should create one without server address
return;
Expand Down Expand Up @@ -223,6 +230,176 @@ private SolrZkClient(
}
}

/**
* Set the core pool size fairly low -- we set `allowCoreThreadTimeout=true`, so the
* only purpose of limited core pool size is to determine the threshold at which to
* place tasks on the queue. The pool size can exceed the number of available
* processors, however, because some of the tasks are likely to involve async waiting.
*/
private static final int ZK_CALLBACK_CORE_POOL_SIZE = 8;

/**
* Queue size should be very large; we know we're not going to deadlock, and beyond a
* certain threshold, executing tasks "immediately" simply means more thread scheduling
* contention, and actually hurts throughput. With deadlock detection in place, it's
* possible we could even have this queue be unbounded...
*/
private static final int ZK_CALLBACK_QUEUE_SIZE = 32768;

/**
* Interval (in millis) at which we poll the queue for its progress, in order to
* detect possible deadlock if insufficient progress was made in the preceding
* interval.
*/
private static final long DEADLOCK_DETECTION_INTERVAL_MILLIS = 1000L;

/**
* If we've processed fewer than this number of tasks in the deadlock detection interval,
* then assume we may be deadlocked, and take action to address the situation.
*/
private static final long PROGRESS_THRESHOLD = 128;

/**
* Upon detecting possible deadlock, this is the number of threads that should be sent
* from the queue to the `fallback` executor.
*/
private static final int DEADLOCK_BATCH_SIZE = 128;

private static ExecutorUtil.MDCAwareThreadPoolExecutor getZkCallbackExecutor() {
LongAdder removed = new LongAdder();
// work queue tracks the number of tasks that have been pulled off for execution
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(ZK_CALLBACK_QUEUE_SIZE) {
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
removed.increment();
return super.poll(timeout, unit);
}
@Override
public Runnable poll() {
removed.increment();
return super.poll();
}
@Override
public Runnable take() throws InterruptedException {
removed.increment();
return super.take();
}
@Override
public boolean remove(Object o) {
removed.increment();
return super.remove(o);
}
};
ThreadFactory threadFactory = new SolrNamedThreadFactory("zkCallback");
// set a much shorter keepAlive time on the overflow `fallback` executor.
ExecutorService fallback = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
RejectedExecutionHandler handler = (r, executor) -> {
// we want tasks executed roughly in order (as far as we can manage it -- at
// least ideally we don't want to make the situation _worse_), so pull tasks
// for immediate execution from the head of the `workQueue` and add ours to
// the end.
do {
Runnable other = workQueue.poll();
if (other != null) {
// use `fallback.execute()` here, not `fallback.submit()`, because `other`
// should already be a `RunnableFuture`.
fallback.execute(other);
}
// we must add directly to `workQueue` here instead of `executor.submit()`,
// because we must loop in order to avoid StackOverflowError. Also, `r`
// should already be a `RunnableFuture`, so this is perfectly fine.
} while (!workQueue.add(r));
};
CountDownLatch shutdownLatch = new CountDownLatch(1);
ExecutorUtil.MDCAwareThreadPoolExecutor exec = new ExecutorUtil.MDCAwareThreadPoolExecutor(ZK_CALLBACK_CORE_POOL_SIZE, ZK_CALLBACK_CORE_POOL_SIZE, 60L, TimeUnit.SECONDS, workQueue, threadFactory, handler) {
@Override
public void shutdown() {
shutdownLatch.countDown();
try {
fallback.shutdown();
} finally {
super.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
shutdownLatch.countDown();
List<Runnable> runnables = null;
try {
runnables = super.shutdownNow();
} finally {
if (runnables == null) {
runnables = fallback.shutdownNow();
} else {
runnables.addAll(fallback.shutdownNow());
}
}
return runnables;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long timeoutNanos = unit.toNanos(timeout);
boolean ret = false;
long start = System.nanoTime();
try {
ret = fallback.awaitTermination(timeoutNanos, TimeUnit.NANOSECONDS);
} finally {
ret &= super.awaitTermination(timeoutNanos - (System.nanoTime() - start), TimeUnit.NANOSECONDS);
}
return ret;
}
};
exec.allowCoreThreadTimeOut(true);
fallback.submit(() -> {
long lastRemoved = 0;
int deadlockBatchSize = DEADLOCK_BATCH_SIZE;
while (!shutdownLatch.await(DEADLOCK_DETECTION_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
try {
long currentRemoved = removed.sum();
int size = workQueue.size();
if (size == 0) {
// backset this to ensure that we don't detect deadlock on
// the very next iteration
lastRemoved = currentRemoved - PROGRESS_THRESHOLD;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
long diffRemoved = currentRemoved - lastRemoved;
if (diffRemoved > PROGRESS_THRESHOLD) {
// we are progressing quickly enough
lastRemoved = currentRemoved;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
// we are progressing slowly ... find out why
if (diffRemoved > size) {
// we haven't removed many, but we removed more during the last interval
// than the current size of the queue (thus may expect to do the same
// during the next interval), so no intervention is needed.
// This protects against the case where a slow/steady drip of tasks is
// coming in, and low `diffRemoved` is simply due to not having that many
// tasks.
lastRemoved = currentRemoved;
deadlockBatchSize = DEADLOCK_BATCH_SIZE;
} else {
// possible deadlock detected; delegate a batch of tasks to the `fallback`
// executor to try to unblock things
Runnable head;
for (int i = deadlockBatchSize; i > 0 && (head = workQueue.poll()) != null; i--) {
fallback.submit(head);
}
deadlockBatchSize += DEADLOCK_BATCH_SIZE; // increase linearly if we're still blocked
lastRemoved = removed.sum();
}
}
}
} catch (Throwable t) {
log.warn("error polling workQueue", t);
}
}
return null;
});
return exec;
}

public ConnectionManager getConnectionManager() {
return connManager;
}
Expand Down
Loading