Skip to content

Commit

Permalink
chore(go.d/pkg/socket): add err to callback return values (netdata#19103
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ilyam8 authored Nov 29, 2024
1 parent 28ee226 commit 61fb6d7
Show file tree
Hide file tree
Showing 20 changed files with 477 additions and 476 deletions.
40 changes: 16 additions & 24 deletions src/go/plugin/go.d/collector/beanstalk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ func newBeanstalkConn(conf Config, log *logger.Logger) beanstalkConn {
return &beanstalkClient{
Logger: log,
client: socket.New(socket.Config{
Address: conf.Address,
Timeout: conf.Timeout.Duration(),
TLSConf: nil,
Address: conf.Address,
Timeout: conf.Timeout.Duration(),
MaxReadLines: 2000,
TLSConf: nil,
}),
}
}
Expand Down Expand Up @@ -180,43 +181,34 @@ func (c *beanstalkClient) queryStatsTube(tubeName string) (*tubeStats, error) {
}

func (c *beanstalkClient) query(command string) (string, []byte, error) {
var resp string
var length int
var body []byte
var err error

c.Debugf("executing command: %s", command)

const limitReadLines = 1000
var num int
var (
resp string
body []byte
length int
err error
)

clientErr := c.client.Command(command+"\r\n", func(line []byte) bool {
if err := c.client.Command(command+"\r\n", func(line []byte) (bool, error) {
if resp == "" {
s := string(line)
c.Debugf("command '%s' response: '%s'", command, s)

resp, length, err = parseResponseLine(s)
if err != nil {
err = fmt.Errorf("command '%s' line '%s': %v", command, s, err)
return false, fmt.Errorf("command '%s' line '%s': %v", command, s, err)
}
return err == nil && resp == "OK"
}

if num++; num >= limitReadLines {
err = fmt.Errorf("command '%s': read line limit exceeded (%d)", command, limitReadLines)
return false
return resp == "OK", nil
}

body = append(body, line...)
body = append(body, '\n')

return len(body) < length
})
if clientErr != nil {
return "", nil, fmt.Errorf("command '%s' client error: %v", command, clientErr)
}
if err != nil {
return "", nil, err
return len(body) < length, nil
}); err != nil {
return "", nil, fmt.Errorf("command '%s': %v", command, err)
}

return resp, body, nil
Expand Down
15 changes: 5 additions & 10 deletions src/go/plugin/go.d/collector/boinc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,20 @@ func (c *boincClient) send(req *boincRequest) (*boincReply, error) {

var b bytes.Buffer

clientErr := c.conn.Command(string(reqData), func(bs []byte) bool {
if err := c.conn.Command(string(reqData), func(bs []byte) (bool, error) {
s := strings.TrimSpace(string(bs))
if s == "" {
return true
return true, nil
}

if b.Len() == 0 && s != respStart {
err = fmt.Errorf("unexpected response first line: %s", s)
return false
return false, fmt.Errorf("unexpected response first line: %s", s)
}

b.WriteString(s)

return s != respEnd
})
if clientErr != nil {
return nil, fmt.Errorf("failed to send command: %v", clientErr)
}
if err != nil {
return s != respEnd, nil
}); err != nil {
return nil, fmt.Errorf("failed to send command: %v", err)
}

Expand Down
7 changes: 3 additions & 4 deletions src/go/plugin/go.d/collector/dovecot/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ func (c *dovecotClient) queryExportGlobal() ([]byte, error) {
var b bytes.Buffer
var n int

err := c.conn.Command("EXPORT\tglobal\n", func(bs []byte) bool {
if err := c.conn.Command("EXPORT\tglobal\n", func(bs []byte) (bool, error) {
b.Write(bs)
b.WriteByte('\n')

n++
return n < 2
})
if err != nil {
return n < 2, nil
}); err != nil {
return nil, err
}

Expand Down
25 changes: 7 additions & 18 deletions src/go/plugin/go.d/collector/gearman/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type gearmanConn interface {

func newGearmanConn(conf Config) gearmanConn {
return &gearmanClient{conn: socket.New(socket.Config{
Address: conf.Address,
Timeout: conf.Timeout.Duration(),
Address: conf.Address,
Timeout: conf.Timeout.Duration(),
MaxReadLines: 10000,
})}
}

Expand All @@ -45,32 +46,20 @@ func (c *gearmanClient) queryPriorityStatus() ([]byte, error) {
}

func (c *gearmanClient) query(cmd string) ([]byte, error) {
const limitReadLines = 10000
var num int
var err error
var b bytes.Buffer

clientErr := c.conn.Command(cmd+"\n", func(bs []byte) bool {
if err := c.conn.Command(cmd+"\n", func(bs []byte) (bool, error) {
s := string(bs)

if strings.HasPrefix(s, "ERR") {
err = fmt.Errorf("command '%s': %s", cmd, s)
return false
return false, fmt.Errorf("command '%s': %s", cmd, s)
}

b.WriteString(s)
b.WriteByte('\n')

if num++; num >= limitReadLines {
err = fmt.Errorf("command '%s': read line limit exceeded (%d)", cmd, limitReadLines)
return false
}
return !strings.HasPrefix(s, ".")
})
if clientErr != nil {
return nil, fmt.Errorf("command '%s' client error: %v", cmd, clientErr)
}
if err != nil {
return !strings.HasPrefix(s, "."), nil
}); err != nil {
return nil, err
}

Expand Down
12 changes: 3 additions & 9 deletions src/go/plugin/go.d/collector/hddtemp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,15 @@ type hddtempClient struct {
}

func (c *hddtempClient) queryHddTemp() (string, error) {
var i int
var s string

cfg := socket.Config{
Address: c.address,
Timeout: c.timeout,
}

err := socket.ConnectAndRead(cfg, func(bs []byte) bool {
if i++; i > 1 {
return false
}
var s string
err := socket.ConnectAndRead(cfg, func(bs []byte) (bool, error) {
s = string(bs)
return true

return false, nil
})
if err != nil {
return "", err
Expand Down
8 changes: 4 additions & 4 deletions src/go/plugin/go.d/collector/memcached/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func (c *memcachedClient) disconnect() {

func (c *memcachedClient) queryStats() ([]byte, error) {
var b bytes.Buffer
err := c.conn.Command("stats\r\n", func(bytes []byte) bool {
if err := c.conn.Command("stats\r\n", func(bytes []byte) (bool, error) {
s := strings.TrimSpace(string(bytes))
b.WriteString(s)
b.WriteByte('\n')
return !(strings.HasPrefix(s, "END") || strings.HasPrefix(s, "ERROR"))
})
if err != nil {

return !(strings.HasPrefix(s, "END") || strings.HasPrefix(s, "ERROR")), nil
}); err != nil {
return nil, err
}
return b.Bytes(), nil
Expand Down
17 changes: 7 additions & 10 deletions src/go/plugin/go.d/collector/openvpn/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,26 @@ func (c *Client) Version() (*Version, error) {

func (c *Client) get(command string, stopRead stopReadFunc) (output []string, err error) {
var num int
var maxLinesErr error
err = c.Command(command, func(bytes []byte) bool {
if err := c.Command(command, func(bytes []byte) (bool, error) {
line := string(bytes)
num++
if num > maxLinesToRead {
maxLinesErr = fmt.Errorf("read line limit exceeded (%d)", maxLinesToRead)
return false
return false, fmt.Errorf("read line limit exceeded (%d)", maxLinesToRead)
}

// skip real-time messages
if strings.HasPrefix(line, ">") {
return true
return true, nil
}

line = strings.Trim(line, "\r\n ")
output = append(output, line)
if stopRead != nil && stopRead(line) {
return false
return false, nil
}
return true
})
if maxLinesErr != nil {
return nil, maxLinesErr
return true, nil
}); err != nil {
return nil, err
}
return output, err
}
Expand Down
4 changes: 3 additions & 1 deletion src/go/plugin/go.d/collector/openvpn/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (m *mockSocketClient) Command(command string, process socket.Processor) err
}

for s.Scan() {
process(s.Bytes())
if _, err := process(s.Bytes()); err != nil {
return err
}
}
return nil
}
22 changes: 8 additions & 14 deletions src/go/plugin/go.d/collector/tor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func (c *torControlClient) authenticate() error {
}

var s string
err := c.conn.Command(cmd+"\n", func(bs []byte) bool {
err := c.conn.Command(cmd+"\n", func(bs []byte) (bool, error) {
s = string(bs)
return false
return false, nil
})
if err != nil {
return fmt.Errorf("authentication failed: %v", err)
Expand All @@ -74,7 +74,7 @@ func (c *torControlClient) authenticate() error {
func (c *torControlClient) disconnect() {
// https://spec.torproject.org/control-spec/commands.html#quit

_ = c.conn.Command(cmdQuit+"\n", func(bs []byte) bool { return false })
_ = c.conn.Command(cmdQuit+"\n", func(bs []byte) (bool, error) { return false, nil })
_ = c.conn.Disconnect()
}

Expand All @@ -87,27 +87,21 @@ func (c *torControlClient) getInfo(keywords ...string) ([]byte, error) {
cmd := fmt.Sprintf("%s %s", cmdGetInfo, strings.Join(keywords, " "))

var buf bytes.Buffer
var err error

clientErr := c.conn.Command(cmd+"\n", func(bs []byte) bool {
if err := c.conn.Command(cmd+"\n", func(bs []byte) (bool, error) {
s := string(bs)

switch {
case strings.HasPrefix(s, "250-"):
buf.WriteString(strings.TrimPrefix(s, "250-"))
buf.WriteByte('\n')
return true
return true, nil
case strings.HasPrefix(s, "250 "):
return false
return false, nil
default:
err = errors.New(s)
return false
return false, errors.New(s)
}
})
if clientErr != nil {
return nil, fmt.Errorf("command '%s' failed: %v", cmd, clientErr)
}
if err != nil {
}); err != nil {
return nil, fmt.Errorf("command '%s' failed: %v", cmd, err)
}

Expand Down
4 changes: 2 additions & 2 deletions src/go/plugin/go.d/collector/unbound/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func (c *Collector) scrapeUnboundStats() ([]entry, error) {
}
defer func() { _ = c.client.Disconnect() }()

err := c.client.Command(command+"\n", func(bytes []byte) bool {
err := c.client.Command(command+"\n", func(bytes []byte) (bool, error) {
output = append(output, string(bytes))
return true
return true, nil
})
if err != nil {
return nil, fmt.Errorf("send command '%s': %w", command, err)
Expand Down
4 changes: 2 additions & 2 deletions src/go/plugin/go.d/collector/upsd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func (c *upsdClient) sendCommand(cmd string) ([]string, error) {
var errMsg string
endLine := getEndLine(cmd)

err := c.conn.Command(cmd+"\n", func(bytes []byte) bool {
err := c.conn.Command(cmd+"\n", func(bytes []byte) (bool, error) {
line := string(bytes)
resp = append(resp, line)

if strings.HasPrefix(line, "ERR ") {
errMsg = strings.TrimPrefix(line, "ERR ")
}

return line != endLine && errMsg == ""
return line != endLine && errMsg == "", nil
})
if err != nil {
return nil, err
Expand Down
24 changes: 6 additions & 18 deletions src/go/plugin/go.d/collector/uwsgi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package uwsgi

import (
"bytes"
"fmt"
"time"

"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/socket"
Expand All @@ -28,30 +27,19 @@ type uwsgiClient struct {

func (c *uwsgiClient) queryStats() ([]byte, error) {
var b bytes.Buffer
var n int64
var err error
const readLineLimit = 1000 * 10

cfg := socket.Config{
Address: c.address,
Timeout: c.timeout,
Address: c.address,
Timeout: c.timeout,
MaxReadLines: 1000 * 10,
}

clientErr := socket.ConnectAndRead(cfg, func(bs []byte) bool {
if err := socket.ConnectAndRead(cfg, func(bs []byte) (bool, error) {
b.Write(bs)
b.WriteByte('\n')

if n++; n >= readLineLimit {
err = fmt.Errorf("read line limit exceeded %d", readLineLimit)
return false
}
// The server will close the connection when it has finished sending data.
return true
})
if clientErr != nil {
return nil, clientErr
}
if err != nil {
return true, nil
}); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit 61fb6d7

Please sign in to comment.