Skip to content

Commit

Permalink
fix: handle receiver warning messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Świątek committed Apr 8, 2022
1 parent 1b38827 commit eb29a88
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
29 changes: 27 additions & 2 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ class SumologicConnection
COMPRESS_DEFLATE = 'deflate'
COMPRESS_GZIP = 'gzip'

def initialize(endpoint, verify_ssl, connect_timeout, send_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding)
def initialize(endpoint, verify_ssl, connect_timeout, send_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding, logger)
@endpoint = endpoint
@sumo_client = sumo_client
create_http_client(verify_ssl, connect_timeout, send_timeout, proxy_uri, disable_cookies)
@compress = compress_enabled
@compress_encoding = (compress_encoding ||= COMPRESS_GZIP).downcase
@logger = logger

unless [COMPRESS_DEFLATE, COMPRESS_GZIP].include? @compress_encoding
raise "Invalid compression encoding #{@compress_encoding} must be gzip or deflate"
Expand All @@ -30,6 +31,29 @@ def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, dat
unless response.ok?
raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
end

# response is 20x, check response content
return if response.content.length == 0

# if we get a non-empty response, check it
begin
response_map = JSON.load(response.content)
rescue JSON::ParserError
@logger.warn "Error decoding receiver response: #{response.content}"
return
end

# log a warning with the present keys
response_keys = ["id", "code", "status", "message", "errors"]
log_params = []
response_keys.each do |key|
if response_map.has_key?(key) then
value = response_map[key]
log_params.append("#{key}: #{value}")
end
end
log_params_str = log_params.join(", ")
@logger.warn "There was an issue sending data: #{log_params_str}"
end

def request_headers(source_host, source_category, source_name, data_type, metric_data_format, collected_fields, dimensions)
Expand Down Expand Up @@ -218,7 +242,8 @@ def configure(conf)
conf['disable_cookies'],
conf['sumo_client'],
conf['compress'],
conf['compress_encoding']
conf['compress_encoding'],
log,
)
super
end
Expand Down
30 changes: 29 additions & 1 deletion test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,34 @@ def test_emit_fields_invalid_json_string_based_3
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"\\"foo\\\\\\": \\\\\\"bar\\\\\\", \\\\\\"mess"}\z/,
times:1
end
end

def test_warning_response_from_receiver
endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
config = %{
endpoint #{endpoint}
}
testdata = [
[
'{"id":"1TIRY-KGIVX-TPQRJ","errors":[{"code":"internal.error","message":"Internal server error."}]}',
'There was an issue sending data: id: 1TIRY-KGIVX-TPQRJ, errors: [{"code"=>"internal.error", "message"=>"Internal server error."}]'
],
[
'{"id":"1TIRY-KGIVX-TPQRX","code": 200, "status": "Fields dropped", "message": "Dropped fields above the 30 field limit"}',
'There was an issue sending data: id: 1TIRY-KGIVX-TPQRX, code: 200, status: Fields dropped, message: Dropped fields above the 30 field limit'
],
]
time = event_time

testdata.each do |data, log|
driver = create_driver(config)
stub_request(:post, endpoint).to_return(body: data, headers: {content_type: 'application/json'})
driver.run do
driver.feed("test", time, {"message": "test"})
end
assert_equal driver.logs.length, 1
assert driver.logs[0].end_with?(log + "\n")
end
end

end

0 comments on commit eb29a88

Please sign in to comment.