From dcc60d22195152364c3de997cb09163541fd468a Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 9 May 2024 15:05:27 -0700 Subject: [PATCH] Datadog Flush (#379) --- .github/workflows/gha-go-test.yaml | 1 + .gitignore | 3 + lib/kafkalib/writer.go | 5 +- lib/mocks/client.mock.go | 208 ----------------------------- lib/mtr/datadog.go | 5 + main.go | 14 +- writers/transfer/writer.go | 8 +- 7 files changed, 28 insertions(+), 216 deletions(-) delete mode 100644 lib/mocks/client.mock.go diff --git a/.github/workflows/gha-go-test.yaml b/.github/workflows/gha-go-test.yaml index d8be6f14..1808b3ed 100644 --- a/.github/workflows/gha-go-test.yaml +++ b/.github/workflows/gha-go-test.yaml @@ -14,6 +14,7 @@ jobs: env: SC_VERSION: 2023.1.7 run: | + make generate SC_URL="https://github.com/dominikh/go-tools/releases/download/$SC_VERSION/staticcheck_linux_amd64.tar.gz" wget -q ${SC_URL} -O - | tar -xzf - --strip-components 1 -C /usr/local/bin staticcheck/staticcheck make static diff --git a/.gitignore b/.gitignore index 2fe84b7c..233096c3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,9 @@ # Local .terraform directories **/.terraform/* +# Ignore mocks +**/*.mock.go + # Terraform lock *.terraform.lock.hcl diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index e8ffa93e..68d03c9c 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -141,7 +141,10 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error } } - b.statsD.Count("kafka.publish", int64(len(batch)), tags) + if b.statsD != nil { + b.statsD.Count("kafka.publish", int64(len(batch)), tags) + } + if kafkaErr != nil { return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(batch)) } diff --git a/lib/mocks/client.mock.go b/lib/mocks/client.mock.go deleted file mode 100644 index a7320cd1..00000000 --- a/lib/mocks/client.mock.go +++ /dev/null @@ -1,208 +0,0 @@ -// Code generated by counterfeiter. DO NOT EDIT. -package mocks - -import ( - "sync" - "time" - - "github.com/artie-labs/reader/lib/mtr" -) - -type FakeClient struct { - CountStub func(string, int64, map[string]string) - countMutex sync.RWMutex - countArgsForCall []struct { - arg1 string - arg2 int64 - arg3 map[string]string - } - GaugeStub func(string, float64, map[string]string) - gaugeMutex sync.RWMutex - gaugeArgsForCall []struct { - arg1 string - arg2 float64 - arg3 map[string]string - } - IncrStub func(string, map[string]string) - incrMutex sync.RWMutex - incrArgsForCall []struct { - arg1 string - arg2 map[string]string - } - TimingStub func(string, time.Duration, map[string]string) - timingMutex sync.RWMutex - timingArgsForCall []struct { - arg1 string - arg2 time.Duration - arg3 map[string]string - } - invocations map[string][][]interface{} - invocationsMutex sync.RWMutex -} - -func (fake *FakeClient) Count(arg1 string, arg2 int64, arg3 map[string]string) { - fake.countMutex.Lock() - fake.countArgsForCall = append(fake.countArgsForCall, struct { - arg1 string - arg2 int64 - arg3 map[string]string - }{arg1, arg2, arg3}) - stub := fake.CountStub - fake.recordInvocation("Count", []interface{}{arg1, arg2, arg3}) - fake.countMutex.Unlock() - if stub != nil { - fake.CountStub(arg1, arg2, arg3) - } -} - -func (fake *FakeClient) CountCallCount() int { - fake.countMutex.RLock() - defer fake.countMutex.RUnlock() - return len(fake.countArgsForCall) -} - -func (fake *FakeClient) CountCalls(stub func(string, int64, map[string]string)) { - fake.countMutex.Lock() - defer fake.countMutex.Unlock() - fake.CountStub = stub -} - -func (fake *FakeClient) CountArgsForCall(i int) (string, int64, map[string]string) { - fake.countMutex.RLock() - defer fake.countMutex.RUnlock() - argsForCall := fake.countArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeClient) Gauge(arg1 string, arg2 float64, arg3 map[string]string) { - fake.gaugeMutex.Lock() - fake.gaugeArgsForCall = append(fake.gaugeArgsForCall, struct { - arg1 string - arg2 float64 - arg3 map[string]string - }{arg1, arg2, arg3}) - stub := fake.GaugeStub - fake.recordInvocation("Gauge", []interface{}{arg1, arg2, arg3}) - fake.gaugeMutex.Unlock() - if stub != nil { - fake.GaugeStub(arg1, arg2, arg3) - } -} - -func (fake *FakeClient) GaugeCallCount() int { - fake.gaugeMutex.RLock() - defer fake.gaugeMutex.RUnlock() - return len(fake.gaugeArgsForCall) -} - -func (fake *FakeClient) GaugeCalls(stub func(string, float64, map[string]string)) { - fake.gaugeMutex.Lock() - defer fake.gaugeMutex.Unlock() - fake.GaugeStub = stub -} - -func (fake *FakeClient) GaugeArgsForCall(i int) (string, float64, map[string]string) { - fake.gaugeMutex.RLock() - defer fake.gaugeMutex.RUnlock() - argsForCall := fake.gaugeArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeClient) Incr(arg1 string, arg2 map[string]string) { - fake.incrMutex.Lock() - fake.incrArgsForCall = append(fake.incrArgsForCall, struct { - arg1 string - arg2 map[string]string - }{arg1, arg2}) - stub := fake.IncrStub - fake.recordInvocation("Incr", []interface{}{arg1, arg2}) - fake.incrMutex.Unlock() - if stub != nil { - fake.IncrStub(arg1, arg2) - } -} - -func (fake *FakeClient) IncrCallCount() int { - fake.incrMutex.RLock() - defer fake.incrMutex.RUnlock() - return len(fake.incrArgsForCall) -} - -func (fake *FakeClient) IncrCalls(stub func(string, map[string]string)) { - fake.incrMutex.Lock() - defer fake.incrMutex.Unlock() - fake.IncrStub = stub -} - -func (fake *FakeClient) IncrArgsForCall(i int) (string, map[string]string) { - fake.incrMutex.RLock() - defer fake.incrMutex.RUnlock() - argsForCall := fake.incrArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeClient) Timing(arg1 string, arg2 time.Duration, arg3 map[string]string) { - fake.timingMutex.Lock() - fake.timingArgsForCall = append(fake.timingArgsForCall, struct { - arg1 string - arg2 time.Duration - arg3 map[string]string - }{arg1, arg2, arg3}) - stub := fake.TimingStub - fake.recordInvocation("Timing", []interface{}{arg1, arg2, arg3}) - fake.timingMutex.Unlock() - if stub != nil { - fake.TimingStub(arg1, arg2, arg3) - } -} - -func (fake *FakeClient) TimingCallCount() int { - fake.timingMutex.RLock() - defer fake.timingMutex.RUnlock() - return len(fake.timingArgsForCall) -} - -func (fake *FakeClient) TimingCalls(stub func(string, time.Duration, map[string]string)) { - fake.timingMutex.Lock() - defer fake.timingMutex.Unlock() - fake.TimingStub = stub -} - -func (fake *FakeClient) TimingArgsForCall(i int) (string, time.Duration, map[string]string) { - fake.timingMutex.RLock() - defer fake.timingMutex.RUnlock() - argsForCall := fake.timingArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeClient) Invocations() map[string][][]interface{} { - fake.invocationsMutex.RLock() - defer fake.invocationsMutex.RUnlock() - fake.countMutex.RLock() - defer fake.countMutex.RUnlock() - fake.gaugeMutex.RLock() - defer fake.gaugeMutex.RUnlock() - fake.incrMutex.RLock() - defer fake.incrMutex.RUnlock() - fake.timingMutex.RLock() - defer fake.timingMutex.RUnlock() - copiedInvocations := map[string][][]interface{}{} - for key, value := range fake.invocations { - copiedInvocations[key] = value - } - return copiedInvocations -} - -func (fake *FakeClient) recordInvocation(key string, args []interface{}) { - fake.invocationsMutex.Lock() - defer fake.invocationsMutex.Unlock() - if fake.invocations == nil { - fake.invocations = map[string][][]interface{}{} - } - if fake.invocations[key] == nil { - fake.invocations[key] = [][]interface{}{} - } - fake.invocations[key] = append(fake.invocations[key], args) -} - -var _ mtr.Client = new(FakeClient) diff --git a/lib/mtr/datadog.go b/lib/mtr/datadog.go index a3fa3da5..cd332fa3 100644 --- a/lib/mtr/datadog.go +++ b/lib/mtr/datadog.go @@ -18,6 +18,7 @@ type Client interface { Incr(name string, tags map[string]string) Gauge(name string, value float64, tags map[string]string) Count(name string, value int64, tags map[string]string) + Flush() } type statsClient struct { @@ -34,6 +35,10 @@ func toDatadogTags(tags map[string]string) []string { return retTags } +func (s *statsClient) Flush() { + _ = s.client.Flush() +} + func (s *statsClient) Count(name string, value int64, tags map[string]string) { _ = s.client.Count(name, value, toDatadogTags(tags), s.rate) } diff --git a/main.go b/main.go index 708f587b..d99f8321 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,6 @@ import ( "fmt" "log/slog" - "github.com/artie-labs/transfer/lib/telemetry/metrics" - "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib/kafkalib" "github.com/artie-labs/reader/lib/logger" @@ -23,7 +21,7 @@ import ( func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) { if cfg == nil { - return &metrics.NullMetricsProvider{}, nil + return nil, nil } slog.Info("Creating metrics client") @@ -86,15 +84,21 @@ func main() { } _logger, cleanUpHandlers := logger.NewLogger(cfg) - defer cleanUpHandlers() slog.SetDefault(_logger) - ctx := context.Background() statsD, err := setUpMetrics(cfg.Metrics) if err != nil { logger.Fatal("Failed to set up metrics", slog.Any("err", err)) } + defer func() { + cleanUpHandlers() + if statsD != nil { + statsD.Flush() + } + }() + + ctx := context.Background() destinationWriter, err := buildDestinationWriter(ctx, cfg, statsD) if err != nil { logger.Fatal(fmt.Sprintf("Failed to init %q destination writer", cfg.Destination), slog.Any("err", err)) diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index 361864c7..33c376e0 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -107,7 +107,9 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error { "table": events[0].Table, } defer func() { - w.statsD.Count("process.message", int64(len(events)), tags) + if w.statsD != nil { + w.statsD.Count("process.message", int64(len(events)), tags) + } }() for _, evt := range events { @@ -167,7 +169,9 @@ func (w *Writer) flush(reason string) error { "reason": reason, } defer func() { - w.statsD.Timing("flush", time.Since(start), tags) + if w.statsD != nil { + w.statsD.Timing("flush", time.Since(start), tags) + } }() if !w.tc.SoftDelete {