diff --git a/constants/constants.go b/constants/constants.go index 1ecabeb4..033f4e03 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -6,4 +6,5 @@ const ( ConfigKey contextKey = "__cfg" KafkaKey contextKey = "__kafka" LoggerKey contextKey = "__logger" + MtrKey contextKey = "__mtr" ) diff --git a/go.mod b/go.mod index f30fa1d0..49ef3c3c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/artie-labs/reader go 1.19 require ( + github.com/DataDog/datadog-go v4.8.3+incompatible github.com/artie-labs/transfer v0.0.0-20231106205704-cf038be65858 github.com/aws/aws-sdk-go v1.44.327 github.com/aws/aws-sdk-go-v2/config v1.18.19 @@ -15,7 +16,6 @@ require ( ) require ( - github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/Microsoft/go-winio v0.6.0 // indirect github.com/aws/aws-sdk-go-v2 v1.18.1 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.13.18 // indirect diff --git a/go.sum b/go.sum index f1d39239..b60847dc 100644 --- a/go.sum +++ b/go.sum @@ -94,6 +94,7 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0 github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -120,6 +121,7 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/lib/kafkalib/batch.go b/lib/kafkalib/batch.go index dc372318..eb56449d 100644 --- a/lib/kafkalib/batch.go +++ b/lib/kafkalib/batch.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "github.com/artie-labs/reader/lib/logger" + "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/transfer/lib/jitter" - "github.com/artie-labs/transfer/lib/telemetry/metrics" "github.com/segmentio/kafka-go" "time" ) @@ -91,7 +91,7 @@ func (b *Batch) Publish(ctx context.Context) error { time.Sleep(sleepDuration) } - metrics.FromContext(ctx).Count("kafka.publish", count, tags) + mtr.FromContext(ctx).Count("kafka.publish", count, tags) if err != nil { return err } diff --git a/lib/mtr/datadog.go b/lib/mtr/datadog.go new file mode 100644 index 00000000..08dfc4f0 --- /dev/null +++ b/lib/mtr/datadog.go @@ -0,0 +1,51 @@ +package mtr + +import ( + "fmt" + "time" + + "github.com/DataDog/datadog-go/statsd" +) + +const ( + DefaultNamespace = "reader." + // DefaultAddr is the default address for where the DD agent would be running on a single host machine + DefaultAddr = "127.0.0.1:8125" +) + +type Client interface { + Timing(name string, value time.Duration, tags map[string]string) + Incr(name string, tags map[string]string) + Gauge(name string, tags map[string]string, value float64) + Count(name string, value int64, tags map[string]string) +} + +type statsClient struct { + client *statsd.Client + rate float64 +} + +func toDatadogTags(tags map[string]string) []string { + var retTags []string + for key, val := range tags { + retTags = append(retTags, fmt.Sprintf("%s:%s", key, val)) + } + + return retTags +} + +func (s *statsClient) Count(name string, value int64, tags map[string]string) { + _ = s.client.Count(name, value, toDatadogTags(tags), s.rate) +} + +func (s *statsClient) Timing(name string, value time.Duration, tags map[string]string) { + _ = s.client.Timing(name, value, toDatadogTags(tags), s.rate) +} + +func (s *statsClient) Incr(name string, tags map[string]string) { + _ = s.client.Incr(name, toDatadogTags(tags), s.rate) +} + +func (s *statsClient) Gauge(name string, tags map[string]string, value float64) { + _ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate) +} diff --git a/lib/mtr/mtr.go b/lib/mtr/mtr.go new file mode 100644 index 00000000..2be263e5 --- /dev/null +++ b/lib/mtr/mtr.go @@ -0,0 +1,48 @@ +package mtr + +import ( + "context" + "fmt" + "github.com/DataDog/datadog-go/statsd" + "github.com/artie-labs/transfer/lib/stringutil" + "os" + + "github.com/artie-labs/reader/constants" + "github.com/artie-labs/reader/lib/logger" +) + +func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string, samplingRate float64) context.Context { + host := os.Getenv("TELEMETRY_HOST") + port := os.Getenv("TELEMETRY_PORT") + address := DefaultAddr + if !stringutil.Empty(host, port) { + address = fmt.Sprintf("%s:%s", host, port) + logger.FromContext(ctx).WithField("address", address).Info("overriding telemetry address with env vars") + } + + datadogClient, err := statsd.New(address) + if err != nil { + logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client") + } + + datadogClient.Tags = tags + datadogClient.Namespace = stringutil.Override(DefaultNamespace, namespace) + return context.WithValue(ctx, constants.MtrKey, &statsClient{ + client: datadogClient, + rate: samplingRate, + }) +} + +func FromContext(ctx context.Context) Client { + metricsClientVal := ctx.Value(constants.MtrKey) + if metricsClientVal == nil { + logger.FromContext(ctx).Fatal("metrics client is nil") + } + + metricsClient, isOk := metricsClientVal.(Client) + if !isOk { + logger.FromContext(ctx).Fatal("metrics client is not mtr.Client type") + } + + return metricsClient +} diff --git a/lib/ttlmap/ttlmap_suite_test.go b/lib/ttlmap/ttlmap_suite_test.go index 76c6ec5e..00bfb290 100644 --- a/lib/ttlmap/ttlmap_suite_test.go +++ b/lib/ttlmap/ttlmap_suite_test.go @@ -2,7 +2,6 @@ package ttlmap import ( "context" - "github.com/artie-labs/transfer/lib/config" "github.com/stretchr/testify/suite" "testing" ) @@ -13,14 +12,7 @@ type TTLMapTestSuite struct { } func (t *TTLMapTestSuite) SetupTest() { - ctx := config.InjectSettingsIntoContext(context.Background(), &config.Settings{ - VerboseLogging: true, - Config: &config.Config{ - Redshift: &config.Redshift{}, - }, - }) - - t.ctx = ctx + t.ctx = context.Background() } func TestTTLMapTestSuite(t *testing.T) { diff --git a/main.go b/main.go index cd327ff2..1287798d 100644 --- a/main.go +++ b/main.go @@ -6,9 +6,8 @@ import ( "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" + "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/reader/sources/dynamodb" - "github.com/artie-labs/transfer/lib/telemetry/metrics" - "github.com/artie-labs/transfer/lib/telemetry/metrics/datadog" "log" ) @@ -27,18 +26,7 @@ func main() { ctx = kafkalib.InjectIntoContext(ctx) if cfg.Metrics != nil { logger.FromContext(ctx).Info("injecting datadog") - client, err := datadog.NewDatadogClient(ctx, map[string]interface{}{ - datadog.Namespace: cfg.Metrics.Namespace, - datadog.Tags: cfg.Metrics.Tags, - // Sample 50% to start, we can make this configurable later. - datadog.Sampling: 0.5, - }) - - if err != nil { - logger.FromContext(ctx).WithError(err).Fatal("failed to create datadog client") - } - - ctx = metrics.InjectMetricsClientIntoCtx(ctx, client) + ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5) } ddb := dynamodb.Load(ctx)