diff --git a/src/main/java/net/dv8tion/jda/api/JDABuilder.java b/src/main/java/net/dv8tion/jda/api/JDABuilder.java index fea73990d1..170ee79607 100644 --- a/src/main/java/net/dv8tion/jda/api/JDABuilder.java +++ b/src/main/java/net/dv8tion/jda/api/JDABuilder.java @@ -63,8 +63,10 @@ public class JDABuilder protected final List listeners = new LinkedList<>(); protected final EnumSet automaticallyDisabled = EnumSet.noneOf(CacheFlag.class); - protected ScheduledExecutorService rateLimitPool = null; - protected boolean shutdownRateLimitPool = true; + protected ScheduledExecutorService rateLimitScheduler = null; + protected boolean shutdownRateLimitScheduler = true; + protected ExecutorService rateLimitElastic = null; + protected boolean shutdownRateLimitElastic = true; protected ScheduledExecutorService mainWsPool = null; protected boolean shutdownMainWsPool = true; protected ExecutorService callbackPool = null; @@ -913,8 +915,13 @@ public JDABuilder setWebsocketFactory(@Nullable WebSocketFactory factory) * The thread-pool to use for rate-limit handling * * @return The JDABuilder instance. Useful for chaining. + * + * @deprecated This pool is now split into two pools. + * You should use {@link #setRateLimitScheduler(ScheduledExecutorService)} and {@link #setRateLimitElastic(ExecutorService)} instead. */ @Nonnull + @Deprecated + @ReplaceWith("setRateLimitScheduler(pool)") public JDABuilder setRateLimitPool(@Nullable ScheduledExecutorService pool) { return setRateLimitPool(pool, pool == null); @@ -937,12 +944,113 @@ public JDABuilder setRateLimitPool(@Nullable ScheduledExecutorService pool) * Whether {@link JDA#shutdown()} should shutdown this pool * * @return The JDABuilder instance. Useful for chaining. + * + * @deprecated This pool is now split into two pools. + * You should use {@link #setRateLimitScheduler(ScheduledExecutorService, boolean)} and {@link #setRateLimitElastic(ExecutorService, boolean)} instead. */ @Nonnull + @Deprecated + @ReplaceWith("setRateLimitScheduler(pool, automaticShutdown)") public JDABuilder setRateLimitPool(@Nullable ScheduledExecutorService pool, boolean automaticShutdown) { - this.rateLimitPool = pool; - this.shutdownRateLimitPool = automaticShutdown; + this.rateLimitScheduler = pool; + this.shutdownRateLimitScheduler = automaticShutdown; + return this; + } + + /** + * Sets the {@link ScheduledExecutorService ScheduledExecutorService} that should be used in + * the JDA rate-limit handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This automatically disables the automatic shutdown of the rate-limit pool, you can enable + * it using {@link #setRateLimitPool(ScheduledExecutorService, boolean) setRateLimitPool(executor, true)} + * + *

This is used mostly by the Rate-Limiter to handle backoff delays by using scheduled executions. + * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} + * and similar methods. Requests are handed off to the {@link #setRateLimitElastic(ExecutorService) elastic pool} for blocking execution. + * + *

Default: {@link ScheduledThreadPoolExecutor} with 2 threads. + * + * @param pool + * The thread-pool to use for rate-limit handling + * + * @return The JDABuilder instance. Useful for chaining. + */ + @Nonnull + public JDABuilder setRateLimitScheduler(@Nullable ScheduledExecutorService pool) + { + return setRateLimitScheduler(pool, pool == null); + } + + /** + * Sets the {@link ScheduledExecutorService ScheduledExecutorService} that should be used in + * the JDA rate-limit handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + * + *

This is used mostly by the Rate-Limiter to handle backoff delays by using scheduled executions. + * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} + * and similar methods. Requests are handed off to the {@link #setRateLimitElastic(ExecutorService) elastic pool} for blocking execution. + * + *

Default: {@link ScheduledThreadPoolExecutor} with 2 threads. + * + * @param pool + * The thread-pool to use for rate-limit handling + * @param automaticShutdown + * Whether {@link JDA#shutdown()} should shutdown this pool + * + * @return The JDABuilder instance. Useful for chaining. + */ + @Nonnull + public JDABuilder setRateLimitScheduler(@Nullable ScheduledExecutorService pool, boolean automaticShutdown) + { + this.rateLimitScheduler = pool; + this.shutdownRateLimitScheduler = automaticShutdown; + return this; + } + + /** + * Sets the {@link ExecutorService ExecutorService} that should be used in + * the JDA request handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This automatically disables the automatic shutdown of the rate-limit elastic pool, you can enable + * it using {@link #setRateLimitElastic(ExecutorService, boolean) setRateLimitElastic(executor, true)} + * + *

This is used mostly by the Rate-Limiter to execute the blocking HTTP requests at runtime. + * + *

Default: {@link Executors#newCachedThreadPool()}. + * + * @param pool + * The thread-pool to use for executing http requests + * + * @return The JDABuilder instance. Useful for chaining. + */ + @Nonnull + public JDABuilder setRateLimitElastic(@Nullable ExecutorService pool) + { + return setRateLimitElastic(pool, pool == null); + } + + /** + * Sets the {@link ExecutorService ExecutorService} that should be used in + * the JDA request handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + * + *

This is used mostly by the Rate-Limiter to execute the blocking HTTP requests at runtime. + * + *

Default: {@link Executors#newCachedThreadPool()}. + * + * @param pool + * The thread-pool to use for executing http requests + * @param automaticShutdown + * Whether {@link JDA#shutdown()} should shutdown this pool + * + * @return The JDABuilder instance. Useful for chaining. + */ + @Nonnull + public JDABuilder setRateLimitElastic(@Nullable ExecutorService pool, boolean automaticShutdown) + { + this.rateLimitElastic = pool; + this.shutdownRateLimitElastic = automaticShutdown; return this; } @@ -1797,7 +1905,8 @@ public JDA build() ThreadingConfig threadingConfig = new ThreadingConfig(); threadingConfig.setCallbackPool(callbackPool, shutdownCallbackPool); threadingConfig.setGatewayPool(mainWsPool, shutdownMainWsPool); - threadingConfig.setRateLimitPool(rateLimitPool, shutdownRateLimitPool); + threadingConfig.setRateLimitScheduler(rateLimitScheduler, shutdownRateLimitScheduler); + threadingConfig.setRateLimitElastic(rateLimitElastic, shutdownRateLimitElastic); threadingConfig.setEventPool(eventPool, shutdownEventPool); threadingConfig.setAudioPool(audioPool, shutdownAudioPool); SessionConfig sessionConfig = new SessionConfig(controller, httpClient, wsFactory, voiceDispatchInterceptor, flags, maxReconnectDelay, largeThreshold); diff --git a/src/main/java/net/dv8tion/jda/api/exceptions/InvalidTokenException.java b/src/main/java/net/dv8tion/jda/api/exceptions/InvalidTokenException.java index af96b95a33..573cd151f4 100644 --- a/src/main/java/net/dv8tion/jda/api/exceptions/InvalidTokenException.java +++ b/src/main/java/net/dv8tion/jda/api/exceptions/InvalidTokenException.java @@ -26,18 +26,6 @@ public class InvalidTokenException extends RuntimeException */ public InvalidTokenException() { - super(); + super("The provided token is invalid!"); } - - /** - * Constructs an {@code InvalidTokenException} with the specified detail message. - * - * @param message - * The detail message. - */ - public InvalidTokenException(String message) - { - super(message); - } - } diff --git a/src/main/java/net/dv8tion/jda/api/requests/RestRateLimiter.java b/src/main/java/net/dv8tion/jda/api/requests/RestRateLimiter.java index 0673dde00a..c87d710a0b 100644 --- a/src/main/java/net/dv8tion/jda/api/requests/RestRateLimiter.java +++ b/src/main/java/net/dv8tion/jda/api/requests/RestRateLimiter.java @@ -16,12 +16,14 @@ package net.dv8tion.jda.api.requests; +import net.dv8tion.jda.annotations.ReplaceWith; import net.dv8tion.jda.api.JDA; import okhttp3.Response; import org.jetbrains.annotations.Blocking; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; @@ -248,13 +250,20 @@ public void setCloudflare(long timestamp) */ class RateLimitConfig { - private final ScheduledExecutorService pool; + private final ScheduledExecutorService scheduler; + private final ExecutorService elastic; private final GlobalRateLimit globalRateLimit; private final boolean isRelative; - public RateLimitConfig(@Nonnull ScheduledExecutorService pool, @Nonnull GlobalRateLimit globalRateLimit, boolean isRelative) + public RateLimitConfig(@Nonnull ScheduledExecutorService scheduler, @Nonnull GlobalRateLimit globalRateLimit, boolean isRelative) { - this.pool = pool; + this(scheduler, scheduler, globalRateLimit, isRelative); + } + + public RateLimitConfig(@Nonnull ScheduledExecutorService scheduler, @Nonnull ExecutorService elastic, @Nonnull GlobalRateLimit globalRateLimit, boolean isRelative) + { + this.scheduler = scheduler; + this.elastic = elastic; this.globalRateLimit = globalRateLimit; this.isRelative = isRelative; } @@ -265,9 +274,36 @@ public RateLimitConfig(@Nonnull ScheduledExecutorService pool, @Nonnull GlobalRa * @return The {@link ScheduledExecutorService} */ @Nonnull + @Deprecated + @ReplaceWith("getScheduler() or getElastic()") public ScheduledExecutorService getPool() { - return pool; + return scheduler; + } + + /** + * The {@link ScheduledExecutorService} used to schedule rate-limit tasks. + * + * @return The {@link ScheduledExecutorService} + */ + @Nonnull + public ScheduledExecutorService getScheduler() + { + return scheduler; + } + + /** + * The elastic {@link ExecutorService} used to execute rate-limit tasks. + *
This pool can potentially scale up and down depending on use. + * + *

It is also possible that this pool is identical to {@link #getScheduler()}. + * + * @return The elastic {@link ExecutorService} + */ + @Nonnull + public ExecutorService getElastic() + { + return elastic; } /** diff --git a/src/main/java/net/dv8tion/jda/api/requests/SequentialRestRateLimiter.java b/src/main/java/net/dv8tion/jda/api/requests/SequentialRestRateLimiter.java index bafc368ab8..1068ffb9fe 100644 --- a/src/main/java/net/dv8tion/jda/api/requests/SequentialRestRateLimiter.java +++ b/src/main/java/net/dv8tion/jda/api/requests/SequentialRestRateLimiter.java @@ -24,10 +24,7 @@ import javax.annotation.Nonnull; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; /** @@ -89,7 +86,7 @@ public final class SequentialRestRateLimiter implements RestRateLimiter public SequentialRestRateLimiter(@Nonnull RateLimitConfig config) { this.config = config; - this.cleanupWorker = config.getPool().scheduleAtFixedRate(this::cleanup, 30, 30, TimeUnit.SECONDS); + this.cleanupWorker = config.getScheduler().scheduleAtFixedRate(this::cleanup, 30, 30, TimeUnit.SECONDS); } @Override @@ -225,6 +222,35 @@ private Bucket getBucket(Route.CompiledRoute route) }); } + private void scheduleElastic(Bucket bucket) + { + if (isShutdown) + return; + + ExecutorService elastic = config.getElastic(); + ScheduledExecutorService scheduler = config.getScheduler(); + + try + { + // Avoid context switch if unnecessary + if (elastic == scheduler) + bucket.run(); + else + elastic.execute(bucket); + } + catch (RejectedExecutionException ex) + { + if (!isShutdown) + log.error("Failed to execute bucket worker", ex); + } + catch (Throwable t) + { + log.error("Caught throwable in bucket worker", t); + if (t instanceof Error) + throw t; + } + } + private void runBucket(Bucket bucket) { if (isShutdown) @@ -232,7 +258,10 @@ private void runBucket(Bucket bucket) // Schedule a new bucket worker if no worker is running MiscUtil.locked(lock, () -> rateLimitQueue.computeIfAbsent(bucket, - k -> config.getPool().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS))); + k -> config.getScheduler().schedule( + () -> scheduleElastic(bucket), + bucket.getRateLimit(), TimeUnit.MILLISECONDS)) + ); } private long parseLong(String input) diff --git a/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManager.java b/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManager.java index 1e6eb0f592..2ee98b3ff9 100644 --- a/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManager.java +++ b/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManager.java @@ -22,18 +22,18 @@ import net.dv8tion.jda.api.entities.Guild; import net.dv8tion.jda.api.entities.SelfUser; import net.dv8tion.jda.api.exceptions.InvalidTokenException; -import net.dv8tion.jda.api.requests.GatewayIntent; -import net.dv8tion.jda.api.requests.RestConfig; -import net.dv8tion.jda.api.requests.Route; +import net.dv8tion.jda.api.requests.*; import net.dv8tion.jda.api.utils.ChunkingFilter; import net.dv8tion.jda.api.utils.MiscUtil; import net.dv8tion.jda.api.utils.SessionController; import net.dv8tion.jda.api.utils.cache.ShardCacheView; +import net.dv8tion.jda.api.utils.data.DataObject; import net.dv8tion.jda.internal.JDAImpl; import net.dv8tion.jda.internal.entities.SelfUserImpl; import net.dv8tion.jda.internal.managers.PresenceImpl; import net.dv8tion.jda.internal.requests.RestActionImpl; import net.dv8tion.jda.internal.utils.Checks; +import net.dv8tion.jda.internal.utils.IOUtil; import net.dv8tion.jda.internal.utils.JDALogger; import net.dv8tion.jda.internal.utils.UnlockHook; import net.dv8tion.jda.internal.utils.cache.ShardCacheViewImpl; @@ -42,17 +42,23 @@ import net.dv8tion.jda.internal.utils.config.SessionConfig; import net.dv8tion.jda.internal.utils.config.ThreadingConfig; import net.dv8tion.jda.internal.utils.config.sharding.*; +import okhttp3.Call; import okhttp3.OkHttpClient; import org.slf4j.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.IntFunction; /** @@ -366,6 +372,9 @@ public void shutdown() { this.executor.shutdown(); } + + // Shutdown shared pools + this.threadingConfig.shutdown(); } @Override @@ -486,10 +495,17 @@ protected JDAImpl buildInstance(final int shardId) httpClient = sessionConfig.getHttpBuilder().build(); } + retrieveShardTotal(httpClient); + threadingConfig.init(queue.isEmpty() ? getShardsTotal() : queue.size()); + // imagine if we had macros or closures or destructuring :) - ExecutorPair rateLimitPair = resolveExecutor(threadingConfig.getRateLimitPoolProvider(), shardId); - ScheduledExecutorService rateLimitPool = rateLimitPair.executor; - boolean shutdownRateLimitPool = rateLimitPair.automaticShutdown; + ExecutorPair rateLimitSchedulerPair = resolveExecutor(threadingConfig.getRateLimitSchedulerProvider(), shardId); + ScheduledExecutorService rateLimitScheduler = rateLimitSchedulerPair.executor; + boolean shutdownRateLimitScheduler = rateLimitSchedulerPair.automaticShutdown; + + ExecutorPair rateLimitElasticPair = resolveExecutor(threadingConfig.getRateLimitElasticProvider(), shardId); + ExecutorService rateLimitElastic = rateLimitElasticPair.executor; + boolean shutdownRateLimitElastic = rateLimitElasticPair.automaticShutdown; ExecutorPair gatewayPair = resolveExecutor(threadingConfig.getGatewayPoolProvider(), shardId); ScheduledExecutorService gatewayPool = gatewayPair.executor; @@ -510,7 +526,8 @@ protected JDAImpl buildInstance(final int shardId) AuthorizationConfig authConfig = new AuthorizationConfig(token); SessionConfig sessionConfig = this.sessionConfig.toSessionConfig(httpClient); ThreadingConfig threadingConfig = new ThreadingConfig(); - threadingConfig.setRateLimitPool(rateLimitPool, shutdownRateLimitPool); + threadingConfig.setRateLimitScheduler(rateLimitScheduler, shutdownRateLimitScheduler); + threadingConfig.setRateLimitElastic(rateLimitElastic, shutdownRateLimitElastic); threadingConfig.setGatewayPool(gatewayPool, shutdownGatewayPool); threadingConfig.setCallbackPool(callbackPool, shutdownCallbackPool); threadingConfig.setEventPool(eventPool, shutdownEventPool); @@ -557,21 +574,9 @@ protected JDAImpl buildInstance(final int shardId) this.sessionConfig.getSessionController().setConcurrency(gateway.getConcurrency()); this.gatewayURL = gateway.getUrl(); if (this.gatewayURL == null) - LOG.error("Acquired null gateway url from SessionController"); + throw new IllegalStateException("Acquired null gateway url from SessionController"); else LOG.info("Login Successful!"); - - if (getShardsTotal() == -1) - { - shardingConfig.setShardsTotal(gateway.getShardTotal()); - this.shards = new ShardCacheViewImpl(getShardsTotal()); - - synchronized (queue) - { - for (int i = 0; i < getShardsTotal(); i++) - queue.add(i); - } - } } final JDA.ShardInfo shardInfo = new JDA.ShardInfo(shardId, getShardsTotal()); @@ -591,9 +596,7 @@ protected JDAImpl buildInstance(final int shardId) jda.setSelfUser(selfUser); jda.setStatus(JDA.Status.INITIALIZED); //This is already set by JDA internally, but this is to make sure the listeners catch it. - final int shardTotal = jda.login(this.gatewayURL, shardInfo, this.metaConfig.getCompression(), false, shardingConfig.getIntents(), this.metaConfig.getEncoding()); - if (getShardsTotal() == -1) - shardingConfig.setShardsTotal(shardTotal); + jda.login(this.gatewayURL, shardInfo, this.metaConfig.getCompression(), false, shardingConfig.getIntents(), this.metaConfig.getEncoding()); return jda; } @@ -635,6 +638,51 @@ public void setStatusProvider(IntFunction statusProvider) presenceConfig.setStatusProvider(statusProvider); } + private synchronized void retrieveShardTotal(OkHttpClient httpClient) + { + if (getShardsTotal() != -1) + return; + + LOG.debug("Fetching shard total using temporary rate-limiter"); + + CompletableFuture future = new CompletableFuture<>(); + ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(task -> { + Thread thread = new Thread(task, "DefaultShardManager retrieveShardTotal"); + thread.setDaemon(true); + return thread; + }); + + try + { + RestRateLimiter.RateLimitConfig rateLimitConfig = new RestRateLimiter.RateLimitConfig(pool, RestRateLimiter.GlobalRateLimit.create(), true); + SequentialRestRateLimiter rateLimiter = new SequentialRestRateLimiter(rateLimitConfig); + rateLimiter.enqueue(new ShardTotalTask(future, httpClient)); + + int shardTotal = future.join(); + this.shardingConfig.setShardsTotal(shardTotal); + this.shards = new ShardCacheViewImpl(shardTotal); + + synchronized (queue) + { + for (int i = 0; i < shardTotal; i++) + queue.add(i); + } + } + catch (CompletionException ex) + { + if (ex.getCause() instanceof RuntimeException) + throw (RuntimeException) ex.getCause(); + if (ex.getCause() instanceof Error) + throw (Error) ex.getCause(); + throw ex; + } + finally + { + future.cancel(false); + pool.shutdownNow(); + } + } + /** * This method creates the internal {@link java.util.concurrent.ScheduledExecutorService ScheduledExecutorService}. * It is intended as a hook for custom implementations to create their own executor. @@ -673,4 +721,141 @@ protected ExecutorPair(E executor, boolean automaticShutdown) this.automaticShutdown = automaticShutdown; } } + + protected class ShardTotalTask implements RestRateLimiter.Work + { + private final CompletableFuture future; + private final OkHttpClient httpClient; + private int failedAttempts = 0; + + protected ShardTotalTask(CompletableFuture future, OkHttpClient httpClient) + { + this.future = future; + this.httpClient = httpClient; + } + + @Nonnull + @Override + public Route.CompiledRoute getRoute() + { + return Route.Misc.GATEWAY_BOT.compile(); + } + + @Nonnull + @Override + public JDA getJDA() + { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public okhttp3.Response execute() + { + try + { + RestConfig config = restConfigProvider.apply(0); + String url = config.getBaseUrl() + getRoute().getCompiledRoute(); + LOG.debug("Requesting shard total with url {}", url); + + okhttp3.Request.Builder builder = new okhttp3.Request.Builder() + .get() + .url(url) + .header("authorization", "Bot " + token) + .header("accept-encoding", "gzip") + .header("user-agent", config.getUserAgent()); + + Consumer customBuilder = config.getCustomBuilder(); + if (customBuilder != null) + customBuilder.accept(builder); + + Call call = httpClient.newCall(builder.build()); + okhttp3.Response response = call.execute(); + + try + { + LOG.debug("Received response with code {}", response.code()); + InputStream body = IOUtil.getBody(response); + + if (response.isSuccessful()) + { + DataObject json = DataObject.fromJson(body); + int shardTotal = json.getInt("shards"); + future.complete(shardTotal); + } + else if (response.code() == 401) + { + future.completeExceptionally(new InvalidTokenException()); + } + else if (response.code() != 429 && response.code() < 500 || ++failedAttempts > 4) + { + future.completeExceptionally(new IllegalStateException( + "Failed to fetch recommended shard total! Code: " + response.code() + "\n" + + new String(IOUtil.readFully(body), StandardCharsets.UTF_8) + )); + } + else if (response.code() >= 500) + { + int backoff = 1 << failedAttempts; + LOG.warn("Failed to retrieve recommended shard total. Code: {} ... retrying in {}s", response.code(), backoff); + response = response.newBuilder() + .headers(response.headers() + .newBuilder() + .set(RestRateLimiter.RESET_AFTER_HEADER, String.valueOf(backoff)) + .set(RestRateLimiter.REMAINING_HEADER, String.valueOf(0)) + .set(RestRateLimiter.LIMIT_HEADER, String.valueOf(1)) + .set(RestRateLimiter.SCOPE_HEADER, "custom") + .build()) + .build(); + } + + return response; + } + finally + { + response.close(); + } + } + catch (IOException e) + { + future.completeExceptionally(e); + throw new UncheckedIOException(e); + } + catch (Throwable e) + { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public boolean isSkipped() + { + return isCancelled(); + } + + @Override + public boolean isDone() + { + return future.isDone(); + } + + @Override + public boolean isPriority() + { + return true; + } + + @Override + public boolean isCancelled() + { + return future.isCancelled(); + } + + @Override + public void cancel() + { + future.cancel(false); + } + } } diff --git a/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManagerBuilder.java b/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManagerBuilder.java index c6e0ef2e8b..02700abd03 100644 --- a/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManagerBuilder.java +++ b/src/main/java/net/dv8tion/jda/api/sharding/DefaultShardManagerBuilder.java @@ -36,6 +36,7 @@ import net.dv8tion.jda.api.utils.cache.CacheFlag; import net.dv8tion.jda.internal.JDAImpl; import net.dv8tion.jda.internal.utils.Checks; +import net.dv8tion.jda.internal.utils.concurrent.CountingThreadFactory; import net.dv8tion.jda.internal.utils.config.flags.ConfigFlag; import net.dv8tion.jda.internal.utils.config.flags.ShardingConfigFlag; import net.dv8tion.jda.internal.utils.config.sharding.*; @@ -82,8 +83,23 @@ public class DefaultShardManagerBuilder protected IntFunction activityProvider = null; protected IntFunction> contextProvider = null; protected IntFunction eventManagerProvider = null; - protected ThreadPoolProvider rateLimitPoolProvider = null; - protected ThreadPoolProvider gatewayPoolProvider = null; + protected ThreadPoolProvider rateLimitSchedulerProvider = ThreadPoolProvider.lazy( + (total) -> Executors.newScheduledThreadPool(Math.max(2, 2 * (int) Math.log(total)), new CountingThreadFactory(() -> "JDA", "RateLimit-Scheduler", true)) + ); + protected ThreadPoolProvider rateLimitElasticProvider = ThreadPoolProvider.lazy( + (total) -> { + ExecutorService pool = Executors.newCachedThreadPool(new CountingThreadFactory(() -> "JDA", "RateLimit-Elastic", true)); + if (pool instanceof ThreadPoolExecutor) + { + ((ThreadPoolExecutor) pool).setCorePoolSize(Math.max(1, (int) Math.log(total))); + ((ThreadPoolExecutor) pool).setKeepAliveTime(2, TimeUnit.MINUTES); + } + return pool; + } + ); + protected ThreadPoolProvider gatewayPoolProvider = ThreadPoolProvider.lazy( + (total) -> Executors.newScheduledThreadPool(Math.max(1, (int) Math.log(total)), new CountingThreadFactory(() -> "JDA", "Gateway")) + ); protected ThreadPoolProvider callbackPoolProvider = null; protected ThreadPoolProvider eventPoolProvider = null; protected ThreadPoolProvider audioPoolProvider = null; @@ -1309,14 +1325,19 @@ public DefaultShardManagerBuilder setHttpClient(@Nullable OkHttpClient client) * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} * and similar methods. * - *

Default: {@link ScheduledThreadPoolExecutor} with 5 threads (per shard). + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * }{@link #setShardsTotal(int) shard_total}) threads. * * @param pool * The thread-pool to use for rate-limit handling * * @return The DefaultShardManagerBuilder instance. Useful for chaining. + * + * @deprecated This pool is now split into two pools. + * You should use {@link #setRateLimitScheduler(ScheduledExecutorService)} and {@link #setRateLimitElastic(ExecutorService)} instead. */ @Nonnull + @Deprecated + @ReplaceWith("setRateLimitScheduler(pool) or setRateLimitElastic(pool)") public DefaultShardManagerBuilder setRateLimitPool(@Nullable ScheduledExecutorService pool) { return setRateLimitPool(pool, pool == null); @@ -1332,7 +1353,7 @@ public DefaultShardManagerBuilder setRateLimitPool(@Nullable ScheduledExecutorSe * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} * and similar methods. * - *

Default: {@link ScheduledThreadPoolExecutor} with 5 threads (per shard). + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * }{@link #setShardsTotal(int) shard_total}) threads. * * @param pool * The thread-pool to use for rate-limit handling @@ -1340,8 +1361,13 @@ public DefaultShardManagerBuilder setRateLimitPool(@Nullable ScheduledExecutorSe * Whether {@link net.dv8tion.jda.api.JDA#shutdown()} should automatically shutdown this pool * * @return The DefaultShardManagerBuilder instance. Useful for chaining. + * + * @deprecated This pool is now split into two pools. + * You should use {@link #setRateLimitScheduler(ScheduledExecutorService, boolean)} and {@link #setRateLimitElastic(ExecutorService, boolean)} instead. */ @Nonnull + @Deprecated + @ReplaceWith("setRateLimitScheduler(pool, automaticShutdown) or setRateLimitElastic(pool, automaticShutdown)") public DefaultShardManagerBuilder setRateLimitPool(@Nullable ScheduledExecutorService pool, boolean automaticShutdown) { return setRateLimitPoolProvider(pool == null ? null : new ThreadPoolProviderImpl<>(pool, automaticShutdown)); @@ -1356,17 +1382,164 @@ public DefaultShardManagerBuilder setRateLimitPool(@Nullable ScheduledExecutorSe * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} * and similar methods. * - *

Default: {@link ScheduledThreadPoolExecutor} with 5 threads (per shard). + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * }{@link #setShardsTotal(int) shard_total}) threads. * * @param provider * The thread-pool provider to use for rate-limit handling * * @return The DefaultShardManagerBuilder instance. Useful for chaining. + * + * @deprecated This pool is now split into two pools. + * You should use {@link #setRateLimitPoolProvider(ThreadPoolProvider)} and {@link #setRateLimitElasticProvider(ThreadPoolProvider)} instead. */ @Nonnull + @Deprecated + @ReplaceWith("setRateLimitSchedulerProvider(provider) or setRateLimitElasticProvider(provider)") public DefaultShardManagerBuilder setRateLimitPoolProvider(@Nullable ThreadPoolProvider provider) { - this.rateLimitPoolProvider = provider; + this.rateLimitSchedulerProvider = provider; + return this; + } + + /** + * Sets the {@link ScheduledExecutorService ScheduledExecutorService} that should be used in + * the JDA rate-limit handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This will override the rate-limit pool provider set from {@link #setRateLimitPoolProvider(ThreadPoolProvider)}. + *
This automatically disables the automatic shutdown of the rate-limit pool, you can enable + * it using {@link #setRateLimitPool(ScheduledExecutorService, boolean) setRateLimiPool(executor, true)} + * + *

This is used mostly by the Rate-Limiter to handle backoff delays by using scheduled executions. + * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} + * and similar methods. Requests are handed off to the {@link #setRateLimitElastic(ExecutorService) elastic pool} for blocking execution. + * + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * } log({@link #setShardsTotal(int) shard_total})) threads. + * + * @param pool + * The thread-pool to use for rate-limit handling + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitScheduler(@Nullable ScheduledExecutorService pool) + { + return setRateLimitScheduler(pool, pool == null); + } + + /** + * Sets the {@link ScheduledExecutorService ScheduledExecutorService} that should be used in + * the JDA rate-limit handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This will override the rate-limit pool provider set from {@link #setRateLimitPoolProvider(ThreadPoolProvider)}. + * + *

This is used mostly by the Rate-Limiter to handle backoff delays by using scheduled executions. + * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} + * and similar methods. Requests are handed off to the {@link #setRateLimitElastic(ExecutorService) elastic pool} for blocking execution. + * + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * } log({@link #setShardsTotal(int) shard_total})) threads. + * + * @param pool + * The thread-pool to use for rate-limit handling + * @param automaticShutdown + * Whether {@link net.dv8tion.jda.api.JDA#shutdown()} should automatically shutdown this pool + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitScheduler(@Nullable ScheduledExecutorService pool, boolean automaticShutdown) + { + return setRateLimitSchedulerProvider(pool == null ? null : new ThreadPoolProviderImpl<>(pool, automaticShutdown)); + } + + /** + * Sets the {@link ScheduledExecutorService ScheduledExecutorService} provider that should be used in + * the JDA rate-limit handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + * + *

This is used mostly by the Rate-Limiter to handle backoff delays by using scheduled executions. + * Besides that it is also used by planned execution for {@link net.dv8tion.jda.api.requests.RestAction#queueAfter(long, TimeUnit)} + * and similar methods. Requests are handed off to the {@link #setRateLimitElastic(ExecutorService) elastic pool} for blocking execution. + * + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code 2 * } log({@link #setShardsTotal(int) shard_total})) threads. + * + * @param provider + * The thread-pool provider to use for rate-limit handling + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitSchedulerProvider(@Nullable ThreadPoolProvider provider) + { + this.rateLimitSchedulerProvider = provider; + return this; + } + + /** + * Sets the {@link ExecutorService} that should be used in + * the JDA request handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This will override the rate-limit pool provider set from {@link #setRateLimitElasticProvider(ThreadPoolProvider)}. + *
This automatically disables the automatic shutdown of the rate-limit elastic pool, you can enable + * it using {@link #setRateLimitElastic(ExecutorService, boolean) setRateLimitElastic(executor, true)} + * + *

This is used mostly by the Rate-Limiter to execute the blocking HTTP requests at runtime. + * + *

Default: {@link Executors#newCachedThreadPool()} shared between all shards. + * + * @param pool + * The thread-pool to use for executing http requests + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitElastic(@Nullable ExecutorService pool) + { + return setRateLimitElastic(pool, pool == null); + } + + /** + * Sets the {@link ExecutorService} that should be used in + * the JDA request handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + *
This will override the rate-limit pool provider set from {@link #setRateLimitElasticProvider(ThreadPoolProvider)}. + *
This automatically disables the automatic shutdown of the rate-limit elastic pool, you can enable + * it using {@link #setRateLimitElastic(ExecutorService, boolean) setRateLimitElastic(executor, true)} + * + *

This is used mostly by the Rate-Limiter to execute the blocking HTTP requests at runtime. + * + *

Default: {@link Executors#newCachedThreadPool()} shared between all shards. + * + * @param pool + * The thread-pool to use for executing http requests + * @param automaticShutdown + * Whether {@link net.dv8tion.jda.api.JDA#shutdown()} should automatically shutdown this pool + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitElastic(@Nullable ExecutorService pool, boolean automaticShutdown) + { + return setRateLimitElasticProvider(pool == null ? null : new ThreadPoolProviderImpl<>(pool, automaticShutdown)); + } + + /** + * Sets the {@link ExecutorService} that should be used in + * the JDA request handler. Changing this can drastically change the JDA behavior for RestAction execution + * and should be handled carefully. Only change this pool if you know what you're doing. + * + *

This is used mostly by the Rate-Limiter to execute the blocking HTTP requests at runtime. + * + *

Default: {@link Executors#newCachedThreadPool()} shared between all shards. + * + * @param provider + * The thread-pool provider to use for executing http requests + * + * @return The DefaultShardManagerBuilder instance. Useful for chaining. + */ + @Nonnull + public DefaultShardManagerBuilder setRateLimitElasticProvider(@Nullable ThreadPoolProvider provider) + { + this.rateLimitElasticProvider = provider; return this; } @@ -1389,7 +1562,7 @@ public DefaultShardManagerBuilder setRateLimitPoolProvider(@Nullable ThreadPoolP * Once a new payload is sent we switch to "rapid mode" which means more tasks will be submitted until no more payloads * have to be sent. * - *

Default: {@link ScheduledThreadPoolExecutor} with 1 thread (per shard) + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code log}({@link #setShardsTotal(int) shard_total})) threads. * * @param pool * The thread-pool to use for main WebSocket workers @@ -1419,7 +1592,7 @@ public DefaultShardManagerBuilder setGatewayPool(@Nullable ScheduledExecutorServ * Once a new payload is sent we switch to "rapid mode" which means more tasks will be submitted until no more payloads * have to be sent. * - *

Default: {@link ScheduledThreadPoolExecutor} with 1 thread (per shard) + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code log}({@link #setShardsTotal(int) shard_total})) threads. * * @param pool * The thread-pool to use for main WebSocket workers @@ -1450,7 +1623,7 @@ public DefaultShardManagerBuilder setGatewayPool(@Nullable ScheduledExecutorServ * Once a new payload is sent we switch to "rapid mode" which means more tasks will be submitted until no more payloads * have to be sent. * - *

Default: {@link ScheduledThreadPoolExecutor} with 1 thread (per shard) + *

Default: Shared {@link ScheduledThreadPoolExecutor} with ({@code log}({@link #setShardsTotal(int) shard_total})) threads. * * @param provider * The thread-pool provider to use for main WebSocket workers @@ -1542,6 +1715,8 @@ public DefaultShardManagerBuilder setCallbackPoolProvider(@Nullable ThreadPoolPr *

The executor will not be shutdown automatically when the shard is shutdown. * To shut it down automatically use {@link #setEventPool(ExecutorService, boolean)}. * + *

Default: Disabled + * * @param executor * The executor for the event proxy, or null to use calling thread * @@ -1559,6 +1734,8 @@ public DefaultShardManagerBuilder setEventPool(@Nullable ExecutorService executo * Sets the {@link ExecutorService ExecutorService} that should be used by the * event proxy to schedule events. This will be done on the calling thread by default. * + *

Default: Disabled + * * @param executor * The executor for the event proxy, or null to use calling thread * @param automaticShutdown @@ -1583,7 +1760,7 @@ public DefaultShardManagerBuilder setEventPool(@Nullable ExecutorService executo *

This is used to handle callbacks of {@link RestAction#queue()}, similarly it is used to * finish {@link RestAction#submit()} and {@link RestAction#complete()} tasks which build on queue. * - *

Default: {@link ForkJoinPool#commonPool()} + *

Default: Disabled * * @param provider * The thread-pool provider to use for callback handling @@ -2239,7 +2416,7 @@ public ShardManager build(boolean login) throws IllegalArgumentException presenceConfig.setActivityProvider(activityProvider); presenceConfig.setStatusProvider(statusProvider); presenceConfig.setIdleProvider(idleProvider); - final ThreadingProviderConfig threadingConfig = new ThreadingProviderConfig(rateLimitPoolProvider, gatewayPoolProvider, callbackPoolProvider, eventPoolProvider, audioPoolProvider, threadFactory); + final ThreadingProviderConfig threadingConfig = new ThreadingProviderConfig(rateLimitSchedulerProvider, rateLimitElasticProvider, gatewayPoolProvider, callbackPoolProvider, eventPoolProvider, audioPoolProvider, threadFactory); final ShardingSessionConfig sessionConfig = new ShardingSessionConfig(sessionController, voiceDispatchInterceptor, httpClient, httpClientBuilder, wsFactory, audioSendFactory, flags, shardingFlags, maxReconnectDelay, largeThreshold); final ShardingMetaConfig metaConfig = new ShardingMetaConfig(maxBufferSize, contextProvider, cacheFlags, flags, compression, encoding); final DefaultShardManager manager = new DefaultShardManager(this.token, this.shards, shardingConfig, eventConfig, presenceConfig, threadingConfig, sessionConfig, metaConfig, restConfigProvider, chunkingFilter); @@ -2304,6 +2481,7 @@ else if (!membersIntent && chunkingFilter != ChunkingFilter.NONE) throw new IllegalArgumentException("Cannot use CacheFlag." + flag + " without GatewayIntent." + intent + "!"); } } + //Avoid having multiple anonymous classes private static class ThreadPoolProviderImpl implements ThreadPoolProvider { diff --git a/src/main/java/net/dv8tion/jda/api/sharding/ThreadPoolProvider.java b/src/main/java/net/dv8tion/jda/api/sharding/ThreadPoolProvider.java index 0281ea6971..f3ad44706a 100644 --- a/src/main/java/net/dv8tion/jda/api/sharding/ThreadPoolProvider.java +++ b/src/main/java/net/dv8tion/jda/api/sharding/ThreadPoolProvider.java @@ -16,8 +16,12 @@ package net.dv8tion.jda.api.sharding; +import net.dv8tion.jda.internal.utils.Checks; + +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; +import java.util.function.IntFunction; /** * Called by {@link DefaultShardManager} when building a JDA instance. @@ -52,4 +56,75 @@ default boolean shouldShutdownAutomatically(int shardId) { return false; } + + /** + * Provider that initializes with a {@link DefaultShardManagerBuilder#setShardsTotal(int) shard_total} + * and provides the same pool to share between shards. + * + * @param init + * Function to initialize the shared pool, called with the shard total + * + * @param + * The type of executor + * + * @return The lazy pool provider + */ + @Nonnull + static LazySharedProvider lazy(@Nonnull IntFunction init) + { + Checks.notNull(init, "Initializer"); + return new LazySharedProvider<>(init); + } + + final class LazySharedProvider implements ThreadPoolProvider + { + private final IntFunction initializer; + private T pool; + + LazySharedProvider(@Nonnull IntFunction initializer) + { + this.initializer = initializer; + } + + /** + * Called with the shard total to initialize the shared pool. + * + *

This also destroys the temporary pool created for fetching the recommended shard total. + * + * @param shardTotal + * The shard total + */ + public synchronized void init(int shardTotal) + { + if (pool == null) + pool = initializer.apply(shardTotal); + } + + /** + * Shuts down the shared pool and the temporary pool. + */ + public synchronized void shutdown() + { + if (pool != null) + { + pool.shutdown(); + pool = null; + } + } + + /** + * Provides the initialized pool or the temporary pool if not initialized yet. + * + * @param shardId + * The current shard id + * + * @return The thread pool instance + */ + @Nullable + @Override + public synchronized T provide(int shardId) + { + return pool; + } + } } diff --git a/src/main/java/net/dv8tion/jda/api/utils/SessionControllerAdapter.java b/src/main/java/net/dv8tion/jda/api/utils/SessionControllerAdapter.java index cdb64dcdc4..adadd90fd8 100644 --- a/src/main/java/net/dv8tion/jda/api/utils/SessionControllerAdapter.java +++ b/src/main/java/net/dv8tion/jda/api/utils/SessionControllerAdapter.java @@ -96,7 +96,7 @@ public void handleResponse(Response response, Request request) else if (response.code == 401) { api.shutdownNow(); - request.onFailure(new InvalidTokenException("The provided token is invalid!")); + request.onFailure(new InvalidTokenException()); } else { diff --git a/src/main/java/net/dv8tion/jda/internal/JDAImpl.java b/src/main/java/net/dv8tion/jda/internal/JDAImpl.java index cdab669427..16e916943f 100644 --- a/src/main/java/net/dv8tion/jda/internal/JDAImpl.java +++ b/src/main/java/net/dv8tion/jda/internal/JDAImpl.java @@ -280,7 +280,8 @@ public void initRequester() return; RestRateLimiter rateLimiter = this.restConfig.getRateLimiterFactory().apply( new RestRateLimiter.RateLimitConfig( - this.threadConfig.getRateLimitPool(), + this.threadConfig.getRateLimitScheduler(), + this.threadConfig.getRateLimitElastic(), getSessionController().getRateLimitHandle(), this.sessionConfig.isRelativeRateLimit() && this.restConfig.isRelativeRateLimit() )); @@ -412,7 +413,7 @@ else if (response.code == 401) return; } - throw new InvalidTokenException("The provided token is invalid!"); + throw new InvalidTokenException(); } catch (Throwable error) { @@ -569,7 +570,7 @@ public int cancelRequests() @Override public ScheduledExecutorService getRateLimitPool() { - return threadConfig.getRateLimitPool(); + return threadConfig.getRateLimitScheduler(); } @Nonnull diff --git a/src/main/java/net/dv8tion/jda/internal/utils/config/ThreadingConfig.java b/src/main/java/net/dv8tion/jda/internal/utils/config/ThreadingConfig.java index 1bc7057e73..edd867b90d 100644 --- a/src/main/java/net/dv8tion/jda/internal/utils/config/ThreadingConfig.java +++ b/src/main/java/net/dv8tion/jda/internal/utils/config/ThreadingConfig.java @@ -20,23 +20,22 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.*; import java.util.function.Supplier; public class ThreadingConfig { private final Object audioLock = new Object(); - private ScheduledExecutorService rateLimitPool; + private ScheduledExecutorService rateLimitScheduler; + private ExecutorService rateLimitElastic; private ScheduledExecutorService gatewayPool; private ExecutorService callbackPool; private ExecutorService eventPool; private ScheduledExecutorService audioPool; - private boolean shutdownRateLimitPool; + private boolean shutdownRateLimitScheduler; + private boolean shutdownRateLimitElastic; private boolean shutdownGatewayPool; private boolean shutdownCallbackPool; private boolean shutdownEventPool; @@ -46,16 +45,23 @@ public ThreadingConfig() { this.callbackPool = ForkJoinPool.commonPool(); - this.shutdownRateLimitPool = true; + this.shutdownRateLimitScheduler = true; + this.shutdownRateLimitElastic = true; this.shutdownGatewayPool = true; this.shutdownCallbackPool = false; this.shutdownAudioPool = true; } - public void setRateLimitPool(@Nullable ScheduledExecutorService executor, boolean shutdown) + public void setRateLimitScheduler(@Nullable ScheduledExecutorService executor, boolean shutdown) { - this.rateLimitPool = executor; - this.shutdownRateLimitPool = shutdown; + this.rateLimitScheduler = executor; + this.shutdownRateLimitScheduler = shutdown; + } + + public void setRateLimitElastic(@Nullable ExecutorService executor, boolean shutdown) + { + this.rateLimitElastic = executor; + this.shutdownRateLimitScheduler = shutdown; } public void setGatewayPool(@Nullable ScheduledExecutorService executor, boolean shutdown) @@ -84,10 +90,19 @@ public void setAudioPool(@Nullable ScheduledExecutorService executor, boolean sh public void init(@Nonnull Supplier identifier) { - if (this.rateLimitPool == null) - this.rateLimitPool = newScheduler(5, identifier, "RateLimit", false); + if (this.rateLimitScheduler == null) + this.rateLimitScheduler = newScheduler(2, identifier, "RateLimit-Scheduler", false); if (this.gatewayPool == null) this.gatewayPool = newScheduler(1, identifier, "Gateway"); + if (this.rateLimitElastic == null) + { + this.rateLimitElastic = Executors.newCachedThreadPool(new CountingThreadFactory(identifier, "RateLimit-Elastic", false)); + if (this.rateLimitElastic instanceof ThreadPoolExecutor) + { + ((ThreadPoolExecutor) this.rateLimitElastic).setCorePoolSize(1); + ((ThreadPoolExecutor) this.rateLimitElastic).setKeepAliveTime(2, TimeUnit.MINUTES); + } + } } public void shutdown() @@ -104,8 +119,10 @@ public void shutdown() public void shutdownRequester() { - if (shutdownRateLimitPool) - rateLimitPool.shutdown(); + if (shutdownRateLimitScheduler) + rateLimitScheduler.shutdown(); + if (shutdownRateLimitElastic) + rateLimitElastic.shutdown(); } public void shutdownNow() @@ -114,8 +131,10 @@ public void shutdownNow() callbackPool.shutdownNow(); if (shutdownGatewayPool) gatewayPool.shutdownNow(); - if (shutdownRateLimitPool) - rateLimitPool.shutdownNow(); + if (shutdownRateLimitScheduler) + rateLimitScheduler.shutdownNow(); + if (shutdownRateLimitElastic) + rateLimitElastic.shutdownNow(); if (shutdownEventPool && eventPool != null) eventPool.shutdownNow(); if (shutdownAudioPool && audioPool != null) @@ -123,9 +142,15 @@ public void shutdownNow() } @Nonnull - public ScheduledExecutorService getRateLimitPool() + public ScheduledExecutorService getRateLimitScheduler() { - return rateLimitPool; + return rateLimitScheduler; + } + + @Nonnull + public ExecutorService getRateLimitElastic() + { + return rateLimitElastic; } @Nonnull @@ -162,9 +187,14 @@ public ScheduledExecutorService getAudioPool(@Nonnull Supplier identifie return pool; } - public boolean isShutdownRateLimitPool() + public boolean isShutdownRateLimitScheduler() + { + return shutdownRateLimitScheduler; + } + + public boolean isShutdownRateLimitElastic() { - return shutdownRateLimitPool; + return shutdownRateLimitElastic; } public boolean isShutdownGatewayPool() diff --git a/src/main/java/net/dv8tion/jda/internal/utils/config/sharding/ThreadingProviderConfig.java b/src/main/java/net/dv8tion/jda/internal/utils/config/sharding/ThreadingProviderConfig.java index f9af30bd1f..9173a75612 100644 --- a/src/main/java/net/dv8tion/jda/internal/utils/config/sharding/ThreadingProviderConfig.java +++ b/src/main/java/net/dv8tion/jda/internal/utils/config/sharding/ThreadingProviderConfig.java @@ -26,7 +26,8 @@ public class ThreadingProviderConfig { - private final ThreadPoolProvider rateLimitPoolProvider; + private final ThreadPoolProvider rateLimitSchedulerProvider; + private final ThreadPoolProvider rateLimitElasticProvider; private final ThreadPoolProvider gatewayPoolProvider; private final ThreadPoolProvider callbackPoolProvider; private final ThreadPoolProvider eventPoolProvider; @@ -34,14 +35,16 @@ public class ThreadingProviderConfig private final ThreadFactory threadFactory; public ThreadingProviderConfig( - @Nullable ThreadPoolProvider rateLimitPoolProvider, + @Nullable ThreadPoolProvider rateLimitSchedulerProvider, + @Nullable ThreadPoolProvider rateLimitElasticProvider, @Nullable ThreadPoolProvider gatewayPoolProvider, @Nullable ThreadPoolProvider callbackPoolProvider, @Nullable ThreadPoolProvider eventPoolProvider, @Nullable ThreadPoolProvider audioPoolProvider, @Nullable ThreadFactory threadFactory) { - this.rateLimitPoolProvider = rateLimitPoolProvider; + this.rateLimitSchedulerProvider = rateLimitSchedulerProvider; + this.rateLimitElasticProvider = rateLimitElasticProvider; this.gatewayPoolProvider = gatewayPoolProvider; this.callbackPoolProvider = callbackPoolProvider; this.eventPoolProvider = eventPoolProvider; @@ -55,10 +58,48 @@ public ThreadFactory getThreadFactory() return threadFactory; } + private void init(ThreadPoolProvider provider, int shardTotal) + { + if (provider instanceof ThreadPoolProvider.LazySharedProvider) + ((ThreadPoolProvider.LazySharedProvider) provider).init(shardTotal); + } + + private void shutdown(ThreadPoolProvider provider) + { + if (provider instanceof ThreadPoolProvider.LazySharedProvider) + ((ThreadPoolProvider.LazySharedProvider) provider).shutdown(); + } + + public void init(int shardTotal) + { + init(rateLimitSchedulerProvider, shardTotal); + init(rateLimitElasticProvider, shardTotal); + init(gatewayPoolProvider, shardTotal); + init(callbackPoolProvider, shardTotal); + init(eventPoolProvider, shardTotal); + init(audioPoolProvider, shardTotal); + } + + public void shutdown() + { + shutdown(rateLimitSchedulerProvider); + shutdown(rateLimitElasticProvider); + shutdown(gatewayPoolProvider); + shutdown(callbackPoolProvider); + shutdown(eventPoolProvider); + shutdown(audioPoolProvider); + } + + @Nullable + public ThreadPoolProvider getRateLimitSchedulerProvider() + { + return rateLimitSchedulerProvider; + } + @Nullable - public ThreadPoolProvider getRateLimitPoolProvider() + public ThreadPoolProvider getRateLimitElasticProvider() { - return rateLimitPoolProvider; + return rateLimitElasticProvider; } @Nullable @@ -88,6 +129,6 @@ public ThreadPoolProvider getAudioPoolProvid @Nonnull public static ThreadingProviderConfig getDefault() { - return new ThreadingProviderConfig(null, null, null, null, null, null); + return new ThreadingProviderConfig(null, null, null, null, null, null, null); } }