From cb790b747c2b421abcd3956bf7c87622561895df Mon Sep 17 00:00:00 2001 From: Michael Rittmeister Date: Fri, 25 Oct 2024 01:38:22 +0200 Subject: [PATCH] Implement distributed rate-limiting --- .idea/kotlinc.xml | 4 +- .../mikbot/plugin/api/settings/SettingsApi.kt | 1 + core/kubernetes/build.gradle.kts | 3 + .../dev/schlaubi/mikbot/core/health/Config.kt | 1 + .../mikbot/core/health/KubernetesPlugin.kt | 2 + .../ratelimit/DistributedRateLimiter.kt | 90 +++++++++++++++++++ .../mikbot/core/health/ratelimit/Setup.kt | 23 +++++ gradle/libs.versions.toml | 5 +- runtime/plugins.txt | 4 +- 9 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/DistributedRateLimiter.kt create mode 100644 core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/Setup.kt diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 6d0ee1c2a..9ad0cbdd5 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -1,6 +1,6 @@ - - \ No newline at end of file + diff --git a/api/src/main/kotlin/dev/schlaubi/mikbot/plugin/api/settings/SettingsApi.kt b/api/src/main/kotlin/dev/schlaubi/mikbot/plugin/api/settings/SettingsApi.kt index be65e2685..bb3a99c91 100644 --- a/api/src/main/kotlin/dev/schlaubi/mikbot/plugin/api/settings/SettingsApi.kt +++ b/api/src/main/kotlin/dev/schlaubi/mikbot/plugin/api/settings/SettingsApi.kt @@ -1,6 +1,7 @@ package dev.schlaubi.mikbot.plugin.api.settings import com.kotlindiscord.kord.extensions.commands.application.slash.SlashCommand +import dev.kord.common.entity.ApplicationIntegrationType import dev.kord.common.entity.InteractionContextType import dev.kord.common.entity.Permission import dev.schlaubi.mikbot.plugin.api.* diff --git a/core/kubernetes/build.gradle.kts b/core/kubernetes/build.gradle.kts index 43c970cbf..528c5f32b 100644 --- a/core/kubernetes/build.gradle.kts +++ b/core/kubernetes/build.gradle.kts @@ -19,6 +19,9 @@ dependencies { implementation(libs.kubernetes.client) implementation(libs.kotlin.jsonpatch) + implementation(libs.bucket4j) + implementation("io.lettuce:lettuce-core:6.4.0.RELEASE") + testImplementation(kotlin("test-junit5")) testImplementation(projects.api) diff --git a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/Config.kt b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/Config.kt index 1479af4b9..3eeaa63ee 100644 --- a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/Config.kt +++ b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/Config.kt @@ -10,4 +10,5 @@ object Config : EnvironmentConfig() { val STATEFUL_SET_NAME by this val NAMESPACE by getEnv("default") val CONTAINER_NAME by this + val REDIS_URL by this } diff --git a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/KubernetesPlugin.kt b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/KubernetesPlugin.kt index 6737343c7..fe743244e 100644 --- a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/KubernetesPlugin.kt +++ b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/KubernetesPlugin.kt @@ -3,6 +3,7 @@ package dev.schlaubi.mikbot.core.health import com.kotlindiscord.kord.extensions.builders.ExtensibleBotBuilder import com.kotlindiscord.kord.extensions.utils.loadModule import dev.schlaubi.mikbot.core.health.check.HealthCheck +import dev.schlaubi.mikbot.core.health.ratelimit.setupDistributedRateLimiter import dev.schlaubi.mikbot.plugin.api.* import dev.schlaubi.mikbot.plugin.api.config.Environment import mu.KotlinLogging @@ -42,6 +43,7 @@ class KubernetesPlugin(context: PluginContext) : Plugin(context) { LOG.debug { "Scaling is enabled " } kord { sharding { calculateShards() } + setupDistributedRateLimiter() } applicationCommands { register = Config.POD_ID == (Config.TOTAL_SHARDS / Config.SHARDS_PER_POD) diff --git a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/DistributedRateLimiter.kt b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/DistributedRateLimiter.kt new file mode 100644 index 000000000..b4a46d3ce --- /dev/null +++ b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/DistributedRateLimiter.kt @@ -0,0 +1,90 @@ +package dev.schlaubi.mikbot.core.health.ratelimit + +import dev.kord.rest.ratelimit.RequestRateLimiter +import dev.kord.rest.ratelimit.RequestResponse +import dev.kord.rest.ratelimit.RequestToken +import dev.kord.rest.request.Request +import dev.kord.rest.request.identifier +import io.github.bucket4j.BucketConfiguration +import io.github.bucket4j.TokensInheritanceStrategy +import io.github.bucket4j.distributed.proxy.ProxyManager +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.future.await +import kotlinx.datetime.Clock +import kotlinx.datetime.toJavaInstant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes +import kotlin.time.toJavaDuration + +private val LOG = KotlinLogging.logger { } + +private val autoBanBucket = BucketConfiguration.builder() + .addLimit { it.capacity(25000).refillIntervally(25000, 10.minutes.toJavaDuration()) } + .build() + +private const val globalBucket = "global" + +class DistributedRateLimiter(proxyManager: ProxyManager) : RequestRateLimiter { + private val proxyManager = proxyManager.asAsync() + private val executor = Executors.newScheduledThreadPool(10) + private val start = Clock.System.now() + + private suspend fun await(name: String, configuration: BucketConfiguration, expiry: Duration = 1.minutes) { + proxyManager.getProxy(name) { CompletableFuture.completedFuture(configuration) } + .asScheduler() + .tryConsume(1, expiry.toJavaDuration(), executor) + .await() + } + + private suspend fun awaitByName(name: String) { + val requestBucket = proxyManager.getProxyConfiguration(name).await() + if (requestBucket.isPresent) { + await(name, requestBucket.get()) + } + } + + override suspend fun await(request: Request<*, *>): RequestToken { + val requestIdentifier = request.identifier.toString() + awaitByName(globalBucket) + awaitByName(requestIdentifier) + await("auto_ban", autoBanBucket) + + return object : RequestToken { + private val deferred = CompletableFuture() + + override val completed: Boolean + get() = deferred.isDone + + override suspend fun complete(response: RequestResponse) { + if (response is RequestResponse.GlobalRateLimit) { + val config = response.toBucketConfiguration() + val proxy = proxyManager.getProxy(globalBucket) { CompletableFuture.completedFuture(config) } + proxy.replaceConfiguration(config, TokensInheritanceStrategy.AS_IS) + } else { + val key = response.bucketKey!! + val config = response.toBucketConfiguration() + val proxy = proxyManager.getProxy(key.value) { CompletableFuture.completedFuture(config) } + proxy.replaceConfiguration(config, TokensInheritanceStrategy.AS_IS) + } + + deferred.complete(Unit) + } + } + } + + private fun RequestResponse.toBucketConfiguration() = BucketConfiguration.builder() + .addLimit { + val limit = rateLimit!! + val reset = reset!! + it + .capacity(limit.remaining.value) + .refillIntervallyAligned( + limit.total.value, + (Clock.System.now() - reset.value).toJavaDuration(), + start.toJavaInstant() + ) + } + .build() +} diff --git a/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/Setup.kt b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/Setup.kt new file mode 100644 index 000000000..e0cc6e371 --- /dev/null +++ b/core/kubernetes/src/main/kotlin/dev/schlaubi/mikbot/core/health/ratelimit/Setup.kt @@ -0,0 +1,23 @@ +package dev.schlaubi.mikbot.core.health.ratelimit + +import dev.kord.core.builder.kord.KordBuilder +import dev.kord.rest.request.KtorRequestHandler +import dev.schlaubi.mikbot.core.health.Config +import io.github.bucket4j.redis.lettuce.Bucket4jLettuce +import io.lettuce.core.RedisClient +import io.lettuce.core.codec.ByteArrayCodec +import io.lettuce.core.codec.RedisCodec +import io.lettuce.core.codec.StringCodec + +fun KordBuilder.setupDistributedRateLimiter() { + val connection = RedisClient.create(Config.REDIS_URL) + .connect(RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE)) + val proxyManager = Bucket4jLettuce + .casBasedBuilder(connection).build() + + val rateLimiter = DistributedRateLimiter(proxyManager) + + requestHandler { + KtorRequestHandler(it.token, rateLimiter) + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3f399d07f..c22ece00f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,12 +1,12 @@ [versions] kotlin = "2.0.21" -kordex = "1.9.8-mikbot-SNAPSHOT" +kordex = "1.9.9-mikbot-SNAPSHOT" kmongo = "5.1.0" coroutines = "1.9.0" serialization = "1.7.3" ktor = "3.0.0" kord = "feature-user-apps-20241022.210712-8" - api = "3.37.9" +api = "3.37.10" ksp = "2.0.21-1.0.25" lavakord = "7.1.0" @@ -64,6 +64,7 @@ koin = { group = "io.insert-koin", name = "koin-core", version = "3.5.6" } asm = { group = "org.ow2.asm", name = "asm", version = "9.7" } mikbot-api = { group = "dev.schlaubi", name = "mikbot-api" } +bucket4j = { group = "com.bucket4j", name = "bucket4j_jdk17-lettuce", version = "8.14.0" } [plugins] kotlinx-serialization = { id = "org.jetbrains.kotlin.plugin.serialization", version.ref = "kotlin" } diff --git a/runtime/plugins.txt b/runtime/plugins.txt index d4618f65d..36c979994 100644 --- a/runtime/plugins.txt +++ b/runtime/plugins.txt @@ -1,3 +1 @@ -:core:gdpr -:music:player -:music:commands +:core:kubernetes