diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 3e4b49c..e6904d5 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -55,9 +55,11 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>, one of `["n", "u", "ms", "s", "m", "h"]`|No | <> |<>|No +| <> |<>|No | <> |<>|No |======================================================================= @@ -230,8 +232,20 @@ The retention policy to use An array containing the names of fields to send to Influxdb as tags instead of fields. Influxdb 0.9 convention is that values that do not change every request should be considered metadata and given as tags. Tags are only sent when -present in `data_points` or if `user_event_fields_for_data_points` is `true`. +present in `data_points` or if `use_hash_field_for_data_points` is specified +or if `use_event_fields_for_data_points` is `true`. +[id="plugins-{type}s-{plugin}-send_array_field_as_tags"] +===== `send_array_field_as_tags` + + * Value type is <> + * Default value is `nil` + +Canonical name of an array field present in the event containing the names of fields to send to Influxdb +as tags instead of fields. Tags are only sent when present in `data_points` or if `use_hash_field_for_data_points` +is specified or if `use_event_fields_for_data_points` is `true`. +This configuration parameter overrides the behavior specified by `send_as_tags`. + [id="plugins-{type}s-{plugin}-ssl"] ===== `ssl` @@ -258,6 +272,15 @@ only useful when overriding the time value Automatically use fields from the event as the data points sent to Influxdb +[id="plugins-{type}s-{plugin}-use_hash_field_for_data_points"] +===== `use_hash_field_for_data_points` + + * Value type is <> + * Default value is `nil` + +Specify a canonical name of a hash field present in the event containing the data points sent to Influxdb +This configuration parameter overrides the behavior specified by `use_event_fields_for_data_points`. + [id="plugins-{type}s-{plugin}-user"] ===== `user` diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index 9d5f0b9..97d1aac 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -78,6 +78,10 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # Automatically use fields from the event as the data points sent to Influxdb config :use_event_fields_for_data_points, :validate => :boolean, :default => false + + # Derive all data points from the give event hash field path. If this configuration parameter + # is specified, `use_event_fields_for_data_points` is ignored. + config :use_hash_field_for_data_points, :validate => :string, :default => nil # An array containing the names of fields from the event to exclude from the # data points @@ -95,6 +99,11 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # Tags are only sent when present in `data_points` or if `use_event_fields_for_data_points` is `true`. config :send_as_tags, :validate => :array, :default => ["host"] + # Similar in its behavior/defintion to `send_as_tags`, this configuration parameter + # specifies the event array field path from tags are to be derived. + # If it is specified, `send_as_tags` is ignored. + config :send_array_field_as_tags, :validate => :string, :default => nil + # This setting controls how many events will be buffered before sending a batch # of events. Note that these are only batched for the same measurement config :flush_size, :validate => :number, :default => 100 @@ -167,7 +176,7 @@ def receive(event) - tags, point = extract_tags(point) + tags, point = extract_tags(event,point) event_hash = { :series => event.sprintf(@measurement), @@ -201,13 +210,22 @@ def close buffer_flush(:final => true) end # def teardown - + def resolve_data_points(event) + if !(@use_hash_field_for_data_points.nil? || @use_hash_field_for_data_points.empty?) + return event.get(@use_hash_field_for_data_points).to_hash + elsif @use_event_fields_for_data_points + return event.to_hash + else + return @data_points + end + end + # Create a data point from an event. If @use_event_fields_for_data_points is # true, convert the event to a hash. Otherwise, use @data_points. Each key and # value will be run through event#sprintf with the exception of a non-String # value (which will be passed through) def create_point_from_event(event) - Hash[ (@use_event_fields_for_data_points ? event.to_hash : @data_points).map do |k,v| + Hash[ resolve_data_points(event).map do |k,v| [event.sprintf(k), (String === v ? event.sprintf(v) : v)] end ] end @@ -259,6 +277,14 @@ def exclude_fields!(event_data) end + def resolve_tags(event) + if @send_array_field_as_tags + return event.get(@send_array_field_as_tags) + else + return @send_as_tags + end + end + # Extract tags from a hash of fields. # Returns a tuple containing a hash of tags (as configured by send_as_tags) # and a hash of fields that exclude the tags. If fields contains a key @@ -270,7 +296,7 @@ def exclude_fields!(event_data) # original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]} # tags, fields = extract_tags(original_fields) # # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1} - def extract_tags(fields) + def extract_tags(event,fields) remainder = fields.dup tags = if remainder.has_key?("tags") && remainder["tags"].respond_to?(:inject) @@ -279,7 +305,7 @@ def extract_tags(fields) {} end - @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } + resolve_tags(event).each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } tags.delete_if { |key,value| value.nil? || value == "" } remainder.delete_if { |key,value| value.nil? || value == "" } diff --git a/logstash-output-influxdb.gemspec b/logstash-output-influxdb.gemspec index 8df0649..42b16d2 100644 --- a/logstash-output-influxdb.gemspec +++ b/logstash-output-influxdb.gemspec @@ -1,31 +1,48 @@ -Gem::Specification.new do |s| - s.name = 'logstash-output-influxdb' - s.version = '5.0.5' - s.licenses = ['Apache License (2.0)'] - s.summary = "Writes metrics to InfluxDB" - s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" - s.authors = ["Elastic"] - s.email = 'info@elastic.co' - s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" - s.require_paths = ["lib"] - - # Files - s.files = Dir["lib/**/*","spec/**/*","*.gemspec","*.md","CONTRIBUTORS","Gemfile","LICENSE","NOTICE.TXT", "vendor/jar-dependencies/**/*.jar", "vendor/jar-dependencies/**/*.rb", "VERSION", "docs/**/*"] +# -*- encoding: utf-8 -*- +# stub: logstash-output-influxdb 5.0.5 ruby lib - # Tests - s.test_files = s.files.grep(%r{^(test|spec|features)/}) +Gem::Specification.new do |s| + s.name = "logstash-output-influxdb".freeze + s.version = "5.0.6" - # Special flag to let us know this is actually a logstash plugin - s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } + s.required_rubygems_version = Gem::Requirement.new(">= 0".freeze) if s.respond_to? :required_rubygems_version= + s.metadata = { "logstash_group" => "output", "logstash_plugin" => "true" } if s.respond_to? :metadata= + s.require_paths = ["lib".freeze] + s.authors = ["Elastic".freeze] + s.date = "2019-08-22" + s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program".freeze + s.email = "info@elastic.co".freeze + s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html".freeze + s.licenses = ["Apache License (2.0)".freeze] + s.rubygems_version = "2.7.9".freeze + s.summary = "Writes metrics to InfluxDB".freeze - # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + s.installed_by_version = "2.7.9" if s.respond_to? :installed_by_version - s.add_runtime_dependency 'stud' - s.add_runtime_dependency 'influxdb' , ">= 0.3", "<= 0.3.99" + if s.respond_to? :specification_version then + s.specification_version = 4 - s.add_development_dependency 'logstash-devutils' - s.add_development_dependency 'logstash-input-generator' - s.add_development_dependency 'logstash-filter-kv' + if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then + s.add_runtime_dependency(%q.freeze, [">= 1.60", "<= 2.99"]) + s.add_runtime_dependency(%q.freeze, [">= 0"]) + s.add_runtime_dependency(%q.freeze, [">= 0.3", "<= 0.3.99"]) + s.add_development_dependency(%q.freeze, [">= 0"]) + s.add_development_dependency(%q.freeze, [">= 0"]) + s.add_development_dependency(%q.freeze, [">= 0"]) + else + s.add_dependency(%q.freeze, [">= 1.60", "<= 2.99"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0.3", "<= 0.3.99"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0"]) + end + else + s.add_dependency(%q.freeze, [">= 1.60", "<= 2.99"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0.3", "<= 0.3.99"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0"]) + s.add_dependency(%q.freeze, [">= 0"]) + end end - diff --git a/spec/outputs/influxdb_spec.rb b/spec/outputs/influxdb_spec.rb index 28d7284..cd7d60a 100644 --- a/spec/outputs/influxdb_spec.rb +++ b/spec/outputs/influxdb_spec.rb @@ -205,6 +205,35 @@ end + context "read data-points/tags from speicifc event fields" do + + let(:config) do + { + "host" => "localhost", + "measurement" => "my_series", + "allow_time_override" => true, + "use_event_fields_for_data_points" => true, + "use_hash_field_for_data_points" => "[pts]", + "send_array_field_as_tags" => "[tags]", + } + end + + before do + subject.register + allow(subject).to receive(:dowrite).with(result, "statistics") + subject.receive(event) + subject.close + end + + let(:event) {LogStash::Event.new("pts" => {"foo" => "bar","fee" =>"buzz","time" => 22}, "tags" => ["foo"], "type" => "generator")} + let(:result) {[{:series=>"my_series", :timestamp=>22, :tags=>{"foo"=>"bar"}, :values=>{"fee"=>"buzz"}}]} + + it "should use the hash field entries as the data points and array field as tags" do + expect(subject).to have_received(:dowrite).with(result, "statistics") + end + + end + context "when fields are coerced to numerics" do let(:config) do