Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use concurrent timer to bulk send multiple cloudwatch metric_data #2

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ jobs:
include:
- ruby: 3.0
- ruby: 2.7
- ruby: 2.6
container:
image: ruby:${{ matrix.ruby }}
env:
Expand Down
2 changes: 1 addition & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require:
- rubocop-rspec

AllCops:
TargetRubyVersion: 2.3
TargetRubyVersion: 2.7

Metrics/BlockLength:
Exclude:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## Unrelease

- Add ability to push multiple metrics in bulk. @retsef

## 0.1.4 - 2023-02-13

- Add missing activesupport dependency. @retsef
Expand Down
12 changes: 9 additions & 3 deletions lib/yabeda/cloudwatch/adapter.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# frozen_string_literal: true

require 'yabeda/base_adapter'
require 'yabeda/cloudwatch/bulk_metrics'

module Yabeda
# Yabeda AWS Cloudwatch adapter
class Cloudwatch::Adapter < BaseAdapter
attr_reader :connection
attr_reader :connection, :client

def initialize(connection:)
def initialize(connection:, auto_flush: true, auto_flush_interval: 0)
super()

@connection = connection
@client = connection
@connection = Cloudwatch::BulkMetrics.new(@client, auto_flush: auto_flush, interval: auto_flush_interval)
end

def register_counter!(counter)
Expand Down Expand Up @@ -69,5 +71,9 @@ def perform_histogram_measure!(histogram, tags, value)
],
)
end

def flush
@connection.flush
end
end
end
92 changes: 92 additions & 0 deletions lib/yabeda/cloudwatch/bulk_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# frozen_string_literal: true

require 'concurrent'
require 'json'

module Yabeda
module Cloudwatch
# Class for bulk sending metrics to Cloudwatch
class BulkMetrics
MAX_METRICS_BULK = 1000
MAX_BULK_BYTES = 1024.0 * 1024.0
attr_reader :interval, :auto_flush, :timer

def initialize(client, auto_flush: true, interval: 3)
@client = client
@metrics = []
@interval = interval
@auto_flush = auto_flush

return unless @interval.positive?

@timer = Concurrent::TimerTask.new(execution_interval: @interval) do
flush
end
end

def put_metric_data(namespace:, metric_data:)
@metrics << {
namespace: namespace,
metric_data: metric_data,
}

return unless auto_flush
return flush unless @timer

# Stop and rerun to extend the timer until maximum metrics reach
@timer.shutdown if @timer.running? && @metrics.size <= MAX_METRICS_BULK
@timer.execute unless @timer.running?
end

# Need some way to test concurrency properly when dispatch metrics and in the meantime not discard the incoming
def flush
@metrics.group_by { _1[:namespace] }.each do |namespace, metrics|
metrics.each_slice(MAX_METRICS_BULK).each do |metric_slices|
metric_data = metric_slices.flat_map { _1[:metric_data] }

slice_by_max_payload_allowed(namespace, metric_data).each do |payload_sliced|
@client.put_metric_data(
namespace: namespace,
metric_data: payload_sliced,
)
end
end
end

@metrics.clear
end

private

def slice_by_max_payload_allowed(namespace, metric_data)
slices = []
cursor = 0
offset = 0
# Needs to be optimized. Instead of cycle through all indexes maybe should use chunks instead
metric_data.size.times.each do
offset += 1
slice = metric_data[cursor, offset]

# When is exceeded the max size, i will get the slice right before
next unless payload_exceed?({ namespace: namespace, metric_data: slice })

slice.pop
slices << slice

# move to next slice frame
cursor = offset - 1
offset = 1
end

slices << metric_data[cursor, offset] if slices.size < metric_data.size
return [metric_data] if slices.empty?

slices
end

def payload_exceed?(items)
JSON.dump(items).size >= MAX_BULK_BYTES
end
end
end
end
27 changes: 27 additions & 0 deletions spec/yabeda/cloudwatch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,32 @@
))
end
end

context 'with bulk enabled' do
# rubocop:disable Style/TrailingCommaInArguments
before do
Yabeda.register_adapter(:cloudwatch,
Yabeda::Cloudwatch::Adapter.new(connection: aws_client, auto_flush_interval: 2))
end
# rubocop:enable Style/TrailingCommaInArguments

specify 'send in bulk metrics to cloudwatch' do
Yabeda.test.counter.increment({ ctag: 'ctag-value' })
Yabeda.test.gauge.set({ gtag: 'gtag-value' }, 42)
Yabeda.test.histogram.measure({ htag: 'htag-value' }, 7.5)

sleep 3
expect(aws_client).to have_received(:put_metric_data).once
end

specify 'send in bulk metrics to cloudwatch in bulk of 1000 maximum' do
1300.times do
Yabeda.test.histogram.measure({ htag: 'htag-value' }, 7.5)
end

sleep 5
expect(aws_client).to have_received(:put_metric_data).twice
end
end
end
end
4 changes: 2 additions & 2 deletions yabeda-cloudwatch.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

spec.required_ruby_version = '>= 2.3'
spec.required_ruby_version = '>= 2.7'

spec.add_dependency 'activesupport'
spec.add_dependency 'aws-sdk-cloudwatch'
spec.add_dependency 'yabeda', '~> 0.10'
spec.add_dependency 'activesupport'

spec.add_development_dependency 'bundler', '~> 2.0'
spec.add_development_dependency 'rake', '~> 13.0'
Expand Down