Skip to content

Commit

Permalink
streaming processors docs update (influxdata#7786)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Jul 6, 2020
1 parent 2b545bf commit 8350a6a
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions docs/PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,80 @@ func init() {
}
```

### Streaming Processors

Streaming processors are a new processor type available to you. They are
particularly useful to implement processor types that use background processes
or goroutines to process multiple metrics at the same time. Some examples of this
are the execd processor, which pipes metrics out to an external process over stdin
and reads them back over stdout, and the reverse_dns processor, which does reverse
dns lookups on IP addresses in fields. While both of these come with a speed cost,
it would be significantly worse if you had to process one metric completely from
start to finish before handling the next metric, and thus they benefit
significantly from a streaming-pipe approach.

Some differences from classic Processors:

* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface.
* Processors should call `processors.AddStreaming` in their `init` function to register
themselves. See below for a quick example.

### Streaming Processor Example

```go
package printer

// printer.go

import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)

type Printer struct {
}

var sampleConfig = `
`

func (p *Printer) SampleConfig() string {
return sampleConfig
}

func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Init() error {
return nil
}

func (p *Printer) Start(acc telegraf.Accumulator) error {
}

func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
// print!
fmt.Println(metric.String())
// pass the metric downstream, or metric.Drop() it.
// Metric will be dropped if this function returns an error.
acc.AddMetric(metric)

return nil
}

func (p *Printer) Stop() error {
}

func init() {
processors.AddStreaming("printer", func() telegraf.StreamingProcessor {
return &Printer{}
})
}
```

[SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig
[CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle
[telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor
[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor

0 comments on commit 8350a6a

Please sign in to comment.