Skip to content

Commit

Permalink
FFM-11211 Metrics enhancements (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
erdirowlands authored May 16, 2024
1 parent 44f4ed6 commit e575e79
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 51 deletions.
2 changes: 1 addition & 1 deletion lib/ff/ruby/server/sdk/api/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def initialize

@frequency = @@min_frequency

@buffer_size = 2048
@buffer_size = 10000

@all_attributes_private = false

Expand Down
145 changes: 95 additions & 50 deletions lib/ff/ruby/server/sdk/api/metrics_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ def initialize(options = nil, &block)

def increment(key)
compute(key) do |old_value|
if old_value == nil; 1 else old_value + 1 end
if old_value == nil;
1
else
old_value + 1
end
end
end

Expand All @@ -39,7 +43,6 @@ def drain_to_map
end
end


def init(connector, config, callback)

unless connector.kind_of?(Connector)
Expand All @@ -59,14 +62,12 @@ def init(connector, config, callback)
@connector = connector

@sdk_type = "SDK_TYPE"
@global_target_set = Set[]
@staging_target_set = Set[]
@target_attribute = "target"
@global_target = "__global__cf_target" # <--- This target identifier is used to aggregate and send data for all
@global_target_identifier = "__global__cf_target" # <--- This target identifier is used to aggregate and send data for all
# targets as a summary

@global_target = Target.new("RubySDK1", identifier = @global_target_identifier, name = @global_target_name)
@ready = false
@jar_version = ""
@jar_version = Ff::Ruby::Server::Sdk::VERSION
@server = "server"
@sdk_version = "SDK_VERSION"
@sdk_language = "SDK_LANGUAGE"
Expand All @@ -76,10 +77,20 @@ def init(connector, config, callback)

@executor = Concurrent::FixedThreadPool.new(10)

@frequency_map = FrequencyMap.new
@evaluation_metrics = FrequencyMap.new
@target_metrics = Concurrent::Map.new

# Keep track of targets that have already been sent to avoid sending them again
@seen_targets = Concurrent::Map.new

@max_buffer_size = config.buffer_size - 1

# Max 100k targets per interval
@max_targets_buffer_size = 100000

@evaluation_warning_issued = Concurrent::AtomicBoolean.new
@target_warning_issued = Concurrent::AtomicBoolean.new

@callback.on_metrics_ready
end

Expand All @@ -99,65 +110,102 @@ def close
end

def register_evaluation(target, feature_config, variation)
register_evaluation_metric(feature_config, variation)
register_target_metric(target)
end

if @frequency_map.size > @max_buffer_size
@config.logger.warn "metrics buffer is full #{@frequency_map.size} - flushing metrics"
@executor.post do
run_one_iteration
private

def register_evaluation_metric(feature_config, variation)
if @evaluation_metrics.size > @max_buffer_size
unless @evaluation_warning_issued.true?
SdkCodes.warn_metrics_evaluations_max_size_exceeded(@config.logger)
@evaluation_warning_issued.make_true
end
return
end

event = MetricsEvent.new(feature_config, target, variation)
@frequency_map.increment event
event = MetricsEvent.new(feature_config, @global_target, variation)
@evaluation_metrics.increment event
end

private
def register_target_metric(target)
if @target_metrics.size > @max_targets_buffer_size
unless @target_warning_issued.true?
SdkCodes.warn_metrics_targets_max_size_exceeded(@config.logger)
@target_warning_issued.make_true
end
return
end

if target.is_private
return
end

already_seen = @seen_targets.put_if_absent(target.identifier, true)

if already_seen
return
end

@target_metrics.put(target.identifier, target)
end

def run_one_iteration
send_data_and_reset_cache @frequency_map.drain_to_map
send_data_and_reset_cache(@evaluation_metrics, @target_metrics)

@config.logger.debug "metrics: frequency map size #{@frequency_map.size}. global target size #{@global_target_set.size}"
@config.logger.debug "metrics: frequency map size #{@evaluation_metrics.size}. targets map size #{@target_metrics.size} global target size #{@seen_targets.size}"
end

def send_data_and_reset_cache(map)
metrics = prepare_summary_metrics_body(map)
def send_data_and_reset_cache(evaluation_metrics_map, target_metrics_map)
evaluation_metrics_map_clone = evaluation_metrics_map.drain_to_map

target_metrics_map_clone = Concurrent::Map.new

target_metrics_map.each_pair do |key, value|
target_metrics_map_clone[key] = value
end

target_metrics_map.clear

@evaluation_warning_issued.make_false
@target_warning_issued.make_false

metrics = prepare_summary_metrics_body(evaluation_metrics_map_clone, target_metrics_map_clone)

if !metrics.metrics_data.empty? && !metrics.target_data.empty?
unless metrics.metrics_data.empty?
start_time = (Time.now.to_f * 1000).to_i
@connector.post_metrics(metrics)
end_time = (Time.now.to_f * 1000).to_i
if end_time - start_time > @config.metrics_service_acceptable_duration
@config.logger.debug "Metrics service API duration=[" + (end_time - start_time).to_s + "]"
end
end

@global_target_set.merge(@staging_target_set)
@staging_target_set.clear

end

def prepare_summary_metrics_body(freq_map)
def prepare_summary_metrics_body(evaluation_metrics_map, target_metrics_map)
metrics = OpenapiClient::Metrics.new({ :target_data => [], :metrics_data => [] })
add_target_data(metrics, Target.new(name = @global_target_name, identifier = @global_target))
freq_map.each_key do |key|
add_target_data(metrics, key.target)
end

total_count = 0
freq_map.each do |key, value|
evaluation_metrics_map.each do |key, value|
total_count += value
metrics_data = OpenapiClient::MetricsData.new({ :attributes => [] })
metrics_data.timestamp = (Time.now.to_f * 1000).to_i
metrics_data.count = value
metrics_data.metrics_type = "FFMETRICS"
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @feature_name_attribute, :value => key.feature_config.feature }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @variation_identifier_attribute, :value => key.variation.identifier }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @target_attribute, :value => @global_target }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @target_attribute, :value => @global_target_identifier }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @sdk_type, :value => @server }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @sdk_language, :value => "ruby" }))
metrics_data.attributes.push(OpenapiClient::KeyValue.new({ :key => @sdk_version, :value => @jar_version }))
metrics.metrics_data.push(metrics_data)
end
@config.logger.debug "Pushed #{total_count} metric evaluations to server. metrics_data count is #{freq_map.size}"
@config.logger.debug "Pushed #{total_count} metric evaluations to server. metrics_data count is #{evaluation_metrics_map.size}. target_data count is #{target_metrics_map.size}"

target_metrics_map.each_pair do |_, value|
add_target_data(metrics, value)
end

metrics
end
Expand All @@ -167,28 +215,25 @@ def add_target_data(metrics, target)
target_data = OpenapiClient::TargetData.new({ :attributes => [] })
private_attributes = target.private_attributes

if !@staging_target_set.include?(target) && !@global_target_set.include?(target) && !target.is_private
@staging_target_set.add(target)
attributes = target.attributes
attributes.each do |k, v|
key_value = OpenapiClient::KeyValue.new
if !private_attributes.empty?
unless private_attributes.include?(k)
key_value = OpenapiClient::KeyValue.new({ :key => k, :value => v.to_s })
end
else
attributes = target.attributes
attributes.each do |k, v|
key_value = OpenapiClient::KeyValue.new
if !private_attributes.empty?
unless private_attributes.include?(k)
key_value = OpenapiClient::KeyValue.new({ :key => k, :value => v.to_s })
end
target_data.attributes.push(key_value)
end
target_data.identifier = target.identifier
if target.name == nil || target.name == ""
target_data.name = target.identifier
else
target_data.name = target.name
key_value = OpenapiClient::KeyValue.new({ :key => k, :value => v.to_s })
end
metrics.target_data.push(target_data)
target_data.attributes.push(key_value)
end
target_data.identifier = target.identifier
if target.name == nil || target.name == ""
target_data.name = target.identifier
else
target_data.name = target.name
end
metrics.target_data.push(target_data)
end

def start_async
Expand Down Expand Up @@ -218,7 +263,7 @@ def get_version
end

def get_frequency_map
@frequency_map
@evaluation_metrics
end

end
9 changes: 9 additions & 0 deletions lib/ff/ruby/server/sdk/common/sdk_codes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ def self.info_metrics_thread_started(logger)
logger.info SdkCodes.sdk_err_msg(7000)
end

def self.warn_metrics_targets_max_size_exceeded(logger)
logger.warn SdkCodes.sdk_err_msg(7004)
end

def self.warn_metrics_evaluations_max_size_exceeded(logger)
logger.warn SdkCodes.sdk_err_msg(7007)
end
def self.warn_auth_failed_srv_defaults(logger)
logger.warn SdkCodes.sdk_err_msg(2001)
end
Expand Down Expand Up @@ -89,6 +96,8 @@ def self.warn_bucket_by_attr_not_found(logger, attr_name, new_value)
7000 => "Metrics thread started",
7001 => "Metrics thread exited",
7002 => "Posting metrics failed, reason:",
7004 => "Target metrics exceeded max size, remaining targets for this analytics interval will not be sent",
7007 => "Evaluation metrics exceeded max size, remaining evaluations for this analytics interval will not be sent"
}

def self.sdk_err_msg(error_code, append_text = "")
Expand Down

0 comments on commit e575e79

Please sign in to comment.