Standardize logging (#27)

nathan-artie authored Jan 24, 2024
1 parent 2be98bf commit 933fa28

Showing 9 changed files with 37 additions and 36 deletions.
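The commit applies one slog convention across all nine files: log messages start with a capital letter, and structured attributes are passed inline to the level method (slog.Info, slog.Warn, logger.Fatal) instead of being staged through slog.With. A minimal before/after sketch of the pattern using the standard library's log/slog (the error value is a stand-in, not from this repository):

package main

import (
	"errors"
	"log/slog"
)

func main() {
	err := errors.New("broker unavailable") // illustrative error

	// Before: attributes staged with slog.With, lowercase message.
	slog.With(slog.Any("err", err)).Warn("failed to publish message")

	// After: capitalized message first, attributes passed inline.
	slog.Warn("Failed to publish message", slog.Any("err", err))
}

Passing attributes inline keeps the message as the first argument at every call site, which is what makes the rewritten calls below read uniformly.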
11 changes: 6 additions & 5 deletions config/config.go
@@ -3,10 +3,11 @@ package config
 import (
 	"context"
 	"fmt"
-	"gopkg.in/yaml.v2"
 	"log"
 	"os"
 
+	"gopkg.in/yaml.v2"
+
 	"github.com/artie-labs/reader/constants"
 )
 
@@ -87,17 +88,17 @@ func (s *Settings) Validate() error {
 func ReadConfig(fp string) (*Settings, error) {
 	bytes, err := os.ReadFile(fp)
 	if err != nil {
-		log.Fatalf("failed to read config file, err: %v", err)
+		log.Fatalf("Failed to read config file, err: %v", err)
 	}
 
 	var settings Settings
 	err = yaml.Unmarshal(bytes, &settings)
 	if err != nil {
-		log.Fatalf("failed to unmarshal config file, err: %v", err)
+		log.Fatalf("Failed to unmarshal config file, err: %v", err)
 	}
 
 	if err = settings.Validate(); err != nil {
-		log.Fatalf("failed to validate config file, err: %v", err)
+		log.Fatalf("Failed to validate config file, err: %v", err)
 	}
 
 	settings.Kafka.GenerateDefault()
4 changes: 2 additions & 2 deletions lib/kafkalib/batch.go
@@ -85,11 +85,11 @@ func (b *Batch) Publish(ctx context.Context) error {
 		}
 
 		sleepDuration := time.Duration(jitter.JitterMs(RetryDelayMs, attempts)) * time.Millisecond
-		slog.With(
+		slog.Warn("Failed to publish message, jitter sleeping before retrying...",
 			slog.Any("err", err),
 			slog.Int("attempts", attempts),
 			slog.Int("maxAttempts", MaxRetries),
-		).Warn("failed to publish message, jitter sleeping before retrying...")
+		)
 		time.Sleep(sleepDuration)
 	}
 
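The batch.go hunk above sits inside a bounded retry loop that sleeps a jittered delay between publish attempts. A compact sketch of that loop, assuming jitter.JitterMs(baseMs, attempts) returns a randomized backoff in milliseconds that grows with the attempt count — only the call shape and the log line come from the diff; the jitter implementation, constants, and publish stub are illustrative:

package main

import (
	"errors"
	"log/slog"
	"math/rand"
	"time"
)

const (
	retryDelayMs = 250 // assumed base delay; the real value lives in the package
	maxRetries   = 5   // stands in for MaxRetries
)

// jitterMs mimics the call shape of jitter.JitterMs: a random delay of up
// to baseMs * 2^attempts milliseconds, so later attempts back off further.
func jitterMs(baseMs, attempts int) int {
	return rand.Intn(baseMs * (1 << attempts))
}

func publish() error { return errors.New("broker unavailable") } // stub

func main() {
	for attempts := 0; attempts < maxRetries; attempts++ {
		err := publish()
		if err == nil {
			return
		}

		sleepDuration := time.Duration(jitterMs(retryDelayMs, attempts)) * time.Millisecond
		slog.Warn("Failed to publish message, jitter sleeping before retrying...",
			slog.Any("err", err),
			slog.Int("attempts", attempts),
			slog.Int("maxAttempts", maxRetries),
		)
		time.Sleep(sleepDuration)
	}
}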
6 changes: 3 additions & 3 deletions lib/kafkalib/kafka.go
@@ -19,12 +19,12 @@ import (
 func FromContext(ctx context.Context) *kafka.Writer {
 	kafkaVal := ctx.Value(constants.KafkaKey)
 	if kafkaVal == nil {
-		logger.Fatal("kafka is not set in context.Context")
+		logger.Fatal("Kafka is not set in context.Context")
 	}
 
 	kafkaWriter, isOk := kafkaVal.(*kafka.Writer)
 	if !isOk {
-		logger.Fatal("kafka writer is not type *kafka.Writer")
+		logger.Fatal("Kafka writer is not type *kafka.Writer")
 	}
 
 	return kafkaWriter
@@ -37,7 +37,7 @@ func InjectIntoContext(ctx context.Context) context.Context {
 		logger.Fatal("Kafka configuration is not set")
 	}
 
-	slog.With("url", strings.Split(cfg.Kafka.BootstrapServers, ",")).Info("setting bootstrap url")
+	slog.Info("Setting bootstrap url", slog.Any("url", strings.Split(cfg.Kafka.BootstrapServers, ",")))
 
 	writer := &kafka.Writer{
 		Addr: kafka.TCP(strings.Split(cfg.Kafka.BootstrapServers, ",")...),
8 changes: 4 additions & 4 deletions lib/mtr/mtr.go
@@ -19,12 +19,12 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string,
 	address := DefaultAddr
 	if !stringutil.Empty(host, port) {
 		address = fmt.Sprintf("%s:%s", host, port)
-		slog.With("address", address).Info("overriding telemetry address with env vars")
+		slog.Info("Overriding telemetry address with env vars", slog.String("address", address))
 	}
 
 	datadogClient, err := statsd.New(address)
 	if err != nil {
-		logger.Fatal("failed to create datadog client", slog.Any("err", err))
+		logger.Fatal("Failed to create datadog client", slog.Any("err", err))
 	}
 
 	datadogClient.Tags = tags
@@ -38,12 +38,12 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string,
 func FromContext(ctx context.Context) Client {
 	metricsClientVal := ctx.Value(constants.MtrKey)
 	if metricsClientVal == nil {
-		logger.Fatal("metrics client is nil")
+		logger.Fatal("Metrics client is nil")
 	}
 
 	metricsClient, isOk := metricsClientVal.(Client)
 	if !isOk {
-		logger.Fatal("metrics client is not mtr.Client type")
+		logger.Fatal("Metrics client is not mtr.Client type")
 	}
 
 	return metricsClient
4 changes: 2 additions & 2 deletions lib/ttlmap/ttlmap.go
@@ -45,7 +45,7 @@ func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval
 	}
 
 	if err := t.loadFromFile(); err != nil {
-		slog.Warn("failed to load ttlmap from memory, starting a new one...", slog.Any("err", err))
+		slog.Warn("Failed to load ttlmap from memory, starting a new one...", slog.Any("err", err))
 	}
 
 	t.cleanupTicker = time.NewTicker(cleanupInterval)
@@ -95,7 +95,7 @@ func (t *TTLMap) cleanUpAndFlushRoutine() {
 			t.cleanup()
 		case <-t.flushTicker.C:
 			if err := t.flush(); err != nil {
-				logger.Fatal("failed to flush", slog.Any("err", err))
+				logger.Fatal("Failed to flush", slog.Any("err", err))
 			}
 		case <-t.closeChan:
 			return
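ttlmap.go drives cleanup and flush from independent tickers in a single goroutine, selecting over both channels plus a close signal. A minimal sketch of that routine with stubbed maintenance methods — the tickers and the select shape are taken from the diff; the stubs are illustrative, and slog.Error stands in for logger.Fatal so the demo doesn't exit:

package main

import (
	"log/slog"
	"time"
)

type ttlMap struct {
	cleanupTicker *time.Ticker
	flushTicker   *time.Ticker
	closeChan     chan struct{}
}

func (t *ttlMap) cleanup() { slog.Info("Cleaning up expired keys") }          // stub
func (t *ttlMap) flush() error { slog.Info("Flushing to disk"); return nil } // stub

// cleanUpAndFlushRoutine mirrors the select loop in the diff: each ticker
// drives its own maintenance task until closeChan is closed.
func (t *ttlMap) cleanUpAndFlushRoutine() {
	for {
		select {
		case <-t.cleanupTicker.C:
			t.cleanup()
		case <-t.flushTicker.C:
			if err := t.flush(); err != nil {
				slog.Error("Failed to flush", slog.Any("err", err))
			}
		case <-t.closeChan:
			return
		}
	}
}

func main() {
	t := &ttlMap{
		cleanupTicker: time.NewTicker(50 * time.Millisecond),
		flushTicker:   time.NewTicker(80 * time.Millisecond),
		closeChan:     make(chan struct{}),
	}
	go t.cleanUpAndFlushRoutine()
	time.Sleep(200 * time.Millisecond) // let a few ticks fire
	close(t.closeChan)
}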
4 changes: 2 additions & 2 deletions main.go
@@ -21,7 +21,7 @@ func main() {
 
 	cfg, err := config.ReadConfig(configFilePath)
 	if err != nil {
-		logger.Fatal("failed to read config file", slog.Any("err", err))
+		logger.Fatal("Failed to read config file", slog.Any("err", err))
 	}
 
 	_logger, usingSentry := logger.NewLogger(cfg)
@@ -34,7 +34,7 @@ func main() {
 	ctx := config.InjectIntoContext(context.Background(), cfg)
 	ctx = kafkalib.InjectIntoContext(ctx)
 	if cfg.Metrics != nil {
-		slog.Info("injecting datadog")
+		slog.Info("Injecting datadog")
 		ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5)
 	}
 
4 changes: 2 additions & 2 deletions sources/dynamodb/dynamodb.go
@@ -70,11 +70,11 @@ func Load(ctx context.Context) *Store {
 func (s *Store) Run(ctx context.Context) {
 	if s.cfg.Snapshot {
 		if err := s.scanFilesOverBucket(); err != nil {
-			logger.Fatal("scanning files over bucket failed", slog.Any("err", err))
+			logger.Fatal("Scanning files over bucket failed", slog.Any("err", err))
 		}
 
 		if err := s.streamAndPublish(ctx); err != nil {
-			logger.Fatal("stream and publish failed", slog.Any("err", err))
+			logger.Fatal("Stream and publish failed", slog.Any("err", err))
 		}
 
 		slog.Info("Finished snapshotting all the files")
20 changes: 10 additions & 10 deletions sources/dynamodb/shard.go
@@ -31,11 +31,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 	// If no one is processing it, let's mark it as being processed.
 	s.storage.SetShardProcessing(*shard.ShardId)
 	if s.storage.GetShardProcessed(*shard.ShardId) {
-		slog.With("shardId", *shard.ShardId).Info("shard has been processed, skipping...")
+		slog.Info("Shard has been processed, skipping...", slog.String("shardId", *shard.ShardId))
 		return
 	}
 
-	slog.With("shardId", *shard.ShardId).Info("processing shard...")
+	slog.Info("Processing shard...", slog.String("shardId", *shard.ShardId))
 
 	iteratorType := "TRIM_HORIZON"
 	var startingSequenceNumber string
@@ -56,11 +56,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 	iteratorOutput, err := s.streams.GetShardIterator(iteratorInput)
 	if err != nil {
-		slog.With(
+		slog.Warn("Failed to get shard iterator...",
 			slog.Any("err", err),
 			slog.String("streamArn", s.streamArn),
 			slog.String("shardId", *shard.ShardId),
-		).Warn("failed to get shard iterator...")
+		)
 		return
 	}
 
@@ -74,19 +74,19 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 		getRecordsOutput, err := s.streams.GetRecords(getRecordsInput)
 		if err != nil {
-			slog.With(
+			slog.Warn("Failed to get records from shard iterator...",
 				slog.Any("err", err),
 				slog.String("streamArn", s.streamArn),
 				slog.String("shardId", *shard.ShardId),
-			).Warn("failed to get records from shard iterator...")
+			)
 			break
 		}
 
 		var messages []kafka.Message
 		for _, record := range getRecordsOutput.Records {
 			msg, err := dynamo.NewMessage(record, s.tableName)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB",
+				logger.Fatal("Failed to cast message from DynamoDB",
 					slog.Any("err", err),
 					slog.String("streamArn", s.streamArn),
 					slog.String("shardId", *shard.ShardId),
@@ -96,7 +96,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 			message, err := msg.KafkaMessage(ctx)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB",
+				logger.Fatal("Failed to cast message from DynamoDB",
 					slog.Any("err", err),
 					slog.String("streamArn", s.streamArn),
 					slog.String("shardId", *shard.ShardId),
@@ -108,7 +108,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 		}
 
 		if err = kafkalib.NewBatch(messages, s.batchSize).Publish(ctx); err != nil {
-			logger.Fatal("failed to publish messages, exiting...", slog.Any("err", err))
+			logger.Fatal("Failed to publish messages, exiting...", slog.Any("err", err))
 		}
 
 		if len(getRecordsOutput.Records) > 0 {
@@ -125,7 +125,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 		shardIterator = getRecordsOutput.NextShardIterator
 		if shardIterator == nil {
 			// This means this shard has been fully processed, let's add it to our processed list.
-			slog.With("shardId", *shard.ShardId).Info("shard has been fully processed, adding it to the processed list...")
+			slog.Info("Shard has been fully processed, adding it to the processed list...", slog.String("shardId", *shard.ShardId))
			s.storage.SetShardProcessed(*shard.ShardId)
 		}
 	}
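shard.go follows the standard DynamoDB Streams read loop: get a shard iterator, call GetRecords repeatedly, and treat a nil NextShardIterator as the signal that the shard is exhausted. A schematic of that control flow with the AWS SDK abstracted behind a hypothetical interface — the real code uses dynamodbstreams from the AWS SDK and publishes each page of records to Kafka in batches:

package main

import "log/slog"

// streamClient abstracts the two calls the diff makes; this interface is
// illustrative, not the AWS SDK's.
type streamClient interface {
	GetShardIterator(shardID, iteratorType string) (string, error)
	GetRecords(iterator string) (records []string, next string, err error)
}

// fakeStream serves two pages of records, then ends the shard.
type fakeStream struct{ page int }

func (f *fakeStream) GetShardIterator(shardID, iteratorType string) (string, error) {
	return "iterator-0", nil
}

func (f *fakeStream) GetRecords(iterator string) ([]string, string, error) {
	f.page++
	if f.page > 2 {
		return nil, "", nil // empty next iterator: shard fully processed
	}
	return []string{"record"}, "iterator-next", nil
}

func processShard(s streamClient, shardID string) {
	iterator, err := s.GetShardIterator(shardID, "TRIM_HORIZON")
	if err != nil {
		slog.Warn("Failed to get shard iterator...", slog.Any("err", err), slog.String("shardId", shardID))
		return
	}

	for {
		records, next, err := s.GetRecords(iterator)
		if err != nil {
			slog.Warn("Failed to get records from shard iterator...", slog.Any("err", err), slog.String("shardId", shardID))
			return
		}
		_ = records // the real loop converts these to kafka.Message and publishes them

		if next == "" {
			slog.Info("Shard has been fully processed, adding it to the processed list...", slog.String("shardId", shardID))
			return
		}
		iterator = next
	}
}

func main() {
	processShard(&fakeStream{}, "shardId-000000000001")
}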
12 changes: 6 additions & 6 deletions sources/dynamodb/snapshot.go
@@ -28,7 +28,7 @@ func (s *Store) scanFilesOverBucket() error {
 	}
 
 	for _, file := range files {
-		slog.With("fileName", *file.Key).Info("discovered file, adding to the processing queue...")
+		slog.Info("Discovered file, adding to the processing queue...", slog.String("fileName", *file.Key))
 	}
 
 	s.cfg.SnapshotSettings.SpecifiedFiles = files
@@ -46,7 +46,7 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
 			slog.String("fileName", *file.Key),
 		}
 
-		slog.With(logFields...).Info("processing file...")
+		slog.Info("Processing file...", logFields...)
 		ch := make(chan dynamodb.ItemResponse)
 		go func() {
 			if err := s.s3Client.StreamJsonGzipFile(file, ch); err != nil {
@@ -58,22 +58,22 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
 		for msg := range ch {
 			dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB", slog.Any("err", err), slog.Any("msg", msg))
+				logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err), slog.Any("msg", msg))
 			}
 
 			kafkaMsg, err := dynamoMsg.KafkaMessage(ctx)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB", slog.Any("err", err))
+				logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err))
 			}
 
 			kafkaMsgs = append(kafkaMsgs, kafkaMsg)
 		}
 
 		if err = kafkalib.NewBatch(kafkaMsgs, s.batchSize).Publish(ctx); err != nil {
-			logger.Fatal("failed to publish messages, exiting...", slog.Any("err", err))
+			logger.Fatal("Failed to publish messages, exiting...", slog.Any("err", err))
 		}
 
-		slog.With(logFields...).Info("successfully processed file...")
+		slog.Info("Successfully processed file...", logFields...)
 	}
 
 	return nil
