diff --git a/lib/datadog/appsec/rate_limiter.rb b/lib/datadog/appsec/rate_limiter.rb index fd1f76c0119..a0ab184e83f 100644 --- a/lib/datadog/appsec/rate_limiter.rb +++ b/lib/datadog/appsec/rate_limiter.rb @@ -1,34 +1,15 @@ # frozen_string_literal: true +require_relative '../core/rate_limiter' + module Datadog module AppSec - # Simple per-thread rate limiter - # Since AppSec marks sampling to keep on a security event, this limits the flood of egress traces involving AppSec + # Per-thread rate limiter based on token bucket rate limiter. + # + # Since AppSec marks sampling to keep on a security event, this limits + # the flood of egress traces involving AppSec class RateLimiter - def initialize(rate) - @rate = rate - @timestamps = [] - end - - def limit - now = Time.now.to_f - - loop do - oldest = @timestamps.first - - break if oldest.nil? || now - oldest < 1 - - @timestamps.shift - end - - @timestamps << now - - if (count = @timestamps.count) <= @rate - yield - else - Datadog.logger.debug { "Rate limit hit: #{count}/#{@rate} AppSec traces/second" } - end - end + TRACES_KEY = :datadog_security_traces_rate_limiter class << self def limit(name, &block) @@ -37,15 +18,18 @@ def limit(name, &block) # reset a rate limiter: used for testing def reset!(name) - Thread.current[:datadog_security_trace_rate_limiter] = nil + Thread.current.thread_variable_set(TRACES_KEY, nil) end - protected + private def rate_limiter(name) case name when :traces - Thread.current[:datadog_security_trace_rate_limiter] ||= RateLimiter.new(trace_rate_limit) + rate_limiter = Thread.current.thread_variable_get(TRACES_KEY) + return rate_limiter unless rate_limiter.nil? + + Thread.current.thread_variable_set(TRACES_KEY, new(trace_rate_limit)) else raise "unsupported rate limiter: #{name.inspect}" end @@ -55,6 +39,16 @@ def trace_rate_limit Datadog.configuration.appsec.trace_rate_limit end end + + def initialize(rate) + @rate_limiter = Core::TokenBucket.new(rate) + end + + def limit + return yield if @rate_limiter.allow? + + Datadog.logger.debug { "Rate limit hit: #{@rate_limiter.current_window_rate} AppSec traces/second" } + end end end end diff --git a/sig/datadog/appsec/rate_limiter.rbs b/sig/datadog/appsec/rate_limiter.rbs index 8528bfed118..c073efdd5fd 100644 --- a/sig/datadog/appsec/rate_limiter.rbs +++ b/sig/datadog/appsec/rate_limiter.rbs @@ -1,24 +1,25 @@ module Datadog module AppSec class RateLimiter - type timestamp = ::Float - type rate = ::Integer + @rate_limiter: Core::TokenBucket - @rate: ::Integer - @timestamps: ::Array[timestamp] + TRACES_KEY: :datadog_security_traces_rate_limiter - def initialize: (rate rate) -> void + def self.limit: (::Symbol name) { (?) -> untyped } -> untyped - # TODO: return type of limit is return type of block - def limit: () { () -> untyped } -> untyped - - def self.limit: (::Symbol name) { () -> untyped } -> untyped + def self.reset!: (::Symbol name) -> untyped - def self.reset!: (::Symbol name) -> void + private def self.rate_limiter: (::Symbol name) -> RateLimiter - def self.trace_rate_limit: () -> rate + def self.trace_rate_limit: () -> untyped + + public + + def initialize: (::Integer rate) -> void + + def limit: () { () -> untyped } -> untyped end end end diff --git a/sig/datadog/core/rate_limiter.rbs b/sig/datadog/core/rate_limiter.rbs index 25fca42b282..deee495d117 100644 --- a/sig/datadog/core/rate_limiter.rbs +++ b/sig/datadog/core/rate_limiter.rbs @@ -1,17 +1,42 @@ module Datadog module Core class RateLimiter - def allow?: (untyped size) -> nil + def allow?: (?::Integer size) -> (false | true) + def effective_rate: () -> nil end + class TokenBucket < RateLimiter + @rate: untyped + + @max_tokens: untyped + + @tokens: untyped + + @total_messages: untyped + + @conforming_messages: untyped + + @prev_conforming_messages: untyped + + @prev_total_messages: untyped + + @current_window: untyped + + @last_refill: untyped + attr_reader rate: untyped attr_reader max_tokens: untyped + def initialize: (untyped rate, ?untyped max_tokens) -> void - def allow?: (untyped size) -> untyped + + def allow?: (?::Integer size) -> (false | true) + def effective_rate: () -> (::Float | untyped) + def current_window_rate: () -> (::Float | untyped) + def available_tokens: () -> untyped private @@ -24,11 +49,12 @@ module Datadog def increment_conforming_count: () -> untyped - def should_allow?: (untyped size) -> (false | true) - def update_rate_counts: (untyped allowed) -> untyped + def should_allow?: (?::Integer size) -> (false | true) end + class UnlimitedLimiter < RateLimiter - def allow?: (untyped _) -> true + def allow?: (?::Integer _) -> true + def effective_rate: () -> ::Float end end diff --git a/spec/datadog/appsec/event_spec.rb b/spec/datadog/appsec/event_spec.rb index 980fe0afdee..ab671965547 100644 --- a/spec/datadog/appsec/event_spec.rb +++ b/spec/datadog/appsec/event_spec.rb @@ -332,7 +332,12 @@ end context 'with many traces' do - let(:rate_limit) { 100 } + before do + allow(Datadog::Core::Utils::Time).to receive(:get_time).and_return(0) + allow(Datadog::AppSec::RateLimiter).to receive(:trace_rate_limit).and_return(rate_limit) + end + + let(:rate_limit) { 50 } let(:trace_count) { rate_limit * 2 } let(:traces) do @@ -351,7 +356,6 @@ it 'rate limits event recording' do expect(described_class).to receive(:record_via_span).exactly(rate_limit).times.and_call_original - expect(traces).to have_attributes(count: trace_count) end end diff --git a/spec/datadog/appsec/rate_limiter_spec.rb b/spec/datadog/appsec/rate_limiter_spec.rb new file mode 100644 index 00000000000..30baca1956c --- /dev/null +++ b/spec/datadog/appsec/rate_limiter_spec.rb @@ -0,0 +1,30 @@ +require 'datadog/appsec/spec_helper' +require 'datadog/appsec/rate_limiter' + +RSpec.describe Datadog::AppSec::RateLimiter do + before { described_class.reset!(:traces) } + + describe '#limit' do + context 'in different threads' do + before { stub_const("#{described_class}::TRACES_KEY", :__spec_traces) } + + it 'creates separate rate limiter per thread' do + thread_1 = Thread.new do + described_class.limit(:traces) { 'workload' } + end + + thread_2 = Thread.new do + described_class.limit(:traces) { 'workload' } + end + + thread_1.join + thread_2.join + + rate_limiter_1 = thread_1.thread_variable_get(:__spec_traces) + rate_limiter_2 = thread_2.thread_variable_get(:__spec_traces) + + expect(rate_limiter_1.object_id).not_to eq(rate_limiter_2.object_id) + end + end + end +end