
Created working TCP based socket factory impl, but it seemingly breaks after the first flush #374

Open
akallu opened this issue Jul 28, 2021 · 2 comments


@akallu

akallu commented Jul 28, 2021

Hello all!
I went ahead and created a socket factory that uses TCP as the underlying transport, and it definitely works: the gostatsd.Server is flushing the metrics my app emits over 8125/tcp. However, after the first flush I get what seems like thousands of the following log line (and TCP metrics seem to stop working as well, as the log line suggests):
```
time="2021-07-27T23:48:05Z" level=warning msg="Error reading from socket: EOF"
```

For reference, here is the TCP-based statsd.SocketFactory / net.PacketConn implementation I came up with:

```go
import (
	"errors"
	"net"

	"github.com/atlassian/gostatsd/pkg/statsd"
)

// NewTCPSocketFactory returns a SocketFactory which uses the TCP protocol underneath.
func NewTCPSocketFactory(addr string) statsd.SocketFactory {
	l, err := net.Listen("tcp", addr)
	return func() (net.PacketConn, error) {
		if err != nil {
			return nil, err
		}
		// Blocks until a client connects; the server then reads all metrics from that one connection.
		c, err := l.Accept()
		if err != nil {
			return nil, err
		}
		return &tcpPacketConnWrapper{c.(*net.TCPConn)}, nil
	}
}

type tcpPacketConnWrapper struct {
	*net.TCPConn
}

func (t *tcpPacketConnWrapper) ReadFrom(p []byte) (int, net.Addr, error) {
	n, err := t.Read(p)

	// Weird hack because a false-positive error gets logged inside atlassian's statsd server impl.
	// https://github.com/atlassian/gostatsd/blob/master/pkg/statsd/receiver.go#L167
	//
	// Note that `udpAddr` never ends up getting used anyway, so I could toss random stuff into this UDPAddr and it
	// wouldn't matter. Since TCPAddr and UDPAddr are very similar, I just fill in the IP, Port, and
	// Zone fields.
	tcpLocalAddr := t.LocalAddr().(*net.TCPAddr)
	udpAddr := &net.UDPAddr{
		IP:   tcpLocalAddr.IP,
		Port: tcpLocalAddr.Port,
		Zone: tcpLocalAddr.Zone,
	}

	return n, udpAddr, err
}

// WriteTo is not implemented, because this PacketConn wrapper is only to be used by the gostatsd.Server, which only
// reads from the PacketConn and will never invoke WriteTo.
func (t *tcpPacketConnWrapper) WriteTo(_ []byte, _ net.Addr) (int, error) {
	return 0, errors.New("unimplemented")
}
```
@tiedotguy
Collaborator

tiedotguy commented Jul 28, 2021

The UDP model doesn't translate directly to TCP, as you've discovered. I suspect the errors you're seeing aren't caused by the flush; rather, your client is disconnecting, and when that happens, reading from the TCP socket returns EOF.

All the SocketFactory invocations happen on startup: we create 1-N sockets, and they all receive datagrams. To keep it simple, let's pretend N is 1.
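For contrast, here is a minimal standalone sketch of the UDP model with N = 1, using only the standard `net` package (not gostatsd code): one `net.PacketConn` receives datagrams from several independent client sockets, and `ReadFrom` returns each datagram along with its sender's address, so there is no per-client connection that can go away. The function name `collectUDP` and the sample metric lines are made up for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// collectUDP sends each line from its own client socket to a single UDP socket
// and returns the payloads that one socket received.
func collectUDP(lines []string) ([]string, error) {
	server, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	defer server.Close()

	for _, line := range lines {
		// A fresh socket per line stands in for a separate client process.
		client, err := net.Dial("udp", server.LocalAddr().String())
		if err != nil {
			return nil, err
		}
		_, err = client.Write([]byte(line))
		client.Close()
		if err != nil {
			return nil, err
		}
	}

	got := make([]string, 0, len(lines))
	buf := make([]byte, 1500)
	for range lines {
		// One datagram per call, tagged with its sender; no per-client state exists.
		n, from, err := server.ReadFrom(buf)
		if err != nil {
			return nil, err
		}
		fmt.Printf("%q from %s\n", buf[:n], from)
		got = append(got, string(buf[:n]))
	}
	return got, nil
}

func main() {
	if _, err := collectUDP([]string{"a.count:1|c", "b.count:2|c"}); err != nil {
		panic(err)
	}
}
```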

A single UDP socket receives all data from all clients and handles it appropriately. With your TCP implementation, however, calling SocketFactory() calls Accept(), which blocks. When your client connects, Accept() returns, you return a tcpPacketConnWrapper, and from then on the server expects all metrics to come through that one connection. Once that client disconnects, every read on it returns EOF.

The correct way to handle this would be something closer to a single factory that owns the listener and runs its own Accept loop, like:

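```go
import (
	"errors"
	"net"
	"time"

	"github.com/atlassian/gostatsd/pkg/statsd"
)

func NewTCPSocketFactory(addr string) statsd.SocketFactory {
	return func() (net.PacketConn, error) {
		l, err := net.Listen("tcp", addr)
		if err != nil {
			return nil, err
		}
		// A different tcpPacketConnWrapper from the one in the issue: this one owns an
		// internal queue that every connection feeds (responsibilities listed below).
		wrapper := &tcpPacketConnWrapper{}
		// Run the Accept loop in a goroutine so the factory returns immediately.
		go func() {
			for {
				c, err := l.Accept()
				if errors.Is(err, net.ErrClosed) {
					return // the listener was closed, so stop accepting
				}
				if err != nil {
					time.Sleep(1 * time.Second) // some sort of backoff
					continue
				}
				// One reader goroutine per client connection.
				go wrapper.readFromConnection(c)
			}
		}()
		return wrapper, nil
	}
}
```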

Then the tcpPacketConnWrapper is responsible for:

  • Reading a chunk of data from each connection (we start a goroutine for each connection)
  • Finding a suitable place to split the data (at a line ending, so metrics don't get cut off mid-chunk)
  • Pushing that data to an internal queue
  • Implementing ReadFrom to pull from that queue
  • Gracefully handling client disconnection

This is only the basic outline; a real implementation would also need graceful shutdown, backoff, internal metrics, etc.

@akallu
Author

akallu commented Jul 28, 2021

Thank you so much for taking the time to explain this, I really appreciate it! In the future I'll try to contribute first-class TCP support to this project, since a lot of OSS can emit statsd metrics over TCP (e.g. Envoy).
