From 8350a6aa12d13a7b4bac9779438f6ca09c5b5178 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Mon, 6 Jul 2020 14:23:44 -0400 Subject: [PATCH] streaming processors docs update (#7786) --- docs/PROCESSORS.md | 74 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 6ea82fdae3309..47f29a4160652 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -64,6 +64,80 @@ func init() { } ``` +### Streaming Processors + +Streaming processors are a new processor type available to you. They are +particularly useful to implement processor types that use background processes +or goroutines to process multiple metrics at the same time. Some examples of this +are the execd processor, which pipes metrics out to an external process over stdin +and reads them back over stdout, and the reverse_dns processor, which does reverse +dns lookups on IP addresses in fields. While both of these come with a speed cost, +it would be significantly worse if you had to process one metric completely from +start to finish before handling the next metric, and thus they benefit +significantly from a streaming-pipe approach. + +Some differences from classic Processors: + +* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface. +* Processors should call `processors.AddStreaming` in their `init` function to register + themselves. See below for a quick example. + +### Streaming Processor Example + +```go +package printer + +// printer.go + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Init() error { + return nil +} + +func (p *Printer) Start(acc telegraf.Accumulator) error { +} + +func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + // print! + fmt.Println(metric.String()) + // pass the metric downstream, or metric.Drop() it. + // Metric will be dropped if this function returns an error. + acc.AddMetric(metric) + + return nil +} + +func (p *Printer) Stop() error { +} + +func init() { + processors.AddStreaming("printer", func() telegraf.StreamingProcessor { + return &Printer{} + }) +} +``` + [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig [CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle [telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor +[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor