From 708a06c7e26209cc5095b027797b152561f6554e Mon Sep 17 00:00:00 2001 From: Steve Anthony Date: Wed, 28 Oct 2015 09:58:40 -0400 Subject: [PATCH 1/3] Added allow_measurement_override and measurement_from_field flags to allow setting measurement name from a field in the string processed prior to output. For example, when translating graphite input to influxdb output, it's more useful to set measurement from the the metric name than as "logstash". --- lib/logstash/outputs/influxdb.rb | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index cc170f1..14ef608 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -39,6 +39,12 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # Measurement name - supports sprintf formatting config :measurement, :validate => :string, :default => "logstash" + + # Allow measurement from field - allow override of measurement name + config :allow_measurement_override, :validate => :boolean, :default => false + + # Measurement from field - use measurement from field name + config :measurement_from_field, :validate => :string, :default => "measurement" # Hash of key/value pairs representing data points to send to the named database # Example: `{'column1' => 'value1', 'column2' => 'value2'}` @@ -149,6 +155,14 @@ def receive(event) time = point.delete("time") end end + + if @allow_measurement_override + unless point.has_key?(@measurement_from_field) + logger.error("Cannot override measurement, field specified does not exist. Using default.") + else + @measurement = point[@measurement_from_field] + end + end exclude_fields!(point) coerce_values!(point) From 0e6dd50c9ae664dfdb44291ce06b89e0cd738bb3 Mon Sep 17 00:00:00 2001 From: Steve Anthony Date: Fri, 11 Dec 2015 17:11:33 -0500 Subject: [PATCH 2/3] Remove measurement from field when sending as measurement. Added prefer_tags and send_as_fields to allow default send logstash fields as InfluxDB tags, unless in send_as_fields. --- lib/logstash/outputs/influxdb.rb | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index 14ef608..b4390f9 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -96,6 +96,17 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # request should be considered metadata and given as tags. config :send_as_tags, :validate => :array, :default => ["host"] + # An array containing the names of fields to send to Influxdb as fields instead + # of tags. Influxdb 0.9 convention is that values that do not change every + # request should be considered metadata and given as tags. Enable with the + # prefer_tags option. + config :send_as_fields, :validate => :array, :default => ["value"] + + # This setting sends fields as tags which may then be overridden per field + # using the send_as_fields setting. Influx 0.9 preference to tags indicates + # that more fields will be sent as tags than as fields. + config :prefer_tags, :validate => :boolean, :default => false + # This setting controls how many events will be buffered before sending a batch # of events. Note that these are only batched for the same measurement config :flush_size, :validate => :number, :default => 100 @@ -160,7 +171,7 @@ def receive(event) unless point.has_key?(@measurement_from_field) logger.error("Cannot override measurement, field specified does not exist. Using default.") else - @measurement = point[@measurement_from_field] + @measurement = point.delete(@measurement_from_field) end end @@ -300,13 +311,20 @@ def exclude_fields!(event_data) # Returns a tuple containing a hash of tags (as configured by send_as_tags) # and a hash of fields that exclude the tags. If fields contains a key # "tags" with an array, they will be moved to the tags hash (and each will be - # given a value of true) + # given a value of true). If prefer_tags is true, will instead move all values + # in fields to tags array except for values specificed in the send_as_fields + # array. # # Example: # # Given send_as_tags: ["bar"] # original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]} # tags, fields = extract_tags(original_fields) # # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1} + # Example: + # # Given send_as_fields: ["bar"] and prefer_tags = true + # original_fields = {"foo" => 1, "bar" => 2, "foobar" => 3} + # tags, fields = extract_tags(original_fields) + # # tags: {"foo" => 1, "foobar" => 3} and fields: {"bar" => 2} def extract_tags(fields) remainder = fields.dup @@ -315,8 +333,12 @@ def extract_tags(fields) else {} end - - @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } + + if @prefer_tags + remainder.each_pair { |key,value| (tags[key] = remainder.delete(key)) unless @send_as_fields.include?(key) } + else + @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) } + end tags.delete_if { |key,value| value.nil? || value == "" } remainder.delete_if { |key,value| value.nil? || value == "" } From d7a774e5d020e6ed38102aafdf4624b7132925f2 Mon Sep 17 00:00:00 2001 From: Steve Anthony Date: Sat, 12 Dec 2015 17:18:18 -0500 Subject: [PATCH 3/3] Tests for sending logstash field as measurement and for sending prefer_tags and send_as_fields. --- spec/outputs/influxdb_spec.rb | 73 +++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/spec/outputs/influxdb_spec.rb b/spec/outputs/influxdb_spec.rb index 31d2c1b..86e1af7 100644 --- a/spec/outputs/influxdb_spec.rb +++ b/spec/outputs/influxdb_spec.rb @@ -115,6 +115,79 @@ end end + context "sending some fields as Influxdb fields" do + let(:config) do <<-CONFIG + input { + generator { + message => "foo=1 bar=2 baz=3 time=4" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + influxdb { + host => "localhost" + measurement => "my_series" + allow_time_override => true + prefer_tags => true + use_event_fields_for_data_points => true + exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"] + send_as_fields => ["foo"] + } + } + CONFIG + end + + let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='} + let(:expected_body) { 'my_series,bar=2,baz=3 foo="1" 4' } + + it "should send the specified fields as fields" do + expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body) + pipeline.run + end + end + + context "sending field as Influxdb measurement" do + let(:config) do <<-CONFIG + input { + generator { + message => "foo=1 bar=2 baz=3 time=4" + count => 1 + type => "generator" + } + } + + filter { + kv { } + } + + output { + influxdb { + host => "localhost" + allow_time_override => true + use_event_fields_for_data_points => true + exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"] + allow_measurement_override => true + measurement_from_field => "bar" + } + } + CONFIG + end + + let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='} + let(:expected_body) { 'bar baz="3",foo="1" 4' } + + it "should send the specified fields as fields" do + expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body) + pipeline.run + end + end + context "when fields data contains a list of tags" do let(:config) do <<-CONFIG input {