diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index d552a2bd..65661da1 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -24,10 +24,10 @@ const ( type BatchWriter struct { writer *kafka.Writer cfg config.Kafka - statsD *mtr.Client + statsD mtr.Client } -func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD *mtr.Client) (*BatchWriter, error) { +func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD mtr.Client) (*BatchWriter, error) { if cfg.TopicPrefix == "" { return nil, fmt.Errorf("kafka topic prefix cannot be empty") } @@ -123,10 +123,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e } } - if w.statsD != nil { - (*w.statsD).Count("kafka.publish", int64(len(chunk)), tags) - } - + w.statsD.Count("kafka.publish", int64(len(chunk)), tags) if kafkaErr != nil { return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(chunk)) } diff --git a/lib/mtr/datadog.go b/lib/mtr/datadog.go index 08dfc4f0..a3fa3da5 100644 --- a/lib/mtr/datadog.go +++ b/lib/mtr/datadog.go @@ -16,7 +16,7 @@ const ( type Client interface { Timing(name string, value time.Duration, tags map[string]string) Incr(name string, tags map[string]string) - Gauge(name string, tags map[string]string, value float64) + Gauge(name string, value float64, tags map[string]string) Count(name string, value int64, tags map[string]string) } @@ -46,6 +46,6 @@ func (s *statsClient) Incr(name string, tags map[string]string) { _ = s.client.Incr(name, toDatadogTags(tags), s.rate) } -func (s *statsClient) Gauge(name string, tags map[string]string, value float64) { +func (s *statsClient) Gauge(name string, value float64, tags map[string]string) { _ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate) } diff --git a/lib/mtr/mtr.go b/lib/mtr/mtr.go index be8d6b73..a1e2e1c5 100644 --- a/lib/mtr/mtr.go +++ b/lib/mtr/mtr.go @@ -9,7 +9,7 @@ import ( "github.com/artie-labs/transfer/lib/stringutil" ) -func New(namespace string, tags []string, samplingRate float64) (Client, error) { +func New(namespace string, tags []string, samplingRate float64) (*statsClient, error) { host := os.Getenv("TELEMETRY_HOST") port := os.Getenv("TELEMETRY_PORT") address := DefaultAddr diff --git a/lib/postgres/message.go b/lib/postgres/message.go index 08f7ac52..ad2ddcac 100644 --- a/lib/postgres/message.go +++ b/lib/postgres/message.go @@ -11,12 +11,12 @@ import ( ) type MessageBuilder struct { - statsD *mtr.Client + statsD mtr.Client table *Table iter batchRowIterator } -func NewMessageBuilder(table *Table, iter batchRowIterator, statsD *mtr.Client) *MessageBuilder { +func NewMessageBuilder(table *Table, iter batchRowIterator, statsD mtr.Client) *MessageBuilder { return &MessageBuilder{ table: table, iter: iter, @@ -34,12 +34,11 @@ func (m *MessageBuilder) HasNext() bool { } func (m *MessageBuilder) recordMetrics(start time.Time) { - if m.statsD != nil { - (*m.statsD).Timing("scanned_and_parsed", time.Since(start), map[string]string{ - "table": strings.ReplaceAll(m.table.Name, `"`, ``), - "schema": m.table.Schema, - }) - } + m.statsD.Timing("scanned_and_parsed", time.Since(start), map[string]string{ + "table": strings.ReplaceAll(m.table.Name, `"`, ``), + "schema": m.table.Schema, + }) + } func (m *MessageBuilder) Next() ([]lib.RawMessage, error) { diff --git a/lib/postgres/message_test.go b/lib/postgres/message_test.go index f6df5073..f653b1f3 100644 --- a/lib/postgres/message_test.go +++ b/lib/postgres/message_test.go @@ -2,6 +2,7 @@ package postgres import ( "fmt" + "github.com/artie-labs/transfer/lib/telemetry/metrics" "testing" "github.com/artie-labs/transfer/lib/cdc/util" @@ -50,7 +51,7 @@ func TestMessageBuilder(t *testing.T) { builder := NewMessageBuilder( table, &MockRowIterator{batches: [][]map[string]interface{}{}}, - nil, + &metrics.NullMetricsProvider{}, ) assert.False(t, builder.HasNext()) } @@ -60,7 +61,7 @@ func TestMessageBuilder(t *testing.T) { builder := NewMessageBuilder( table, &ErrorRowIterator{}, - nil, + &metrics.NullMetricsProvider{}, ) assert.True(t, builder.HasNext()) @@ -78,7 +79,7 @@ func TestMessageBuilder(t *testing.T) { {{"a": "3", "b": "13"}, {"a": "4", "b": "14"}}, }, }, - nil, + &metrics.NullMetricsProvider{}, ) assert.True(t, builder.HasNext()) diff --git a/main.go b/main.go index 050ac1be..8f9a3acf 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/artie-labs/transfer/lib/telemetry/metrics" "log/slog" "time" @@ -19,9 +20,9 @@ import ( "github.com/artie-labs/reader/sources/postgres" ) -func setUpMetrics(cfg *config.Metrics) (*mtr.Client, error) { +func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) { if cfg == nil { - return nil, nil + return &metrics.NullMetricsProvider{}, nil } slog.Info("Creating metrics client") @@ -29,10 +30,11 @@ func setUpMetrics(cfg *config.Metrics) (*mtr.Client, error) { if err != nil { return nil, err } - return &client, nil + + return client, nil } -func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD *mtr.Client) (*kafkalib.BatchWriter, error) { +func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD mtr.Client) (*kafkalib.BatchWriter, error) { if cfg == nil { return nil, fmt.Errorf("kafka configuration is not set") } diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 4538248b..1c2393b3 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -70,7 +70,7 @@ func (s *Store) Close() error { return nil } -func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter, _ *mtr.Client) error { +func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter, _ mtr.Client) error { if s.cfg.Snapshot { if err := s.scanFilesOverBucket(); err != nil { return fmt.Errorf("scanning files over bucket failed: %w", err) diff --git a/sources/mongo/mongo.go b/sources/mongo/mongo.go index 20d63666..ebc336b8 100644 --- a/sources/mongo/mongo.go +++ b/sources/mongo/mongo.go @@ -50,7 +50,7 @@ func (s *Source) Close() error { return nil } -func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, _ *mtr.Client) error { +func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, _ mtr.Client) error { for _, collection := range s.cfg.Collections { snapshotStartTime := time.Now() diff --git a/sources/postgres/snapshot.go b/sources/postgres/snapshot.go index 6d03bb81..fae476d1 100644 --- a/sources/postgres/snapshot.go +++ b/sources/postgres/snapshot.go @@ -38,7 +38,7 @@ func (s *Source) Close() error { return s.db.Close() } -func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, statsD *mtr.Client) error { +func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, statsD mtr.Client) error { for _, tableCfg := range s.cfg.Tables { snapshotStartTime := time.Now() diff --git a/sources/source.go b/sources/source.go index 7b418e6f..c0c25c8a 100644 --- a/sources/source.go +++ b/sources/source.go @@ -9,5 +9,5 @@ import ( type Source interface { Close() error - Run(ctx context.Context, writer kafkalib.BatchWriter, statsD *mtr.Client) error + Run(ctx context.Context, writer kafkalib.BatchWriter, statsD mtr.Client) error }