diff --git a/buffer.go b/buffer.go index 97c9e67..db448eb 100644 --- a/buffer.go +++ b/buffer.go @@ -155,8 +155,9 @@ func (b *buffer) len() int { return len(b.data) } -func (b *buffer) flush(w io.Writer, n int) { - _, _ = w.Write(b.data[:n]) +func (b *buffer) flush(w io.Writer, n int) error { + _, err := w.Write(b.data[:n]) n = copy(b.data, b.data[n:]) b.data = b.data[:n] + return err } diff --git a/datadog/client.go b/datadog/client.go index 221a15f..418fedb 100644 --- a/datadog/client.go +++ b/datadog/client.go @@ -125,6 +125,7 @@ func NewClientWith(config ClientConfig) *Client { c.bufferSize = newBufSize c.buffer.Serializer = &c.serializer + c.serializer.conn = w log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize) return c } diff --git a/datadog/client_test.go b/datadog/client_test.go index e96b41e..55258b3 100644 --- a/datadog/client_test.go +++ b/datadog/client_test.go @@ -89,6 +89,7 @@ func TestClientWithUseDistributions(t *testing.T) { addr, closer := startUDPListener(t, packets) defer closer.Close() + fmt.Println("addr", addr) client := NewClientWith(ClientConfig{ Address: addr, UseDistributions: true, @@ -109,14 +110,25 @@ func TestClientWithUseDistributions(t *testing.T) { client.Flush() expectedPacket1 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|d|#answer:42,hello:world\n" - assert.EqualValues(t, expectedPacket1, string(<-packets)) + select { + case packet := <-packets: + fmt.Println("receive packet", packet) + assert.EqualValues(t, expectedPacket1, string(packet)) + case <-time.After(2 * time.Second): + t.Fatal("no response after 2 seconds") + } client.useDistributions = false client.HandleMeasures(time.Time{}, testMeasure) client.Flush() expectedPacket2 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|h|#answer:42,hello:world\n" - assert.EqualValues(t, expectedPacket2, string(<-packets)) + select { + case packet := <-packets: + assert.EqualValues(t, expectedPacket2, string(packet)) + case <-time.After(2 * time.Second): + t.Fatal("no response after 2 seconds") + } if err := client.Close(); err != nil { t.Error(err) @@ -236,15 +248,19 @@ func isClosedNetworkConnectionErr(err error) bool { // startUDPListener starts a goroutine listening for UDP packets on 127.0.0.1 and an available port. // The address listened to is returned as `addr`. The payloads of packets received are copied to `packets`. func startUDPListener(t *testing.T, packets chan []byte) (addr string, closer io.Closer) { - conn, err := net.ListenPacket("udp", "127.0.0.1:0") // :0 chooses an available port + t.Helper() + conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) // :0 chooses an available port if err != nil { t.Fatal(err) } + fmt.Println("starting UDP goroutine") go func() { for { packetBytes := make([]byte, 1024) + fmt.Println("call conn.ReadFrom") n, _, err := conn.ReadFrom(packetBytes) + fmt.Println("read", n, err) if n > 0 { packets <- packetBytes[:n] } diff --git a/datadog/serializer.go b/datadog/serializer.go index 6ffa8fd..491d9fc 100644 --- a/datadog/serializer.go +++ b/datadog/serializer.go @@ -5,7 +5,6 @@ import ( "io" "log" "math" - "net" "strconv" "strings" "time" @@ -16,7 +15,7 @@ import ( // Datagram format: https://docs.datadoghq.com/developers/dogstatsd/datagram_shell type serializer struct { - conn net.Conn + conn io.WriteCloser bufferSize int filters map[string]struct{} distPrefixes []string diff --git a/datadog/udp.go b/datadog/udp.go index ff2d4cc..bd34cb5 100644 --- a/datadog/udp.go +++ b/datadog/udp.go @@ -1,6 +1,9 @@ package datadog -import "net" +import ( + "fmt" + "net" +) type udpWriter struct { conn net.Conn @@ -16,6 +19,7 @@ func newUDPWriter(addr string) (*udpWriter, error) { if err != nil { return nil, err } + fmt.Printf("udp conn: %#v\n", conn) return &udpWriter{conn: conn}, nil } @@ -26,6 +30,7 @@ func (w *udpWriter) Write(data []byte) (int, error) { } func (w *udpWriter) Close() error { + fmt.Println("call udpWriter.Close") return w.conn.Close() }