FiltersProgrammable
The programmable filter is a facility for arbitrary in-flight manipulation of telemetry and logs, programmable by end-users. The programming environment available is Lua 5.3. The full API is documented below.
The programmable filter configuration options are as follows:
- script :: the Lua program to read into cernan
- forwards :: the filters and/or sinks to forward into [default: []]
The value for script must map to a file in the globally set scripts-directory. Read more about that here.
The forward concept is described in detail here.
You may configure multiple programmable filters.
Let's say we want to write a filter to strip instance metadata from collectd metrics. Recall that collectd metric names look like so:
collectd.computer_name.interface-lo.if_errors.tx
and what we'd like to do is avoid emitting the 'computer_name' to our sinks. Enabling filters in configuration is much like enabling a sink or source.
scripts-directory = "examples/scripts/"
[sources]
[sources.graphite.primary]
enabled = true
port = 2004
forwards = ["filters.programmable.collectd_scrub"]
[filters]
[filters.programmable.collectd_scrub]
script = "collectd_scrub.lua"
forwards = ["sinks.console"]
[sinks]
[sinks.console]
bin_width = 1
Note that scripts-directory must exist and contain the file collectd_scrub.lua. The scrub program:
count_per_tick = 0
function process_metric(pyld)
  count_per_tick = count_per_tick + 1
  local old_name = payload.metric_name(pyld, 1)
  local collectd, rest = string.match(old_name, "^(collectd)[%.@][%w_]+(.*)")
  if collectd ~= nil then
    local new_name = string.format("%s%s", collectd, rest)
    payload.set_metric_name(pyld, 1, new_name)
  end
end
function process_log(pyld)
end
function tick(pyld)
  payload.push_metric(pyld, "count_per_tick", count_per_tick)
  payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
  count_per_tick = 0
end
Cernan will pass every metric through the function process_metric and this program will inspect the name, extract the offending bit of metadata and reset the name in the payload metric.
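To see what the pattern does in isolation, the renaming step can be lifted into a plain Lua function and run outside cernan. This is only a sketch: the helper name scrub_name is hypothetical and not part of the filter API, while the pattern itself is the one used in collectd_scrub.lua.

```lua
-- Hypothetical helper (not part of cernan): the renaming step from
-- collectd_scrub.lua, lifted out so it runs in a plain Lua interpreter.
function scrub_name(old_name)
  -- Capture the literal "collectd" prefix, skip one separator ("." or "@")
  -- and the run of letters, digits and underscores after it, then capture
  -- whatever remains.
  local collectd, rest = string.match(old_name, "^(collectd)[%.@][%w_]+(.*)")
  if collectd == nil then
    -- Not a collectd name: return it unchanged.
    return old_name
  end
  return collectd .. rest
end

print(scrub_name("collectd.computer_name.interface-lo.if_errors.tx"))
-- prints: collectd.interface-lo.if_errors.tx
print(scrub_name("statsd.requests.count"))
-- prints: statsd.requests.count
```

Note that [%w_]+ stops at the first character that is not a letter, digit or underscore, so a host name such as web-01 would only be partially removed by this pattern.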
There's a little more than filtering going on here. Every flush-interval a filter's tick will be called. collectd_scrub is pushing a new metric called count_per_tick into the payload, as well as a new log line.
Payloads are indexed from 1. Negative offsets are also supported, as in Lua. This is a break from similar systems like heka that index from 0. Metrics and logs are indexed separately.
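As a rough illustration of these indexing rules, the sketch below resolves positive and negative offsets against a plain Lua table. It assumes that a negative offset counts back from the end of the array, as with Lua's own string functions, so that -1 names the last element. The helper resolve_index and the sample table are hypothetical and only mimic the behaviour described here; they are not cernan code.

```lua
-- Hypothetical helper: map a 1-based or negative offset onto a 1-based
-- position in an array of length len. Assumes -1 names the last element.
function resolve_index(idx, len)
  if idx < 0 then
    return len + idx + 1
  end
  return idx
end

local names = { "first.metric", "second.metric", "third.metric" }

print(names[resolve_index(1, #names)])   -- prints: first.metric
print(names[resolve_index(-1, #names)])  -- prints: third.metric
print(names[resolve_index(-3, #names)])  -- prints: first.metric
```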
Filters are currently programmed in Lua. The API is split between manipulation of telemetry and log lines. For historical reasons--to be resolved with postmates/cernan#184--the functions that manipulate telemetry are named metric_*. Those that manipulate log lines are named log_*.
Every filter script must have three basic functions. This is the most minimal script:
function process_metric(pyld)
end
function process_log(pyld)
end
function tick(pyld)
end
Each function is passed an opaque payload structure. This structure, in effect, is two arrays of log lines and telemetry structures. Each array can be independently indexed. The filters API indexes from one. The process_metric and process_log functions are called when a telemetry or log line structure is available for the filter. tick is called once every flush-interval. See the configuration page in this wiki for more information.
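The call sequence can be exercised outside cernan with a small driver. The sketch below rests on several assumptions: the payload table is a hypothetical stub that only records pushed log lines in a plain Lua table, and the driver loop merely imitates cernan calling process_metric for each telemetry point and tick once per flush-interval.

```lua
-- Hypothetical stub for cernan's payload module. Only push_log is
-- implemented; it appends the line to a plain Lua table.
payload = {}
function payload.push_log(pyld, line)
  pyld.logs[#pyld.logs + 1] = line
end

-- Filter under test: counts telemetry and reports the count on each tick.
count_per_tick = 0
function process_metric(pyld)
  count_per_tick = count_per_tick + 1
end
function process_log(pyld)
end
function tick(pyld)
  payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
  count_per_tick = 0
end

-- Driver imitating two flush intervals: three metrics, then one metric.
emitted = {}
for _, metrics_in_interval in ipairs({ 3, 1 }) do
  for _ = 1, metrics_in_interval do
    process_metric({ logs = {} })
  end
  local pyld = { logs = {} }
  tick(pyld)
  emitted[#emitted + 1] = pyld.logs[1]
end

print(emitted[1])  -- prints: count_per_tick: 3
print(emitted[2])  -- prints: count_per_tick: 1
```

Because count_per_tick is a global, it persists across calls within one script and is reset explicitly in tick.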
You can find example scripts in examples/scripts/ in the root of this project.
The functions available for manipulation and querying follow.
payload.log_remove_tag removes a tag--if it exists--from the referenced log line structure. In the following function we remove the tag with key "bizz" from the log line at index 1 in the payload:
function process_log(pyld)
  payload.log_remove_tag(pyld, 1, "bizz")
end
The old value will be returned if it existed, else nil.
payload.log_set_tag adds a tag to the referenced log line structure. In the following function we insert the key/value pair "bizz"/"bazz" into the log line at index 1 in the payload:
function process_log(pyld)
  payload.log_set_tag(pyld, 1, "bizz", "bazz")
end
The old value will be returned if it existed, else nil.
payload.log_tag_value returns the value of a tag in the referenced log line structure. In the following function we query for the value of key "bizz" on the log line at index 1:
function process_log(pyld)
  payload.log_tag_value(pyld, 1, "bizz")
end
The value will be returned if it exists, else nil.
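The three log tag functions compose naturally. As a sketch, the filter below renames a tag by relying on the return value of log_remove_tag described above. So that it runs in a plain Lua interpreter, payload here is a hypothetical stub over Lua tables that follows those return conventions; inside cernan the real module would be used and the stub omitted. The tag key "buzz" is made up for illustration.

```lua
-- Hypothetical stub standing in for cernan's payload module. Each log line
-- is a table with a tags field; return values mirror the descriptions above.
payload = {}
function payload.log_tag_value(pyld, idx, key)
  return pyld.logs[idx].tags[key]
end
function payload.log_set_tag(pyld, idx, key, value)
  local old = pyld.logs[idx].tags[key]
  pyld.logs[idx].tags[key] = value
  return old
end
function payload.log_remove_tag(pyld, idx, key)
  local old = pyld.logs[idx].tags[key]
  pyld.logs[idx].tags[key] = nil
  return old
end

-- Rename the tag "bizz" to "buzz", keeping its value. Lines without the
-- tag are left alone because log_remove_tag returns nil for them.
function process_log(pyld)
  local value = payload.log_remove_tag(pyld, 1, "bizz")
  if value ~= nil then
    payload.log_set_tag(pyld, 1, "buzz", value)
  end
end

local pyld = { logs = { { tags = { bizz = "bazz" } } } }
process_log(pyld)
print(payload.log_tag_value(pyld, 1, "bizz"))  -- prints: nil
print(payload.log_tag_value(pyld, 1, "buzz"))  -- prints: bazz
```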
payload.metric_name returns the name of the referenced telemetry structure. It always returns a string.
function process_metric(pyld)
  print(payload.metric_name(pyld, 1))
end
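payload.metric_query performs a percentile query against the stored samples of the telemetry structure. A number is returned if there are values in the telemetry, else nil. In the following function we query the 99.9th percentile (0.999) of the telemetry at index 1; note that the percentile argument precedes the index:
function process_metric(pyld)
  print(payload.metric_query(pyld, 0.999, 1))
end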
payload.metric_remove_tag removes a tag--if it exists--from the referenced telemetry structure. In the following function we remove the tag with key "bizz" from the telemetry at index 1 in the payload:
function process_metric(pyld)
  payload.metric_remove_tag(pyld, 1, "bizz")
end
The old value will be returned if it existed, else nil.
payload.metric_set_tag adds a tag to the referenced telemetry structure. In the following function we insert the key/value pair "bizz"/"bazz" into the telemetry at index 1 in the payload:
function process_metric(pyld)
  payload.metric_set_tag(pyld, 1, "bizz", "bazz")
end
The old value will be returned if it existed, else nil.
payload.metric_tag_value returns the value of a tag in the referenced telemetry structure. In the following function we query for the value of key "bizz" on the telemetry at index 1:
function process_metric(pyld)
  payload.metric_tag_value(pyld, 1, "bizz")
end
The value will be returned if it exists, else nil.
payload.metric_value performs a value query against the stored samples of the telemetry structure, obeying its AggregationMethod. A number is returned if there are values in the telemetry, else nil.
function process_metric(pyld)
  print(payload.metric_value(pyld, 1))
end
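Because metric_value may return nil, a script will usually want to check the result before comparing it with a number. The sketch below tags telemetry whose value exceeds a threshold. The threshold, the tag key and value, and the payload stub (plain Lua tables in place of cernan's module) are all assumptions made for illustration.

```lua
-- Hypothetical stub standing in for cernan's payload module.
payload = {}
function payload.metric_value(pyld, idx)
  return pyld.metrics[idx].value  -- nil when no samples are stored
end
function payload.metric_set_tag(pyld, idx, key, value)
  local old = pyld.metrics[idx].tags[key]
  pyld.metrics[idx].tags[key] = value
  return old
end

THRESHOLD = 100  -- arbitrary cut-off chosen for this example

function process_metric(pyld)
  local value = payload.metric_value(pyld, 1)
  -- Comparing nil with a number raises an error in Lua, so check first.
  if value ~= nil and value > THRESHOLD then
    payload.metric_set_tag(pyld, 1, "level", "high")
  end
end

local busy = { metrics = { { value = 250, tags = {} } } }
local empty = { metrics = { { value = nil, tags = {} } } }
process_metric(busy)
process_metric(empty)
print(busy.metrics[1].tags.level)   -- prints: high
print(empty.metrics[1].tags.level)  -- prints: nil
```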
payload.push_log appends a log line structure to the payload. This always succeeds and there is no return value.
count_per_tick = 0
function process_metric(pyld)
  count_per_tick = count_per_tick + 1
end
function process_log(pyld)
  count_per_tick = count_per_tick + 1
end
function tick(pyld)
  payload.push_log(pyld, string.format("count_per_tick: %s", count_per_tick))
  count_per_tick = 0
end
In the above we count the number of telemetry and log lines that come through this filter and periodically write a log line describing how many have come through.
payload.push_metric appends a telemetry structure to the payload. This always succeeds and there is no return value.
count_per_tick = 0
function process_metric(pyld)
  count_per_tick = count_per_tick + 1
end
function process_log(pyld)
  count_per_tick = count_per_tick + 1
end
function tick(pyld)
  payload.push_metric(pyld, "count_per_tick", count_per_tick)
  count_per_tick = 0
end
In the above we count the number of telemetry and log lines that come through this filter and periodically emit a telemetry point named count_per_tick recording how many have come through.
payload.set_metric_name sets the name of the referenced telemetry structure. It always succeeds.
function process_metric(pyld)
  print(payload.set_metric_name(pyld, 1, "new_metric_name"))
end
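Putting the telemetry functions together: rather than discarding the host portion of a collectd name, a filter could move it into a tag. This is a sketch and not something the project ships; the tag key "host" is made up, and payload is a hypothetical stub over Lua tables so that the script can be run outside cernan.

```lua
-- Hypothetical stub standing in for cernan's payload module.
payload = {}
function payload.metric_name(pyld, idx)
  return pyld.metrics[idx].name
end
function payload.set_metric_name(pyld, idx, name)
  pyld.metrics[idx].name = name
end
function payload.metric_set_tag(pyld, idx, key, value)
  local old = pyld.metrics[idx].tags[key]
  pyld.metrics[idx].tags[key] = value
  return old
end
function payload.metric_tag_value(pyld, idx, key)
  return pyld.metrics[idx].tags[key]
end

-- Move the host segment of a collectd name into a "host" tag.
function process_metric(pyld)
  local old_name = payload.metric_name(pyld, 1)
  -- Same pattern as collectd_scrub.lua, with the host segment captured too.
  local collectd, host, rest =
    string.match(old_name, "^(collectd)[%.@]([%w_]+)(.*)")
  if collectd ~= nil then
    payload.metric_set_tag(pyld, 1, "host", host)
    payload.set_metric_name(pyld, 1, collectd .. rest)
  end
end

local pyld = { metrics = { {
  name = "collectd.computer_name.interface-lo.if_errors.tx",
  tags = {},
} } }
process_metric(pyld)
print(payload.metric_name(pyld, 1))
-- prints: collectd.interface-lo.if_errors.tx
print(payload.metric_tag_value(pyld, 1, "host"))
-- prints: computer_name
```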