forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fluentd.go
166 lines (133 loc) · 3.87 KB
/
fluentd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package fluentd
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Package-level constants shared by the plugin's methods.
const (
// measurement is the measurement name Gather passes to the accumulator for
// every metric it adds.
measurement = "fluentd"
// description is the text returned by Description.
description = "Read metrics exposed by fluentd in_monitor plugin"
// sampleConfig is the example configuration text returned by SampleConfig.
sampleConfig = `
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
##
## Endpoint:
## - only one URI is allowed
## - https is not supported
endpoint = "http://localhost:24220/api/plugins.json"
## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
exclude = [
"monitor_agent",
"dummy",
]
`
)
// Fluentd - plugin main structure
//
// It carries the two configurable settings read by Gather plus the HTTP
// client that Gather creates on first use and reuses on later calls.
type Fluentd struct {
// Endpoint is the URL Gather requests with an HTTP GET.
Endpoint string
// Exclude lists plugin "type" values; Gather skips every plugin whose type
// equals one of these entries.
Exclude []string
// client is nil until the first Gather call, which builds it with fixed
// timeouts and stores it here.
client *http.Client
}
// endpointInfo is the top-level JSON object that parse decodes from the
// endpoint response.
type endpointInfo struct {
// Payload holds the elements of the "plugins" array, one per reported plugin.
Payload []pluginData `json:"plugins"`
}
// pluginData is one element of the "plugins" array in the endpoint response.
//
// The three string fields become metric tags in Gather. The three numeric
// fields are pointers so that a value missing from the JSON stays nil and can
// be told apart from a reported zero; Gather only emits the fields that are
// non-nil.
type pluginData struct {
PluginID string `json:"plugin_id"`
PluginType string `json:"type"`
PluginCategory string `json:"plugin_category"`
RetryCount *float64 `json:"retry_count"`
BufferQueueLength *float64 `json:"buffer_queue_length"`
BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
}
// parse decodes the JSON document received from the fluentd endpoint.
//
// Parameters:
//
//	data: unprocessed JSON received from the endpoint
//
// Returns:
//
//	[]pluginData: the entries of the document's "plugins" array
//	error: non-nil when data cannot be decoded into the expected structure;
//	       the decoder's own error is wrapped, so the cause (syntax error,
//	       type mismatch, ...) stays visible in the message and to
//	       errors.Is / errors.As
func parse(data []byte) ([]pluginData, error) {
	var endpointData endpointInfo
	if err := json.Unmarshal(data, &endpointData); err != nil {
		// Keep the original message as the prefix and attach the cause
		// instead of discarding it.
		return nil, fmt.Errorf("processing JSON structure: %w", err)
	}
	return endpointData.Payload, nil
}
// Description returns the plugin's one-line description text.
func (h *Fluentd) Description() string {
	return description
}
// SampleConfig returns the plugin's example configuration text.
func (h *Fluentd) SampleConfig() string {
	return sampleConfig
}
// Gather - Main code responsible for gathering, processing and creating metrics
//
// It requests h.Endpoint with an HTTP GET, decodes the response with parse and
// adds one metric per reported plugin to acc. A plugin is skipped when its
// type is listed in h.Exclude or when it reports none of retry_count,
// buffer_queue_length and buffer_total_queued_size.
//
// Parameters:
//
//	acc: accumulator that receives the metrics
//
// Returns:
//
//	error: non-nil when the endpoint URL cannot be parsed, the request or the
//	       body read fails, the response status is not 200 OK, or the body
//	       cannot be decoded. Underlying errors are wrapped so the cause is
//	       not lost.
func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
	if _, err := url.Parse(h.Endpoint); err != nil {
		return fmt.Errorf("invalid URL %q: %w", h.Endpoint, err)
	}

	// Build the client once and keep it on the plugin so later calls reuse it.
	if h.client == nil {
		h.client = &http.Client{
			Transport: &http.Transport{
				ResponseHeaderTimeout: 3 * time.Second,
			},
			Timeout: 4 * time.Second,
		}
	}

	resp, err := h.client.Get(h.Endpoint)
	if err != nil {
		return fmt.Errorf("unable to perform HTTP client GET on %q: %w", h.Endpoint, err)
	}
	defer resp.Body.Close()

	// The body is read before the status check so it is consumed even when
	// the status turns out to be an error.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Name the endpoint rather than echoing a partially read body.
		return fmt.Errorf("unable to read the HTTP body from %q: %w", h.Endpoint, err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("http status ok not met: %q returned %q", h.Endpoint, resp.Status)
	}

	dataPoints, err := parse(body)
	if err != nil {
		return fmt.Errorf("problem with parsing response from %q: %w", h.Endpoint, err)
	}

	// Go through all plugins one by one
	for _, p := range dataPoints {
		// Check if this specific type was excluded in configuration
		skip := false
		for _, exclude := range h.Exclude {
			if exclude == p.PluginType {
				skip = true
				break
			}
		}
		if skip {
			continue
		}

		// Only values the endpoint actually reported become fields.
		fields := make(map[string]interface{}, 3)
		if p.BufferQueueLength != nil {
			fields["buffer_queue_length"] = *p.BufferQueueLength
		}
		if p.RetryCount != nil {
			fields["retry_count"] = *p.RetryCount
		}
		if p.BufferTotalQueuedSize != nil {
			fields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
		}

		// A plugin that reported none of the values produces no metric.
		if len(fields) == 0 {
			continue
		}

		tags := map[string]string{
			"plugin_id":       p.PluginID,
			"plugin_category": p.PluginCategory,
			"plugin_type":     p.PluginType,
		}
		acc.AddFields(measurement, fields, tags)
	}

	return nil
}
func init() {
inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} })
}