From 00de2acee2c825498bee7fdfb76df8a0820e26de Mon Sep 17 00:00:00 2001 From: Vladislav Radin Date: Tue, 15 Oct 2024 14:32:38 +0300 Subject: [PATCH] commit --- go.mod | 1 + go.sum | 2 + plugins/inputs/all/mt_net_response_ex.go | 5 + .../mt_net_response_ex/mt_net_response_ex.go | 254 ++++++++++++++++++ plugins/inputs/mt_net_response_ex/sample.conf | 18 ++ 5 files changed, 280 insertions(+) create mode 100644 plugins/inputs/all/mt_net_response_ex.go create mode 100644 plugins/inputs/mt_net_response_ex/mt_net_response_ex.go create mode 100644 plugins/inputs/mt_net_response_ex/sample.conf diff --git a/go.mod b/go.mod index a2d0e226e9fc4..a18b8227baa2d 100644 --- a/go.mod +++ b/go.mod @@ -545,6 +545,7 @@ require ( github.com/grafana-tools/sdk v0.0.0-00010101000000-000000000000 github.com/jinzhu/copier v0.4.0 github.com/lxc/incus/v6 v6.4.0 + github.com/shirou/gopsutil v3.21.11+incompatible ) replace github.com/grafana-tools/sdk => github.com/devopsext/sdk v0.9.6 diff --git a/go.sum b/go.sum index d04081a7819cc..7d9d8896d372a 100644 --- a/go.sum +++ b/go.sum @@ -2282,6 +2282,8 @@ github.com/seancfoley/ipaddress-go v1.6.0/go.mod h1:TQRZgv+9jdvzHmKoPGBMxyiaVmoI github.com/sensu/sensu-go/api/core/v2 v2.16.0 h1:HOq4rFkQ1S5ZjxmMTLc5J5mAbECrnKWvtXXbMqr3j9s= github.com/sensu/sensu-go/api/core/v2 v2.16.0/go.mod h1:MjM7+MCGEyTAgaZ589SiGHwYiaYF7N/58dU0J070u/0= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil/v3 v3.24.4 h1:dEHgzZXt4LMNm+oYELpzl9YCqV65Yr/6SfrvgRBtXeU= github.com/shirou/gopsutil/v3 v3.24.4/go.mod h1:lTd2mdiOspcqLgAnr9/nGi71NkeMpWKdmhuxm9GusH8= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= diff --git a/plugins/inputs/all/mt_net_response_ex.go b/plugins/inputs/all/mt_net_response_ex.go new file mode 100644 index 0000000000000..eb9c22e265a72 --- /dev/null +++ b/plugins/inputs/all/mt_net_response_ex.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.mt_net_response_ex + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/mt_net_response_ex" // register plugin diff --git a/plugins/inputs/mt_net_response_ex/mt_net_response_ex.go b/plugins/inputs/mt_net_response_ex/mt_net_response_ex.go new file mode 100644 index 0000000000000..df04a581ce912 --- /dev/null +++ b/plugins/inputs/mt_net_response_ex/mt_net_response_ex.go @@ -0,0 +1,254 @@ +//go:generate ../../../tools/readme_config_includer/generator +package net_response + +import ( + "bufio" + _ "embed" + "errors" + "net" + "net/textproto" + "regexp" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + + utils "github.com/shirou/gopsutil/net" +) + +//go:embed sample.conf +var sampleConfig string + +type ResultType uint64 + +const ( + Success ResultType = 0 + Timeout ResultType = 1 + ConnectionFailed ResultType = 2 + ReadFailed ResultType = 3 + StringMismatch ResultType = 4 +) + +// NetResponse struct +type MtNetResponse struct { + Addresses []string + Timeout config.Duration + ReadTimeout config.Duration + Send string + Expect string + Type string +} + +func (*MtNetResponse) SampleConfig() string { + return sampleConfig +} + +// DCGather will execute if there are DC type defined in the configuration. +func (m *MtNetResponse) DCGather() (map[string]string, map[string]interface{}, error) { + // Prepare returns + tags := make(map[string]string) + fields := make(map[string]interface{}) + // Get TCP connections + connections, err := utils.Connections("tcp") + if err != nil { + return nil, nil, err + } + // Check if there are active connections with the IP from the DC list + for _, ip := range m.Addresses { + for _, conn := range connections { + // Prepare host and port + host, port, err := net.SplitHostPort(ip) + if err != nil { + return nil, nil, err + } + if conn.Status == "ESTABLISHED" && strings.Contains(conn.Raddr.IP, host) { + // If an active connection is found, perform a connection test + + // Start timer + start := time.Now() + + conn, err := net.DialTimeout("tcp", ip, time.Duration(m.Timeout)) + + // Handle error + if err != nil { + var e net.Error + if errors.As(err, &e) && e.Timeout() { + setResult(Timeout, tags) + } else { + setResult(ConnectionFailed, tags) + } + return tags, fields, nil + } + + defer conn.Close() + + // Send data + _, err = conn.Write([]byte(m.Send)) + if err != nil { + return nil, nil, err + } + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data, err := tp.ReadLine() + + // Stop timer + responseTime := time.Since(start).Seconds() + + // Handle error + if err != nil { + setResult(ReadFailed, tags) + } else { + // Looking for string in answer + regEx := regexp.MustCompile(m.Expect) + find := regEx.FindString(data) + if find != "" { + setResult(Success, tags) + } else { + setResult(StringMismatch, tags) + } + } + fields["response_time"] = responseTime + tags["protocol"] = "tcp" + tags["server"] = host + tags["port"] = port + return tags, fields, nil + } + } + } + return nil, nil, err +} + +func (m *MtNetResponse) ACGather() (map[string]string, map[string]interface{}, error) { + // Prepare returns + tags := make(map[string]string) + fields := make(map[string]interface{}) + // Get TCP connections + connections, err := utils.Connections("tcp") + if err != nil { + return nil, nil, err + } + // Check if there are active connections with the IP from the DC list + for _, ip := range m.Addresses { + for _, conn := range connections { + // Prepare host and port + host, port, err := net.SplitHostPort(ip) + if err != nil { + return nil, nil, err + } + if conn.Status == "ESTABLISHED" && strings.Contains(conn.Raddr.IP, host) { + // If an active connection is found, perform a connection test + + //Start timer + start := time.Now() + conn, err := net.DialTimeout("tcp", ip, time.Duration(m.Timeout)) + // Stop timer + responseTime := time.Since(start).Seconds() + // Handle error + if err != nil { + var e net.Error + if errors.As(err, &e) && e.Timeout() { + setResult(Timeout, tags) + } else { + setResult(ConnectionFailed, tags) + } + return tags, fields, nil + } + defer conn.Close() + setResult(Success, tags) + fields["response_time"] = responseTime + tags["protocol"] = "tcp" + tags["server"] = host + tags["port"] = port + return tags, fields, nil + } + } + } + return nil, nil, err +} + +// Init performs one time setup of the plugin and returns an error if the +// configuration is invalid. +func (m *MtNetResponse) Init() error { + // Set default values + if m.Timeout == 0 { + m.Timeout = config.Duration(time.Second) + } + if m.ReadTimeout == 0 { + m.ReadTimeout = config.Duration(time.Second) + } + + if m.Dc != nil { + if m.Send == "" { + return errors.New("send string cannot be empty") + } + if m.Expect == "" { + return errors.New("expected string cannot be empty") + } + } + if m.Dc == nil && m.Ac == nil { + return errors.New("dc and ac cannot be empty") + } + if m.Type == "" { + return errors.New("type cannot be empty") + } + return nil +} + +// Gather is called by telegraf when the plugin is executed on its interval. +// It will call either UDPGather or TCPGather based on the configuration and +// also fill an Accumulator that is supplied. +func (m *MtNetResponse) Gather(acc telegraf.Accumulator) error { + // Prepare data + tags := map[string]string{} + var fields map[string]interface{} + var returnTags map[string]string + var err error + // Gather data + switch m.Type { + case "ac": + returnTags, fields, err = m.ACGather() + if err != nil { + return err + } + tags["type"] = "ac" + case "dc": + returnTags, fields, err = m.DCGather() + if err != nil { + return err + } + tags["type"] = "dc" + } + // Merge the tags + for k, v := range returnTags { + tags[k] = v + } + // Add metrics + acc.AddFields("net_response", fields, tags) + return nil +} + +func setResult(result ResultType, tags map[string]string) { + var tag string + switch result { + case Success: + tag = "success" + case Timeout: + tag = "timeout" + case ConnectionFailed: + tag = "connection_failed" + case ReadFailed: + tag = "read_failed" + case StringMismatch: + tag = "string_mismatch" + } + + tags["result"] = tag +} + +func init() { + inputs.Add("mt_net_response_ex", func() telegraf.Input { + return &MtNetResponse{} + }) +} diff --git a/plugins/inputs/mt_net_response_ex/sample.conf b/plugins/inputs/mt_net_response_ex/sample.conf new file mode 100644 index 0000000000000..54f7b302c7022 --- /dev/null +++ b/plugins/inputs/mt_net_response_ex/sample.conf @@ -0,0 +1,18 @@ +# Collect response time of a TCP or UDP connection +[[inputs.mt_net_response_ex]] + ## Server type, must be "dc" or "ac" + type = "dc" + ## Server address (default localhost) + addresses = ["localhost:80"] + + ## Set timeout + # timeout = "1s" + + ## Set read timeout (only used if expecting a response) + # read_timeout = "1s" + + ## string sent to the server + # send = "ssh" + ## expected string in answer + # expect = "ssh" +