forked from netdata/netdata
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(go.d): add NATS collector (netdata#19252)
add go.d/nats
- Loading branch information
Showing
17 changed files
with
1,302 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
// SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
package nats | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" | ||
) | ||
|
||
const ( | ||
prioServerTraffic = module.Priority + iota | ||
prioServerMessages | ||
prioServerConnectionsCurrent | ||
prioServerConnectionsRate | ||
prioHttpEndpointRequests | ||
prioServerHealthProbeStatus | ||
prioServerCpuUsage | ||
prioServerMemoryUsage | ||
prioServerUptime | ||
) | ||
|
||
var serverCharts = func() module.Charts { | ||
charts := module.Charts{ | ||
chartServerConnectionsCurrent.Copy(), | ||
chartServerConnectionsRate.Copy(), | ||
chartServerTraffic.Copy(), | ||
chartServerMessages.Copy(), | ||
chartServerHealthProbeStatus.Copy(), | ||
chartServerCpuUsage.Copy(), | ||
chartServerMemUsage.Copy(), | ||
chartServerUptime.Copy(), | ||
} | ||
charts = append(charts, httpEndpointCharts()...) | ||
return charts | ||
}() | ||
|
||
var ( | ||
chartServerTraffic = module.Chart{ | ||
ID: "server_traffic", | ||
Title: "Server Traffic", | ||
Units: "bytes/s", | ||
Fam: "traffic", | ||
Ctx: "nats.server_traffic", | ||
Priority: prioServerTraffic, | ||
Type: module.Area, | ||
Dims: module.Dims{ | ||
{ID: "in_bytes", Name: "in", Algo: module.Incremental}, | ||
{ID: "out_bytes", Name: "out", Mul: -1, Algo: module.Incremental}, | ||
}, | ||
} | ||
chartServerMessages = module.Chart{ | ||
ID: "server_messages", | ||
Title: "Server Messages", | ||
Units: "messages/s", | ||
Fam: "traffic", | ||
Ctx: "nats.server_messages", | ||
Priority: prioServerMessages, | ||
Dims: module.Dims{ | ||
{ID: "in_msgs", Name: "in", Algo: module.Incremental}, | ||
{ID: "out_msgs", Name: "out", Mul: -1, Algo: module.Incremental}, | ||
}, | ||
} | ||
chartServerConnectionsCurrent = module.Chart{ | ||
ID: "server_connections_current", | ||
Title: "Server Current Connections", | ||
Units: "connections", | ||
Fam: "connections", | ||
Ctx: "nats.server_connections_current", | ||
Priority: prioServerConnectionsCurrent, | ||
Dims: module.Dims{ | ||
{ID: "connections", Name: "active"}, | ||
}, | ||
} | ||
chartServerConnectionsRate = module.Chart{ | ||
ID: "server_connections_rate", | ||
Title: "Server Connections", | ||
Units: "connections/s", | ||
Fam: "connections", | ||
Ctx: "nats.server_connections_rate", | ||
Priority: prioServerConnectionsRate, | ||
Dims: module.Dims{ | ||
{ID: "total_connections", Name: "connections", Algo: module.Incremental}, | ||
}, | ||
} | ||
chartServerHealthProbeStatus = module.Chart{ | ||
ID: "server_health_probe_status", | ||
Title: "Server Health Probe Status", | ||
Units: "status", | ||
Fam: "health", | ||
Ctx: "nats.server_health_probe_status", | ||
Priority: prioServerHealthProbeStatus, | ||
Dims: module.Dims{ | ||
{ID: "healthz_status_ok", Name: "ok"}, | ||
{ID: "healthz_status_error", Name: "error"}, | ||
}, | ||
} | ||
chartServerCpuUsage = module.Chart{ | ||
ID: "server_cpu_usage", | ||
Title: "Server CPU Usage", | ||
Units: "percent", | ||
Fam: "rusage", | ||
Ctx: "nats.server_cpu_usage", | ||
Priority: prioServerCpuUsage, | ||
Type: module.Area, | ||
Dims: module.Dims{ | ||
{ID: "cpu", Name: "used"}, | ||
}, | ||
} | ||
chartServerMemUsage = module.Chart{ | ||
ID: "server_mem_usage", | ||
Title: "Server Memory Usage", | ||
Units: "bytes", | ||
Fam: "rusage", | ||
Ctx: "nats.server_mem_usage", | ||
Priority: prioServerMemoryUsage, | ||
Type: module.Area, | ||
Dims: module.Dims{ | ||
{ID: "mem", Name: "used"}, | ||
}, | ||
} | ||
chartServerUptime = module.Chart{ | ||
ID: "server_uptime", | ||
Title: "Server Uptime", | ||
Units: "seconds", | ||
Fam: "uptime", | ||
Ctx: "nats.server_uptime", | ||
Priority: prioServerUptime, | ||
Dims: module.Dims{ | ||
{ID: "uptime", Name: "uptime"}, | ||
}, | ||
} | ||
) | ||
|
||
func httpEndpointCharts() module.Charts { | ||
var charts module.Charts | ||
for _, path := range httpEndpoints { | ||
chart := httpEndpointRequestsChartTmpl.Copy() | ||
chart.ID = fmt.Sprintf(chart.ID, path) | ||
chart.Labels = []module.Label{ | ||
{Key: "http_endpoint", Value: path}, | ||
} | ||
for _, dim := range chart.Dims { | ||
dim.ID = fmt.Sprintf(dim.ID, path) | ||
} | ||
charts = append(charts, chart) | ||
} | ||
return charts | ||
} | ||
|
||
var httpEndpointRequestsChartTmpl = module.Chart{ | ||
ID: "http_endpoint_%s_requests", | ||
Title: "HTTP Endpoint Requests", | ||
Units: "requests/s", | ||
Fam: "http requests", | ||
Ctx: "nats.http_endpoint_requests", | ||
Priority: prioHttpEndpointRequests, | ||
Dims: module.Dims{ | ||
{ID: "http_endpoint_%s_req", Name: "requests", Algo: module.Incremental}, | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
package nats | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
|
||
"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/metrix" | ||
"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" | ||
) | ||
|
||
const ( | ||
urlPathVarz = "/varz" | ||
urlPathHealthz = "/healthz" | ||
) | ||
|
||
func (c *Collector) collect() (map[string]int64, error) { | ||
mx := make(map[string]int64) | ||
|
||
if err := c.collectVarz(mx); err != nil { | ||
return nil, err | ||
} | ||
if err := c.collectHealthz(mx); err != nil { | ||
return nil, err | ||
} | ||
|
||
return mx, nil | ||
} | ||
|
||
func (c *Collector) collectVarz(mx map[string]int64) error { | ||
// https://docs.nats.io/running-a-nats-service/nats_admin/monitoring#general-information | ||
req, err := web.NewHTTPRequestWithPath(c.RequestConfig, urlPathVarz) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var resp varzResponse | ||
if err := web.DoHTTP(c.httpClient).RequestJSON(req, &resp); err != nil { | ||
return err | ||
} | ||
|
||
mx["uptime"] = int64(resp.Now.Sub(resp.Start).Seconds()) | ||
mx["in_msgs"] = resp.InMsgs | ||
mx["out_msgs"] = resp.OutMsgs | ||
mx["in_bytes"] = resp.InBytes | ||
mx["out_bytes"] = resp.OutBytes | ||
mx["slow_consumers"] = resp.SlowConsumers | ||
mx["subscriptions"] = int64(resp.Subscriptions) | ||
mx["connections"] = int64(resp.Connections) | ||
mx["total_connections"] = int64(resp.TotalConnections) | ||
mx["routes"] = int64(resp.Routes) | ||
mx["remotes"] = int64(resp.Remotes) | ||
mx["cpu"] = int64(resp.CPU) | ||
mx["mem"] = resp.Mem | ||
|
||
for _, path := range httpEndpoints { | ||
v := resp.HTTPReqStats[path] | ||
mx[fmt.Sprintf("http_endpoint_%s_req", path)] = int64(v) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *Collector) collectHealthz(mx map[string]int64) error { | ||
// https://docs.nats.io/running-a-nats-service/nats_admin/monitoring#health | ||
req, err := web.NewHTTPRequestWithPath(c.RequestConfig, urlPathHealthz) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var resp healthzResponse | ||
client := web.DoHTTP(c.httpClient).OnNokCode(func(resp *http.Response) (bool, error) { return true, nil }) | ||
if err := client.RequestJSON(req, &resp); err != nil { | ||
return err | ||
} | ||
if resp.Status == nil { | ||
return fmt.Errorf("healthz response missing status") | ||
} | ||
|
||
mx["healthz_status_ok"] = metrix.Bool(*resp.Status == "ok") | ||
mx["healthz_status_error"] = metrix.Bool(*resp.Status != "ok") | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
package nats | ||
|
||
import ( | ||
"context" | ||
_ "embed" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" | ||
"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/confopt" | ||
"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" | ||
) | ||
|
||
//go:embed "config_schema.json" | ||
var configSchema string | ||
|
||
func init() { | ||
module.Register("nats", module.Creator{ | ||
Create: func() module.Module { return New() }, | ||
JobConfigSchema: configSchema, | ||
Config: func() any { return &Config{} }, | ||
}) | ||
} | ||
|
||
func New() *Collector { | ||
return &Collector{ | ||
Config: Config{ | ||
HTTPConfig: web.HTTPConfig{ | ||
RequestConfig: web.RequestConfig{ | ||
URL: "http://127.0.0.1:8222", | ||
}, | ||
ClientConfig: web.ClientConfig{ | ||
Timeout: confopt.Duration(time.Second), | ||
}, | ||
}, | ||
}, | ||
charts: serverCharts.Copy(), | ||
} | ||
} | ||
|
||
type Config struct { | ||
Vnode string `yaml:"vnode,omitempty" json:"vnode"` | ||
UpdateEvery int `yaml:"update_every,omitempty" json:"update_every"` | ||
web.HTTPConfig `yaml:",inline" json:""` | ||
} | ||
|
||
type Collector struct { | ||
module.Base | ||
Config `yaml:",inline" json:""` | ||
|
||
charts *module.Charts | ||
|
||
httpClient *http.Client | ||
} | ||
|
||
func (c *Collector) Configuration() any { | ||
return c.Config | ||
} | ||
|
||
func (c *Collector) Init(context.Context) error { | ||
if c.URL == "" { | ||
return errors.New("URL required but not set") | ||
} | ||
|
||
httpClient, err := web.NewHTTPClient(c.ClientConfig) | ||
if err != nil { | ||
return fmt.Errorf("init HTTP client: %v", err) | ||
} | ||
c.httpClient = httpClient | ||
|
||
c.Debugf("using URL %s", c.URL) | ||
c.Debugf("using timeout: %s", c.Timeout) | ||
|
||
return nil | ||
} | ||
|
||
func (c *Collector) Check(context.Context) error { | ||
mx, err := c.collect() | ||
if err != nil { | ||
return err | ||
} | ||
if len(mx) == 0 { | ||
return errors.New("no metrics collected") | ||
|
||
} | ||
return nil | ||
} | ||
|
||
func (c *Collector) Charts() *module.Charts { | ||
return c.charts | ||
} | ||
|
||
func (c *Collector) Collect(context.Context) map[string]int64 { | ||
mx, err := c.collect() | ||
if err != nil { | ||
c.Error(err) | ||
} | ||
|
||
if len(mx) == 0 { | ||
return nil | ||
} | ||
return mx | ||
} | ||
|
||
func (c *Collector) Cleanup(context.Context) { | ||
if c.httpClient != nil { | ||
c.httpClient.CloseIdleConnections() | ||
} | ||
} |
Oops, something went wrong.