From 4e07cf4cfa44e69e1e85544f79f006cc5154291a Mon Sep 17 00:00:00 2001 From: Taras Goriachko Date: Wed, 9 Oct 2024 02:30:54 +0200 Subject: [PATCH] Adaptive concurrency filter --- .../snapshot/SnapshotProperties.kt | 4 + .../AdaptiveConcurrencyFilterFactory.kt | 81 +++++++++++++++++++ .../listeners/filters/EnvoyDefaultFilters.kt | 11 ++- 3 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/AdaptiveConcurrencyFilterFactory.kt diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt index 6c64b7405..cf317b0d1 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt @@ -34,6 +34,7 @@ class SnapshotProperties { var jwt = JwtFilterProperties() var requireServiceName = false var rateLimit = RateLimitProperties() + var adaptiveConcurrencyProperties = AdaptiveConcurrencyProperties() var deltaXdsEnabled = false var retryPolicy = RetryPolicyProperties() var tcpDumpsEnabled: Boolean = true @@ -417,6 +418,9 @@ class JwtFilterProperties { var providers = mapOf() } +data class AdaptiveConcurrencyProperties( + var enabled: Boolean = true, +) data class RateLimitProperties( var domain: String = "rl", var serviceName: String = "ratelimit-grpc" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/AdaptiveConcurrencyFilterFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/AdaptiveConcurrencyFilterFactory.kt new file mode 100644 index 000000000..adbac02a5 --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/AdaptiveConcurrencyFilterFactory.kt @@ -0,0 +1,81 @@ +package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters + +import com.google.protobuf.Any +import com.google.protobuf.BoolValue +import com.google.protobuf.Duration +import com.google.protobuf.UInt32Value +import io.envoyproxy.envoy.config.core.v3.RuntimeFeatureFlag +import io.envoyproxy.envoy.extensions.filters.http.adaptive_concurrency.v3.AdaptiveConcurrency +import io.envoyproxy.envoy.extensions.filters.http.adaptive_concurrency.v3.GradientControllerConfig +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter +import io.envoyproxy.envoy.type.v3.Percent +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.AdaptiveConcurrencyProperties + +class AdaptiveConcurrencyFilterFactory( + val properties: AdaptiveConcurrencyProperties +) { + + fun adaptiveConcurrencyFilter(): HttpFilter = adaptiveConcurrencyFilter + + /** + * This filter is used to limit the number of concurrent requests to a service. + * It is used to prevent overloading the service. + * gradient_controller_config: + * sample_aggregate_percentile: + * value: 90 + * concurrency_limit_params: + * concurrency_update_interval: 0.1s + * min_rtt_calc_params: + * jitter: + * value: 10 + * interval: 60s + * request_count: 50 + * enabled: + * default_value: true + * runtime_key: "adaptive_concurrency.enabled" + */ + + private val adaptiveConcurrencyFilter: HttpFilter = + HttpFilter.newBuilder() + .setName("envoy.filters.http.adaptive_concurrency") + .setTypedConfig( + Any.pack( + AdaptiveConcurrency.newBuilder() + .setGradientControllerConfig( + GradientControllerConfig.newBuilder() + .setSampleAggregatePercentile( + Percent.newBuilder().setValue(90.0).build() + ) + .setConcurrencyLimitParams( + GradientControllerConfig.ConcurrencyLimitCalculationParams.newBuilder() + .setConcurrencyUpdateInterval( + Duration.newBuilder().setSeconds(1).build() + ) + .build() + ) + .setMinRttCalcParams( + GradientControllerConfig.MinimumRTTCalculationParams.newBuilder() + .setRequestCount(UInt32Value.of(50)) + .setJitter(Percent.newBuilder().setValue(10.0).build()) + .setInterval(Duration.newBuilder().setSeconds(60).build()) + .build() + ) + .build() + ) + .setEnabled( + RuntimeFeatureFlag.newBuilder() + .setDefaultValue( + BoolValue.of(true) + ) + .setRuntimeKey("adaptive_concurrency.enabled") + .build() + ) + .build() + ) + ) + .build() +} + + + + diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/EnvoyDefaultFilters.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/EnvoyDefaultFilters.kt index 07e92eb53..b78dc950b 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/EnvoyDefaultFilters.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/EnvoyDefaultFilters.kt @@ -27,6 +27,9 @@ class EnvoyDefaultFilters( private val serviceTagFilterFactory = ServiceTagFilterFactory( properties = snapshotProperties.routing.serviceTags ) + private val adaptiveConcurrencyFilterFactory = AdaptiveConcurrencyFilterFactory( + snapshotProperties.adaptiveConcurrencyProperties + ) private val compressionFilterFactory = CompressionFilterFactory(snapshotProperties) @@ -59,6 +62,10 @@ class EnvoyDefaultFilters( luaFilterFactory.ingressClientNameHeaderFilter() } + val defaultAdaptiveConcurrencyFilter = { _: Group, _: GlobalSnapshot -> + adaptiveConcurrencyFilterFactory.adaptiveConcurrencyFilter() + } + val defaultJwtHttpFilter = { group: Group, _: GlobalSnapshot -> jwtFilterFactory.createJwtFilter(group) } val defaultAuthorizationHeaderFilter = { _: Group, _: GlobalSnapshot -> @@ -119,9 +126,11 @@ class EnvoyDefaultFilters( defaultClientNameHeaderFilter, defaultAuthorizationHeaderFilter, defaultJwtHttpFilter, - defaultCurrentZoneHeaderFilter + defaultCurrentZoneHeaderFilter, ) + val postFilters = listOf( + defaultAdaptiveConcurrencyFilter, defaultRbacLoggingFilter, defaultRbacFilter, defaultRateLimitLuaFilter,