Skip to content

Commit

Permalink
Replace AppSec rate limiter with core rate limiter
Browse files Browse the repository at this point in the history
This allows us to be more precise in throttling outgoing AppSec traces
and removes additional custom logic of rate limiting.

Also RBS definitions of rate limiters are updated
  • Loading branch information
Strech committed Oct 8, 2024
1 parent f0dd28e commit 3a78df0
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 47 deletions.
52 changes: 23 additions & 29 deletions lib/datadog/appsec/rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
# frozen_string_literal: true

require_relative '../core/rate_limiter'

module Datadog
module AppSec
# Simple per-thread rate limiter
# Since AppSec marks sampling to keep on a security event, this limits the flood of egress traces involving AppSec
# Per-thread rate limiter based on token bucket rate limiter.
#
# Since AppSec marks sampling to keep on a security event, this limits
# the flood of egress traces involving AppSec
class RateLimiter
def initialize(rate)
@rate = rate
@timestamps = []
end

def limit
now = Time.now.to_f

loop do
oldest = @timestamps.first

break if oldest.nil? || now - oldest < 1

@timestamps.shift
end

@timestamps << now

if (count = @timestamps.count) <= @rate
yield
else
Datadog.logger.debug { "Rate limit hit: #{count}/#{@rate} AppSec traces/second" }
end
end
TRACES_KEY = :datadog_security_traces_rate_limiter

class << self
def limit(name, &block)
Expand All @@ -37,15 +18,18 @@ def limit(name, &block)

# reset a rate limiter: used for testing
def reset!(name)
Thread.current[:datadog_security_trace_rate_limiter] = nil
Thread.current.thread_variable_set(TRACES_KEY, nil)
end

protected
private

def rate_limiter(name)
case name
when :traces
Thread.current[:datadog_security_trace_rate_limiter] ||= RateLimiter.new(trace_rate_limit)
rate_limiter = Thread.current.thread_variable_get(TRACES_KEY)
return rate_limiter unless rate_limiter.nil?

Thread.current.thread_variable_set(TRACES_KEY, new(trace_rate_limit))
else
raise "unsupported rate limiter: #{name.inspect}"
end
Expand All @@ -55,6 +39,16 @@ def trace_rate_limit
Datadog.configuration.appsec.trace_rate_limit
end
end

def initialize(rate)
@rate_limiter = Core::TokenBucket.new(rate)
end

def limit
return yield if @rate_limiter.allow?

Datadog.logger.debug { "Rate limit hit: #{@rate_limiter.current_window_rate} AppSec traces/second" }
end
end
end
end
23 changes: 12 additions & 11 deletions sig/datadog/appsec/rate_limiter.rbs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
module Datadog
module AppSec
class RateLimiter
type timestamp = ::Float
type rate = ::Integer
@rate_limiter: Core::TokenBucket

@rate: ::Integer
@timestamps: ::Array[timestamp]
TRACES_KEY: :datadog_security_traces_rate_limiter

def initialize: (rate rate) -> void
def self.limit: (::Symbol name) { (?) -> untyped } -> untyped

# TODO: return type of limit is return type of block
def limit: () { () -> untyped } -> untyped

def self.limit: (::Symbol name) { () -> untyped } -> untyped
def self.reset!: (::Symbol name) -> untyped

def self.reset!: (::Symbol name) -> void
private

def self.rate_limiter: (::Symbol name) -> RateLimiter

def self.trace_rate_limit: () -> rate
def self.trace_rate_limit: () -> untyped

public

def initialize: (::Integer rate) -> void

def limit: () { () -> untyped } -> untyped
end
end
end
36 changes: 31 additions & 5 deletions sig/datadog/core/rate_limiter.rbs
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
module Datadog
module Core
class RateLimiter
def allow?: (untyped size) -> nil
def allow?: (?::Integer size) -> (false | true)

def effective_rate: () -> nil
end

class TokenBucket < RateLimiter
@rate: untyped

@max_tokens: untyped

@tokens: untyped

@total_messages: untyped

@conforming_messages: untyped

@prev_conforming_messages: untyped

@prev_total_messages: untyped

@current_window: untyped

@last_refill: untyped

attr_reader rate: untyped

attr_reader max_tokens: untyped

def initialize: (untyped rate, ?untyped max_tokens) -> void
def allow?: (untyped size) -> untyped

def allow?: (?::Integer size) -> (false | true)

def effective_rate: () -> (::Float | untyped)

def current_window_rate: () -> (::Float | untyped)

def available_tokens: () -> untyped

private
Expand All @@ -24,11 +49,12 @@ module Datadog

def increment_conforming_count: () -> untyped

def should_allow?: (untyped size) -> (false | true)
def update_rate_counts: (untyped allowed) -> untyped
def should_allow?: (?::Integer size) -> (false | true)
end

class UnlimitedLimiter < RateLimiter
def allow?: (untyped _) -> true
def allow?: (?::Integer _) -> true

def effective_rate: () -> ::Float
end
end
Expand Down
8 changes: 6 additions & 2 deletions spec/datadog/appsec/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,12 @@
end

context 'with many traces' do
let(:rate_limit) { 100 }
before do
allow(Datadog::Core::Utils::Time).to receive(:get_time).and_return(0)
allow(Datadog::AppSec::RateLimiter).to receive(:trace_rate_limit).and_return(rate_limit)
end

let(:rate_limit) { 50 }
let(:trace_count) { rate_limit * 2 }

let(:traces) do
Expand All @@ -351,7 +356,6 @@

it 'rate limits event recording' do
expect(described_class).to receive(:record_via_span).exactly(rate_limit).times.and_call_original

expect(traces).to have_attributes(count: trace_count)
end
end
Expand Down
30 changes: 30 additions & 0 deletions spec/datadog/appsec/rate_limiter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
require 'datadog/appsec/spec_helper'
require 'datadog/appsec/rate_limiter'

RSpec.describe Datadog::AppSec::RateLimiter do
before { described_class.reset!(:traces) }

describe '#limit' do
context 'in different threads' do
before { stub_const("#{described_class}::TRACES_KEY", :__spec_traces) }

it 'creates separate rate limiter per thread' do
thread_1 = Thread.new do
described_class.limit(:traces) { 'workload' }
end

thread_2 = Thread.new do
described_class.limit(:traces) { 'workload' }
end

thread_1.join
thread_2.join

rate_limiter_1 = thread_1.thread_variable_get(:__spec_traces)
rate_limiter_2 = thread_2.thread_variable_get(:__spec_traces)

expect(rate_limiter_1.object_id).not_to eq(rate_limiter_2.object_id)
end
end
end
end

0 comments on commit 3a78df0

Please sign in to comment.