Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added allow_measurement_override and measurement_from_field flags #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions lib/logstash/outputs/influxdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base

# Measurement name - supports sprintf formatting
config :measurement, :validate => :string, :default => "logstash"

# Allow measurement from field - allow override of measurement name
config :allow_measurement_override, :validate => :boolean, :default => false

# Measurement from field - use measurement from field name
config :measurement_from_field, :validate => :string, :default => "measurement"

# Hash of key/value pairs representing data points to send to the named database
# Example: `{'column1' => 'value1', 'column2' => 'value2'}`
Expand Down Expand Up @@ -90,6 +96,17 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
# request should be considered metadata and given as tags.
config :send_as_tags, :validate => :array, :default => ["host"]

# An array containing the names of fields to send to Influxdb as fields instead
# of tags. Influxdb 0.9 convention is that values that do not change every
# request should be considered metadata and given as tags. Enable with the
# prefer_tags option.
config :send_as_fields, :validate => :array, :default => ["value"]

# This setting sends fields as tags which may then be overridden per field
# using the send_as_fields setting. Influx 0.9 preference to tags indicates
# that more fields will be sent as tags than as fields.
config :prefer_tags, :validate => :boolean, :default => false

# This setting controls how many events will be buffered before sending a batch
# of events. Note that these are only batched for the same measurement
config :flush_size, :validate => :number, :default => 100
Expand Down Expand Up @@ -149,6 +166,14 @@ def receive(event)
time = point.delete("time")
end
end

if @allow_measurement_override
unless point.has_key?(@measurement_from_field)
logger.error("Cannot override measurement, field specified does not exist. Using default.")
else
@measurement = point.delete(@measurement_from_field)
end
end

exclude_fields!(point)
coerce_values!(point)
Expand Down Expand Up @@ -286,13 +311,20 @@ def exclude_fields!(event_data)
# Returns a tuple containing a hash of tags (as configured by send_as_tags)
# and a hash of fields that exclude the tags. If fields contains a key
# "tags" with an array, they will be moved to the tags hash (and each will be
# given a value of true)
# given a value of true). If prefer_tags is true, will instead move all values
# in fields to tags array except for values specificed in the send_as_fields
# array.
#
# Example:
# # Given send_as_tags: ["bar"]
# original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]}
# tags, fields = extract_tags(original_fields)
# # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1}
# Example:
# # Given send_as_fields: ["bar"] and prefer_tags = true
# original_fields = {"foo" => 1, "bar" => 2, "foobar" => 3}
# tags, fields = extract_tags(original_fields)
# # tags: {"foo" => 1, "foobar" => 3} and fields: {"bar" => 2}
def extract_tags(fields)
remainder = fields.dup

Expand All @@ -301,8 +333,12 @@ def extract_tags(fields)
else
{}
end

@send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }

if @prefer_tags
remainder.each_pair { |key,value| (tags[key] = remainder.delete(key)) unless @send_as_fields.include?(key) }
else
@send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }
end

tags.delete_if { |key,value| value.nil? || value == "" }
remainder.delete_if { |key,value| value.nil? || value == "" }
Expand Down
73 changes: 73 additions & 0 deletions spec/outputs/influxdb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,79 @@
end
end

context "sending some fields as Influxdb fields" do
let(:config) do <<-CONFIG
input {
generator {
message => "foo=1 bar=2 baz=3 time=4"
count => 1
type => "generator"
}
}

filter {
kv { }
}

output {
influxdb {
host => "localhost"
measurement => "my_series"
allow_time_override => true
prefer_tags => true
use_event_fields_for_data_points => true
exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"]
send_as_fields => ["foo"]
}
}
CONFIG
end

let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='}
let(:expected_body) { 'my_series,bar=2,baz=3 foo="1" 4' }

it "should send the specified fields as fields" do
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body)
pipeline.run
end
end

context "sending field as Influxdb measurement" do
let(:config) do <<-CONFIG
input {
generator {
message => "foo=1 bar=2 baz=3 time=4"
count => 1
type => "generator"
}
}

filter {
kv { }
}

output {
influxdb {
host => "localhost"
allow_time_override => true
use_event_fields_for_data_points => true
exclude_fields => ["@version", "@timestamp", "sequence", "message", "type", "host"]
allow_measurement_override => true
measurement_from_field => "bar"
}
}
CONFIG
end

let(:expected_url) { 'http://localhost:8086/write?db=statistics&rp=default&precision=ms&u=&p='}
let(:expected_body) { 'bar baz="3",foo="1" 4' }

it "should send the specified fields as fields" do
expect_any_instance_of(Manticore::Client).to receive(:post!).with(expected_url, body: expected_body)
pipeline.run
end
end

context "when fields data contains a list of tags" do
let(:config) do <<-CONFIG
input {
Expand Down