Skip to content

Commit

Permalink
Datadog Flush (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 9, 2024
1 parent 74db570 commit dcc60d2
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 216 deletions.
1 change: 1 addition & 0 deletions .github/workflows/gha-go-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
env:
SC_VERSION: 2023.1.7
run: |
make generate
SC_URL="https://github.com/dominikh/go-tools/releases/download/$SC_VERSION/staticcheck_linux_amd64.tar.gz"
wget -q ${SC_URL} -O - | tar -xzf - --strip-components 1 -C /usr/local/bin staticcheck/staticcheck
make static
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# Local .terraform directories
**/.terraform/*

# Ignore mocks
**/*.mock.go

# Terraform lock
*.terraform.lock.hcl

Expand Down
5 changes: 4 additions & 1 deletion lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error
}
}

b.statsD.Count("kafka.publish", int64(len(batch)), tags)
if b.statsD != nil {
b.statsD.Count("kafka.publish", int64(len(batch)), tags)
}

if kafkaErr != nil {
return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(batch))
}
Expand Down
208 changes: 0 additions & 208 deletions lib/mocks/client.mock.go

This file was deleted.

5 changes: 5 additions & 0 deletions lib/mtr/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Client interface {
Incr(name string, tags map[string]string)
Gauge(name string, value float64, tags map[string]string)
Count(name string, value int64, tags map[string]string)
Flush()
}

type statsClient struct {
Expand All @@ -34,6 +35,10 @@ func toDatadogTags(tags map[string]string) []string {
return retTags
}

func (s *statsClient) Flush() {
_ = s.client.Flush()
}

func (s *statsClient) Count(name string, value int64, tags map[string]string) {
_ = s.client.Count(name, value, toDatadogTags(tags), s.rate)
}
Expand Down
14 changes: 9 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"log/slog"

"github.com/artie-labs/transfer/lib/telemetry/metrics"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/kafkalib"
"github.com/artie-labs/reader/lib/logger"
Expand All @@ -23,7 +21,7 @@ import (

func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) {
if cfg == nil {
return &metrics.NullMetricsProvider{}, nil
return nil, nil
}

slog.Info("Creating metrics client")
Expand Down Expand Up @@ -86,15 +84,21 @@ func main() {
}

_logger, cleanUpHandlers := logger.NewLogger(cfg)
defer cleanUpHandlers()
slog.SetDefault(_logger)
ctx := context.Background()

statsD, err := setUpMetrics(cfg.Metrics)
if err != nil {
logger.Fatal("Failed to set up metrics", slog.Any("err", err))
}

defer func() {
cleanUpHandlers()
if statsD != nil {
statsD.Flush()
}
}()

ctx := context.Background()
destinationWriter, err := buildDestinationWriter(ctx, cfg, statsD)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to init %q destination writer", cfg.Destination), slog.Any("err", err))
Expand Down
8 changes: 6 additions & 2 deletions writers/transfer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error {
"table": events[0].Table,
}
defer func() {
w.statsD.Count("process.message", int64(len(events)), tags)
if w.statsD != nil {
w.statsD.Count("process.message", int64(len(events)), tags)
}
}()

for _, evt := range events {
Expand Down Expand Up @@ -167,7 +169,9 @@ func (w *Writer) flush(reason string) error {
"reason": reason,
}
defer func() {
w.statsD.Timing("flush", time.Since(start), tags)
if w.statsD != nil {
w.statsD.Timing("flush", time.Since(start), tags)
}
}()

if !w.tc.SoftDelete {
Expand Down

0 comments on commit dcc60d2

Please sign in to comment.