Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for canonical event field names for data-points/tags #85

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-retention_policy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-send_as_tags>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-send_array_field_as_tags>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-ssl>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-time_precision>> |<<string,string>>, one of `["n", "u", "ms", "s", "m", "h"]`|No
| <<plugins-{type}s-{plugin}-use_event_fields_for_data_points>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-use_hash_field_for_data_points>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
|=======================================================================

Expand Down Expand Up @@ -230,8 +232,20 @@ The retention policy to use
An array containing the names of fields to send to Influxdb as tags instead
of fields. Influxdb 0.9 convention is that values that do not change every
request should be considered metadata and given as tags. Tags are only sent when
present in `data_points` or if `user_event_fields_for_data_points` is `true`.
present in `data_points` or if `use_hash_field_for_data_points` is specified
or if `use_event_fields_for_data_points` is `true`.

[id="plugins-{type}s-{plugin}-send_array_field_as_tags"]
===== `send_array_field_as_tags`

* Value type is <<string,string>>
* Default value is `nil`

Canonical name of an array field present in the event containing the names of fields to send to Influxdb
as tags instead of fields. Tags are only sent when present in `data_points` or if `use_hash_field_for_data_points`
is specified or if `use_event_fields_for_data_points` is `true`.
This configuration parameter overrides the behavior specified by `send_as_tags`.

[id="plugins-{type}s-{plugin}-ssl"]
===== `ssl`

Expand All @@ -258,6 +272,15 @@ only useful when overriding the time value

Automatically use fields from the event as the data points sent to Influxdb

[id="plugins-{type}s-{plugin}-use_hash_field_for_data_points"]
===== `use_hash_field_for_data_points`

* Value type is <<string,string>>
* Default value is `nil`

Specify a canonical name of a hash field present in the event containing the data points sent to Influxdb
This configuration parameter overrides the behavior specified by `use_event_fields_for_data_points`.

[id="plugins-{type}s-{plugin}-user"]
===== `user`

Expand Down
36 changes: 31 additions & 5 deletions lib/logstash/outputs/influxdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base

# Automatically use fields from the event as the data points sent to Influxdb
config :use_event_fields_for_data_points, :validate => :boolean, :default => false

# Derive all data points from the give event hash field path. If this configuration parameter
# is specified, `use_event_fields_for_data_points` is ignored.
config :use_hash_field_for_data_points, :validate => :string, :default => nil

# An array containing the names of fields from the event to exclude from the
# data points
Expand All @@ -95,6 +99,11 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
# Tags are only sent when present in `data_points` or if `use_event_fields_for_data_points` is `true`.
config :send_as_tags, :validate => :array, :default => ["host"]

# Similar in its behavior/defintion to `send_as_tags`, this configuration parameter
# specifies the event array field path from tags are to be derived.
# If it is specified, `send_as_tags` is ignored.
config :send_array_field_as_tags, :validate => :string, :default => nil

# This setting controls how many events will be buffered before sending a batch
# of events. Note that these are only batched for the same measurement
config :flush_size, :validate => :number, :default => 100
Expand Down Expand Up @@ -167,7 +176,7 @@ def receive(event)



tags, point = extract_tags(point)
tags, point = extract_tags(event,point)

event_hash = {
:series => event.sprintf(@measurement),
Expand Down Expand Up @@ -201,13 +210,22 @@ def close
buffer_flush(:final => true)
end # def teardown


def resolve_data_points(event)
if !(@use_hash_field_for_data_points.nil? || @use_hash_field_for_data_points.empty?)
return event.get(@use_hash_field_for_data_points).to_hash
elsif @use_event_fields_for_data_points
return event.to_hash
else
return @data_points
end
end

# Create a data point from an event. If @use_event_fields_for_data_points is
# true, convert the event to a hash. Otherwise, use @data_points. Each key and
# value will be run through event#sprintf with the exception of a non-String
# value (which will be passed through)
def create_point_from_event(event)
Hash[ (@use_event_fields_for_data_points ? event.to_hash : @data_points).map do |k,v|
Hash[ resolve_data_points(event).map do |k,v|
[event.sprintf(k), (String === v ? event.sprintf(v) : v)]
end ]
end
Expand Down Expand Up @@ -259,6 +277,14 @@ def exclude_fields!(event_data)
end


def resolve_tags(event)
if @send_array_field_as_tags
return event.get(@send_array_field_as_tags)
else
return @send_as_tags
end
end

# Extract tags from a hash of fields.
# Returns a tuple containing a hash of tags (as configured by send_as_tags)
# and a hash of fields that exclude the tags. If fields contains a key
Expand All @@ -270,7 +296,7 @@ def exclude_fields!(event_data)
# original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]}
# tags, fields = extract_tags(original_fields)
# # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1}
def extract_tags(fields)
def extract_tags(event,fields)
remainder = fields.dup

tags = if remainder.has_key?("tags") && remainder["tags"].respond_to?(:inject)
Expand All @@ -279,7 +305,7 @@ def extract_tags(fields)
{}
end

@send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }
resolve_tags(event).each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }

tags.delete_if { |key,value| value.nil? || value == "" }
remainder.delete_if { |key,value| value.nil? || value == "" }
Expand Down
67 changes: 42 additions & 25 deletions logstash-output-influxdb.gemspec
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-influxdb'
s.version = '5.0.5'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes metrics to InfluxDB"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
s.email = '[email protected]'
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
s.require_paths = ["lib"]

# Files
s.files = Dir["lib/**/*","spec/**/*","*.gemspec","*.md","CONTRIBUTORS","Gemfile","LICENSE","NOTICE.TXT", "vendor/jar-dependencies/**/*.jar", "vendor/jar-dependencies/**/*.rb", "VERSION", "docs/**/*"]
# -*- encoding: utf-8 -*-
# stub: logstash-output-influxdb 5.0.5 ruby lib

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})
Gem::Specification.new do |s|
s.name = "logstash-output-influxdb".freeze
s.version = "5.0.6"

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
s.required_rubygems_version = Gem::Requirement.new(">= 0".freeze) if s.respond_to? :required_rubygems_version=
s.metadata = { "logstash_group" => "output", "logstash_plugin" => "true" } if s.respond_to? :metadata=
s.require_paths = ["lib".freeze]
s.authors = ["Elastic".freeze]
s.date = "2019-08-22"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program".freeze
s.email = "[email protected]".freeze
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html".freeze
s.licenses = ["Apache License (2.0)".freeze]
s.rubygems_version = "2.7.9".freeze
s.summary = "Writes metrics to InfluxDB".freeze

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.installed_by_version = "2.7.9" if s.respond_to? :installed_by_version

s.add_runtime_dependency 'stud'
s.add_runtime_dependency 'influxdb' , ">= 0.3", "<= 0.3.99"
if s.respond_to? :specification_version then
s.specification_version = 4

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'logstash-input-generator'
s.add_development_dependency 'logstash-filter-kv'
if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
s.add_runtime_dependency(%q<logstash-core-plugin-api>.freeze, [">= 1.60", "<= 2.99"])
s.add_runtime_dependency(%q<stud>.freeze, [">= 0"])
s.add_runtime_dependency(%q<influxdb>.freeze, [">= 0.3", "<= 0.3.99"])
s.add_development_dependency(%q<logstash-devutils>.freeze, [">= 0"])
s.add_development_dependency(%q<logstash-input-generator>.freeze, [">= 0"])
s.add_development_dependency(%q<logstash-filter-kv>.freeze, [">= 0"])
else
s.add_dependency(%q<logstash-core-plugin-api>.freeze, [">= 1.60", "<= 2.99"])
s.add_dependency(%q<stud>.freeze, [">= 0"])
s.add_dependency(%q<influxdb>.freeze, [">= 0.3", "<= 0.3.99"])
s.add_dependency(%q<logstash-devutils>.freeze, [">= 0"])
s.add_dependency(%q<logstash-input-generator>.freeze, [">= 0"])
s.add_dependency(%q<logstash-filter-kv>.freeze, [">= 0"])
end
else
s.add_dependency(%q<logstash-core-plugin-api>.freeze, [">= 1.60", "<= 2.99"])
s.add_dependency(%q<stud>.freeze, [">= 0"])
s.add_dependency(%q<influxdb>.freeze, [">= 0.3", "<= 0.3.99"])
s.add_dependency(%q<logstash-devutils>.freeze, [">= 0"])
s.add_dependency(%q<logstash-input-generator>.freeze, [">= 0"])
s.add_dependency(%q<logstash-filter-kv>.freeze, [">= 0"])
end
end

29 changes: 29 additions & 0 deletions spec/outputs/influxdb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,35 @@

end

context "read data-points/tags from speicifc event fields" do

let(:config) do
{
"host" => "localhost",
"measurement" => "my_series",
"allow_time_override" => true,
"use_event_fields_for_data_points" => true,
"use_hash_field_for_data_points" => "[pts]",
"send_array_field_as_tags" => "[tags]",
}
end

before do
subject.register
allow(subject).to receive(:dowrite).with(result, "statistics")
subject.receive(event)
subject.close
end

let(:event) {LogStash::Event.new("pts" => {"foo" => "bar","fee" =>"buzz","time" => 22}, "tags" => ["foo"], "type" => "generator")}
let(:result) {[{:series=>"my_series", :timestamp=>22, :tags=>{"foo"=>"bar"}, :values=>{"fee"=>"buzz"}}]}

it "should use the hash field entries as the data points and array field as tags" do
expect(subject).to have_received(:dowrite).with(result, "statistics")
end

end

context "when fields are coerced to numerics" do

let(:config) do
Expand Down