Skip to content

Commit

Permalink
all: add Unix datagram support
Browse files Browse the repository at this point in the history
I tried updating #123 with ~50
commits but the diff got too messy. Easier to just add the changes on
a new commit.
  • Loading branch information
kevinburkesegment committed Oct 9, 2024
1 parent 05e804c commit 251d359
Show file tree
Hide file tree
Showing 28 changed files with 519 additions and 96 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ jobs:
- uses: actions/checkout@v4

- name: golangci-lint
uses: golangci/golangci-lint-action@v6.0.1
uses: golangci/golangci-lint-action@v6.1.0
with:
version: v1.59.1
version: v1.61.0
5 changes: 3 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (b *buffer) len() int {
return len(b.data)
}

func (b *buffer) flush(w io.Writer, n int) {
_, _ = w.Write(b.data[:n])
func (b *buffer) flush(w io.Writer, n int) error {
_, err := w.Write(b.data[:n])
n = copy(b.data, b.data[n:])
b.data = b.data[:n]
return err
}
77 changes: 59 additions & 18 deletions datadog/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package datadog

import (
"io"
"log"
"net"
"net/url"
"os"
"strings"
"time"

"github.com/segmentio/stats/v5"
Expand Down Expand Up @@ -40,6 +42,8 @@ var (
// The ClientConfig type is used to configure datadog clients.
type ClientConfig struct {
// Address of the datadog database to send metrics to.
// UDP: host:port (default)
// UDS: unixgram://dir/file.ext
Address string

// Maximum size of batch of events sent to datadog.
Expand Down Expand Up @@ -106,15 +110,23 @@ func NewClientWith(config ClientConfig) *Client {
},
}

conn, bufferSize, err := dial(config.Address, config.BufferSize)
w, err := newWriter(config.Address)
if err != nil {
log.Printf("stats/datadog: %s", err)
c.err = err
w = &noopWriter{}
}

c.conn, c.err, c.bufferSize = conn, err, bufferSize
c.buffer.BufferSize = bufferSize
newBufSize, err := w.CalcBufferSize(config.BufferSize)
if err != nil {
log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err)
newBufSize = DefaultBufferSize
}

c.bufferSize = newBufSize
c.buffer.Serializer = &c.serializer
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize)
c.serializer.conn = w
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize)
return c
}

Expand All @@ -140,18 +152,7 @@ func (c *Client) Close() error {
return c.err
}

func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) {
var f *os.File

if conn, err = net.Dial("udp", address); err != nil {
return
}

if f, err = conn.(*net.UDPConn).File(); err != nil {
conn.Close()
return
}
defer f.Close()
func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) {
fd := int(f.Fd())

// The kernel refuses to send UDP datagrams that are larger than the size of
Expand All @@ -160,7 +161,6 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
// to accept larger datagrams, or fallback to the default socket buffer size
// if it failed.
if bufsize, err = unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF); err != nil {
conn.Close()
return
}

Expand Down Expand Up @@ -198,3 +198,44 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
_ = unix.SetNonblock(fd, true)
return
}

type ddWriter interface {
io.WriteCloser
CalcBufferSize(desiredBufSize int) (int, error)
}

func newWriter(addr string) (ddWriter, error) {
if strings.HasPrefix(addr, "unixgram://") ||
strings.HasPrefix(addr, "udp://") {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
switch u.Scheme {
case "unixgram":
return newUDSWriter(u.Path)
case "udp":
return newUDPWriter(u.Path)
}
}
// default assume addr host:port to use UDP
return newUDPWriter(addr)
}

// noopWriter is a writer that does nothing.
type noopWriter struct{}

// Write writes nothing.
func (w *noopWriter) Write(_ []byte) (int, error) {
return 0, nil
}

// Close is a noop close.
func (w *noopWriter) Close() error {
return nil
}

// CalcBufferSize returns the sizehint.
func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) {
return sizehint, nil
}
75 changes: 71 additions & 4 deletions datadog/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ func TestClientWithDistributionPrefixes(t *testing.T) {
}
}

func TestClient_UDS(t *testing.T) {
client := NewClient("unixgram://do-not-exist")

for i := 0; i != 1000; i++ {
client.HandleMeasures(time.Time{}, stats.Measure{
Name: "request",
Fields: []stats.Field{
{Name: "count", Value: stats.ValueOf(5)},
{Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)},
},
Tags: []stats.Tag{
stats.T("answer", "42"),
stats.T("hello", "world"),
},
})
}

if err := client.Close(); err != nil {
t.Error(err)
}
}

func TestClientWithUseDistributions(t *testing.T) {
// Start a goroutine listening for packets and giving them back on packets chan
packets := make(chan []byte)
Expand All @@ -87,14 +109,24 @@ func TestClientWithUseDistributions(t *testing.T) {
client.Flush()

expectedPacket1 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|d|#answer:42,hello:world\n"
assert.EqualValues(t, expectedPacket1, string(<-packets))
select {
case packet := <-packets:
assert.EqualValues(t, expectedPacket1, string(packet))
case <-time.After(2 * time.Second):
t.Fatal("no response after 2 seconds")
}

client.useDistributions = false
client.HandleMeasures(time.Time{}, testMeasure)
client.Flush()

expectedPacket2 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|h|#answer:42,hello:world\n"
assert.EqualValues(t, expectedPacket2, string(<-packets))
select {
case packet := <-packets:
assert.EqualValues(t, expectedPacket2, string(packet))
case <-time.After(2 * time.Second):
t.Fatal("no response after 2 seconds")
}

if err := client.Close(); err != nil {
t.Error(err)
Expand All @@ -117,7 +149,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()
Expand All @@ -135,6 +167,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
}
}

func TestClientWriteLargeMetrics_UDS(t *testing.T) {
const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
`

count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startUDSTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()

client := NewClient("unixgram://" + addr)

if _, err := client.Write([]byte(data)); err != nil {
t.Error(err)
}

time.Sleep(100 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != expect {
t.Error("bad metric count:", n)
}
}

func BenchmarkClient(b *testing.B) {
log.SetOutput(io.Discard)

Expand Down Expand Up @@ -180,7 +246,8 @@ func isClosedNetworkConnectionErr(err error) bool {
// startUDPListener starts a goroutine listening for UDP packets on 127.0.0.1 and an available port.
// The address listened to is returned as `addr`. The payloads of packets received are copied to `packets`.
func startUDPListener(t *testing.T, packets chan []byte) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0") // :0 chooses an available port
t.Helper()
conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) // :0 chooses an available port
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions datadog/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"log"
"math"
"net"
"strconv"
"strings"
"time"
Expand All @@ -16,7 +15,7 @@ import (
// Datagram format: https://docs.datadoghq.com/developers/dogstatsd/datagram_shell

type serializer struct {
conn net.Conn
conn io.WriteCloser
bufferSize int
filters map[string]struct{}
distPrefixes []string
Expand Down
70 changes: 68 additions & 2 deletions datadog/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package datadog
import (
"io"
"net"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
Expand All @@ -21,7 +23,7 @@ func TestServer(t *testing.T) {
seenGauges := make([]Metric, 0)
var mu sync.Mutex

addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
switch m.Name {
case "datadog.test.A":
atomic.AddUint32(&a, uint32(m.Value))
Expand Down Expand Up @@ -94,7 +96,7 @@ func TestServer(t *testing.T) {
}
}

func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
Expand All @@ -105,3 +107,67 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos

return conn.LocalAddr().String(), conn
}

// startUDSTestServerWithSocketFile starts a UDS server with a given socket file.
func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) {
udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

go Serve(conn, handler)

return &testUnixgramServer{
UnixConn: conn,
pathToDelete: socketPath,
}
}

// startUDSTestServer starts a UDS server with a random socket file internally generated.
func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) {
// generate a random dir
dir, err := os.MkdirTemp("", "socket")
if err != nil {
t.Error(err)
t.FailNow()
}

socketPath = filepath.Join(dir, "dsd.socket")

udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

ts := testUnixgramServer{
UnixConn: conn,
pathToDelete: dir, // so we delete any tmp dir we created
}

go Serve(conn, handler)
return socketPath, &ts
}

type testUnixgramServer struct {
*net.UnixConn
pathToDelete string
}

func (ts *testUnixgramServer) Close() error {
os.RemoveAll(ts.pathToDelete) // clean up
return ts.UnixConn.Close()
}
Loading

0 comments on commit 251d359

Please sign in to comment.