Skip to content

Commit

Permalink
Refactor to use its own Datadog client (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jan 5, 2024
1 parent 42e52ff commit 2f4bb61
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 26 deletions.
1 change: 1 addition & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ const (
ConfigKey contextKey = "__cfg"
KafkaKey contextKey = "__kafka"
LoggerKey contextKey = "__logger"
MtrKey contextKey = "__mtr"
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/artie-labs/reader
go 1.19

require (
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/artie-labs/transfer v0.0.0-20231106205704-cf038be65858
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go-v2/config v1.18.19
Expand All @@ -15,7 +16,6 @@ require (
)

require (
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.18 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand All @@ -120,6 +121,7 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
4 changes: 2 additions & 2 deletions lib/kafkalib/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/segmentio/kafka-go"
"time"
)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (b *Batch) Publish(ctx context.Context) error {
time.Sleep(sleepDuration)
}

metrics.FromContext(ctx).Count("kafka.publish", count, tags)
mtr.FromContext(ctx).Count("kafka.publish", count, tags)
if err != nil {
return err
}
Expand Down
51 changes: 51 additions & 0 deletions lib/mtr/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package mtr

import (
"fmt"
"time"

"github.com/DataDog/datadog-go/statsd"
)

const (
DefaultNamespace = "reader."
// DefaultAddr is the default address for where the DD agent would be running on a single host machine
DefaultAddr = "127.0.0.1:8125"
)

type Client interface {
Timing(name string, value time.Duration, tags map[string]string)
Incr(name string, tags map[string]string)
Gauge(name string, tags map[string]string, value float64)
Count(name string, value int64, tags map[string]string)
}

type statsClient struct {
client *statsd.Client
rate float64
}

func toDatadogTags(tags map[string]string) []string {
var retTags []string
for key, val := range tags {
retTags = append(retTags, fmt.Sprintf("%s:%s", key, val))
}

return retTags
}

func (s *statsClient) Count(name string, value int64, tags map[string]string) {
_ = s.client.Count(name, value, toDatadogTags(tags), s.rate)
}

func (s *statsClient) Timing(name string, value time.Duration, tags map[string]string) {
_ = s.client.Timing(name, value, toDatadogTags(tags), s.rate)
}

func (s *statsClient) Incr(name string, tags map[string]string) {
_ = s.client.Incr(name, toDatadogTags(tags), s.rate)
}

func (s *statsClient) Gauge(name string, tags map[string]string, value float64) {
_ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate)
}
48 changes: 48 additions & 0 deletions lib/mtr/mtr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mtr

import (
"context"
"fmt"
"github.com/DataDog/datadog-go/statsd"
"github.com/artie-labs/transfer/lib/stringutil"
"os"

"github.com/artie-labs/reader/constants"
"github.com/artie-labs/reader/lib/logger"
)

func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string, samplingRate float64) context.Context {
host := os.Getenv("TELEMETRY_HOST")
port := os.Getenv("TELEMETRY_PORT")
address := DefaultAddr
if !stringutil.Empty(host, port) {
address = fmt.Sprintf("%s:%s", host, port)
logger.FromContext(ctx).WithField("address", address).Info("overriding telemetry address with env vars")
}

datadogClient, err := statsd.New(address)
if err != nil {
logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client")
}

datadogClient.Tags = tags
datadogClient.Namespace = stringutil.Override(DefaultNamespace, namespace)
return context.WithValue(ctx, constants.MtrKey, &statsClient{
client: datadogClient,
rate: samplingRate,
})
}

func FromContext(ctx context.Context) Client {
metricsClientVal := ctx.Value(constants.MtrKey)
if metricsClientVal == nil {
logger.FromContext(ctx).Fatal("metrics client is nil")
}

metricsClient, isOk := metricsClientVal.(Client)
if !isOk {
logger.FromContext(ctx).Fatal("metrics client is not mtr.Client type")
}

return metricsClient
}
10 changes: 1 addition & 9 deletions lib/ttlmap/ttlmap_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ttlmap

import (
"context"
"github.com/artie-labs/transfer/lib/config"
"github.com/stretchr/testify/suite"
"testing"
)
Expand All @@ -13,14 +12,7 @@ type TTLMapTestSuite struct {
}

func (t *TTLMapTestSuite) SetupTest() {
ctx := config.InjectSettingsIntoContext(context.Background(), &config.Settings{
VerboseLogging: true,
Config: &config.Config{
Redshift: &config.Redshift{},
},
})

t.ctx = ctx
t.ctx = context.Background()
}

func TestTTLMapTestSuite(t *testing.T) {
Expand Down
16 changes: 2 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/sources/dynamodb"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/artie-labs/transfer/lib/telemetry/metrics/datadog"
"log"
)

Expand All @@ -27,18 +26,7 @@ func main() {
ctx = kafkalib.InjectIntoContext(ctx)
if cfg.Metrics != nil {
logger.FromContext(ctx).Info("injecting datadog")
client, err := datadog.NewDatadogClient(ctx, map[string]interface{}{
datadog.Namespace: cfg.Metrics.Namespace,
datadog.Tags: cfg.Metrics.Tags,
// Sample 50% to start, we can make this configurable later.
datadog.Sampling: 0.5,
})

if err != nil {
logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client")
}

ctx = metrics.InjectMetricsClientIntoCtx(ctx, client)
ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5)
}

ddb := dynamodb.Load(ctx)
Expand Down

0 comments on commit 2f4bb61

Please sign in to comment.