Skip to content

Commit

Permalink
Using Null metrics provider if none is available (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Feb 15, 2024
1 parent 45f8eda commit a3a1a3b
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 28 deletions.
9 changes: 3 additions & 6 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (
type BatchWriter struct {
writer *kafka.Writer
cfg config.Kafka
statsD *mtr.Client
statsD mtr.Client
}

func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD *mtr.Client) (*BatchWriter, error) {
func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD mtr.Client) (*BatchWriter, error) {
if cfg.TopicPrefix == "" {
return nil, fmt.Errorf("kafka topic prefix cannot be empty")
}
Expand Down Expand Up @@ -123,10 +123,7 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
}
}

if w.statsD != nil {
(*w.statsD).Count("kafka.publish", int64(len(chunk)), tags)
}

w.statsD.Count("kafka.publish", int64(len(chunk)), tags)
if kafkaErr != nil {
return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(chunk))
}
Expand Down
4 changes: 2 additions & 2 deletions lib/mtr/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
type Client interface {
Timing(name string, value time.Duration, tags map[string]string)
Incr(name string, tags map[string]string)
Gauge(name string, tags map[string]string, value float64)
Gauge(name string, value float64, tags map[string]string)
Count(name string, value int64, tags map[string]string)
}

Expand Down Expand Up @@ -46,6 +46,6 @@ func (s *statsClient) Incr(name string, tags map[string]string) {
_ = s.client.Incr(name, toDatadogTags(tags), s.rate)
}

func (s *statsClient) Gauge(name string, tags map[string]string, value float64) {
func (s *statsClient) Gauge(name string, value float64, tags map[string]string) {
_ = s.client.Gauge(name, value, toDatadogTags(tags), s.rate)
}
2 changes: 1 addition & 1 deletion lib/mtr/mtr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/artie-labs/transfer/lib/stringutil"
)

func New(namespace string, tags []string, samplingRate float64) (Client, error) {
func New(namespace string, tags []string, samplingRate float64) (*statsClient, error) {
host := os.Getenv("TELEMETRY_HOST")
port := os.Getenv("TELEMETRY_PORT")
address := DefaultAddr
Expand Down
15 changes: 7 additions & 8 deletions lib/postgres/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

type MessageBuilder struct {
statsD *mtr.Client
statsD mtr.Client
table *Table
iter batchRowIterator
}

func NewMessageBuilder(table *Table, iter batchRowIterator, statsD *mtr.Client) *MessageBuilder {
func NewMessageBuilder(table *Table, iter batchRowIterator, statsD mtr.Client) *MessageBuilder {
return &MessageBuilder{
table: table,
iter: iter,
Expand All @@ -34,12 +34,11 @@ func (m *MessageBuilder) HasNext() bool {
}

func (m *MessageBuilder) recordMetrics(start time.Time) {
if m.statsD != nil {
(*m.statsD).Timing("scanned_and_parsed", time.Since(start), map[string]string{
"table": strings.ReplaceAll(m.table.Name, `"`, ``),
"schema": m.table.Schema,
})
}
m.statsD.Timing("scanned_and_parsed", time.Since(start), map[string]string{
"table": strings.ReplaceAll(m.table.Name, `"`, ``),
"schema": m.table.Schema,
})

}

func (m *MessageBuilder) Next() ([]lib.RawMessage, error) {
Expand Down
7 changes: 4 additions & 3 deletions lib/postgres/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package postgres

import (
"fmt"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"testing"

"github.com/artie-labs/transfer/lib/cdc/util"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestMessageBuilder(t *testing.T) {
builder := NewMessageBuilder(
table,
&MockRowIterator{batches: [][]map[string]interface{}{}},
nil,
&metrics.NullMetricsProvider{},
)
assert.False(t, builder.HasNext())
}
Expand All @@ -60,7 +61,7 @@ func TestMessageBuilder(t *testing.T) {
builder := NewMessageBuilder(
table,
&ErrorRowIterator{},
nil,
&metrics.NullMetricsProvider{},
)

assert.True(t, builder.HasNext())
Expand All @@ -78,7 +79,7 @@ func TestMessageBuilder(t *testing.T) {
{{"a": "3", "b": "13"}, {"a": "4", "b": "14"}},
},
},
nil,
&metrics.NullMetricsProvider{},
)

assert.True(t, builder.HasNext())
Expand Down
10 changes: 6 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"log/slog"
"time"

Expand All @@ -19,20 +20,21 @@ import (
"github.com/artie-labs/reader/sources/postgres"
)

func setUpMetrics(cfg *config.Metrics) (*mtr.Client, error) {
func setUpMetrics(cfg *config.Metrics) (mtr.Client, error) {
if cfg == nil {
return nil, nil
return &metrics.NullMetricsProvider{}, nil
}

slog.Info("Creating metrics client")
client, err := mtr.New(cfg.Namespace, cfg.Tags, 0.5)
if err != nil {
return nil, err
}
return &client, nil

return client, nil
}

func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD *mtr.Client) (*kafkalib.BatchWriter, error) {
func setUpKafka(ctx context.Context, cfg *config.Kafka, statsD mtr.Client) (*kafkalib.BatchWriter, error) {
if cfg == nil {
return nil, fmt.Errorf("kafka configuration is not set")
}
Expand Down
2 changes: 1 addition & 1 deletion sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Store) Close() error {
return nil
}

func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter, _ *mtr.Client) error {
func (s *Store) Run(ctx context.Context, writer kafkalib.BatchWriter, _ mtr.Client) error {
if s.cfg.Snapshot {
if err := s.scanFilesOverBucket(); err != nil {
return fmt.Errorf("scanning files over bucket failed: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion sources/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *Source) Close() error {
return nil
}

func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, _ *mtr.Client) error {
func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, _ mtr.Client) error {
for _, collection := range s.cfg.Collections {
snapshotStartTime := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *Source) Close() error {
return s.db.Close()
}

func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, statsD *mtr.Client) error {
func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter, statsD mtr.Client) error {
for _, tableCfg := range s.cfg.Tables {
snapshotStartTime := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (

type Source interface {
Close() error
Run(ctx context.Context, writer kafkalib.BatchWriter, statsD *mtr.Client) error
Run(ctx context.Context, writer kafkalib.BatchWriter, statsD mtr.Client) error
}

0 comments on commit a3a1a3b

Please sign in to comment.