diff --git a/config/config.go b/config/config.go
index 3072a6d5..90d2b314 100644
--- a/config/config.go
+++ b/config/config.go
@@ -3,10 +3,11 @@ package config
 import (
 	"context"
 	"fmt"
-	"gopkg.in/yaml.v2"
 	"log"
 	"os"
-
+
+	"gopkg.in/yaml.v2"
+
 	"github.com/artie-labs/reader/constants"
 )
 
@@ -87,17 +88,17 @@ func (s *Settings) Validate() error {
 func ReadConfig(fp string) (*Settings, error) {
 	bytes, err := os.ReadFile(fp)
 	if err != nil {
-		log.Fatalf("failed to read config file, err: %v", err)
+		log.Fatalf("Failed to read config file, err: %v", err)
 	}
 
 	var settings Settings
 	err = yaml.Unmarshal(bytes, &settings)
 	if err != nil {
-		log.Fatalf("failed to unmarshal config file, err: %v", err)
+		log.Fatalf("Failed to unmarshal config file, err: %v", err)
 	}
 
 	if err = settings.Validate(); err != nil {
-		log.Fatalf("failed to validate config file, err: %v", err)
+		log.Fatalf("Failed to validate config file, err: %v", err)
 	}
 
 	settings.Kafka.GenerateDefault()
diff --git a/lib/kafkalib/batch.go b/lib/kafkalib/batch.go
index 28709098..dea18c94 100644
--- a/lib/kafkalib/batch.go
+++ b/lib/kafkalib/batch.go
@@ -85,11 +85,11 @@ func (b *Batch) Publish(ctx context.Context) error {
 		}
 
 		sleepDuration := time.Duration(jitter.JitterMs(RetryDelayMs, attempts)) * time.Millisecond
-		slog.With(
+		slog.Warn("Failed to publish message, jitter sleeping before retrying...",
 			slog.Any("err", err),
 			slog.Int("attempts", attempts),
 			slog.Int("maxAttempts", MaxRetries),
-		).Warn("failed to publish message, jitter sleeping before retrying...")
+		)
 		time.Sleep(sleepDuration)
 	}
 
diff --git a/lib/kafkalib/kafka.go b/lib/kafkalib/kafka.go
index 14e072f6..05720fe2 100644
--- a/lib/kafkalib/kafka.go
+++ b/lib/kafkalib/kafka.go
@@ -19,12 +19,12 @@ import (
 func FromContext(ctx context.Context) *kafka.Writer {
 	kafkaVal := ctx.Value(constants.KafkaKey)
 	if kafkaVal == nil {
-		logger.Fatal("kafka is not set in context.Context")
+		logger.Fatal("Kafka is not set in context.Context")
 	}
 
 	kafkaWriter, isOk := kafkaVal.(*kafka.Writer)
 	if !isOk {
-		logger.Fatal("kafka writer is not type *kafka.Writer")
+		logger.Fatal("Kafka writer is not type *kafka.Writer")
 	}
 
 	return kafkaWriter
@@ -37,7 +37,7 @@ func InjectIntoContext(ctx context.Context) context.Context {
 		logger.Fatal("Kafka configuration is not set")
 	}
 
-	slog.With("url", strings.Split(cfg.Kafka.BootstrapServers, ",")).Info("setting bootstrap url")
+	slog.Info("Setting bootstrap url", slog.Any("url", strings.Split(cfg.Kafka.BootstrapServers, ",")))
 	writer := &kafka.Writer{
 		Addr: kafka.TCP(strings.Split(cfg.Kafka.BootstrapServers, ",")...),
diff --git a/lib/mtr/mtr.go b/lib/mtr/mtr.go
index fb33c212..d2ea1adc 100644
--- a/lib/mtr/mtr.go
+++ b/lib/mtr/mtr.go
@@ -19,12 +19,12 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string,
 	address := DefaultAddr
 	if !stringutil.Empty(host, port) {
 		address = fmt.Sprintf("%s:%s", host, port)
-		slog.With("address", address).Info("overriding telemetry address with env vars")
+		slog.Info("Overriding telemetry address with env vars", slog.String("address", address))
 	}
 
 	datadogClient, err := statsd.New(address)
 	if err != nil {
-		logger.Fatal("failed to create datadog client", slog.Any("err", err))
+		logger.Fatal("Failed to create datadog client", slog.Any("err", err))
 	}
 
 	datadogClient.Tags = tags
@@ -38,12 +38,12 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string,
 func FromContext(ctx context.Context) Client {
 	metricsClientVal := ctx.Value(constants.MtrKey)
 	if metricsClientVal == nil {
-		logger.Fatal("metrics client is nil")
+		logger.Fatal("Metrics client is nil")
 	}
 
 	metricsClient, isOk := metricsClientVal.(Client)
 	if !isOk {
-		logger.Fatal("metrics client is not mtr.Client type")
+		logger.Fatal("Metrics client is not mtr.Client type")
 	}
 
 	return metricsClient
diff --git a/lib/ttlmap/ttlmap.go b/lib/ttlmap/ttlmap.go
index ea21fbf6..f4cd2106 100644
--- a/lib/ttlmap/ttlmap.go
+++ b/lib/ttlmap/ttlmap.go
@@ -45,7 +45,7 @@ func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval
 	}
 
 	if err := t.loadFromFile(); err != nil {
-		slog.Warn("failed to load ttlmap from memory, starting a new one...", slog.Any("err", err))
+		slog.Warn("Failed to load ttlmap from memory, starting a new one...", slog.Any("err", err))
 	}
 
 	t.cleanupTicker = time.NewTicker(cleanupInterval)
@@ -95,7 +95,7 @@ func (t *TTLMap) cleanUpAndFlushRoutine() {
 			t.cleanup()
 		case <-t.flushTicker.C:
 			if err := t.flush(); err != nil {
-				logger.Fatal("failed to flush", slog.Any("err", err))
+				logger.Fatal("Failed to flush", slog.Any("err", err))
 			}
 		case <-t.closeChan:
 			return
diff --git a/main.go b/main.go
index 66cb9b34..52888745 100644
--- a/main.go
+++ b/main.go
@@ -21,7 +21,7 @@ func main() {
 
 	cfg, err := config.ReadConfig(configFilePath)
 	if err != nil {
-		logger.Fatal("failed to read config file", slog.Any("err", err))
+		logger.Fatal("Failed to read config file", slog.Any("err", err))
 	}
 
 	_logger, usingSentry := logger.NewLogger(cfg)
@@ -34,7 +34,7 @@ func main() {
 	ctx := config.InjectIntoContext(context.Background(), cfg)
 	ctx = kafkalib.InjectIntoContext(ctx)
 	if cfg.Metrics != nil {
-		slog.Info("injecting datadog")
+		slog.Info("Injecting datadog")
 		ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5)
 	}
 
diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go
index b7d4cf70..602b4784 100644
--- a/sources/dynamodb/dynamodb.go
+++ b/sources/dynamodb/dynamodb.go
@@ -70,11 +70,11 @@ func Load(ctx context.Context) *Store {
 func (s *Store) Run(ctx context.Context) {
 	if s.cfg.Snapshot {
 		if err := s.scanFilesOverBucket(); err != nil {
-			logger.Fatal("scanning files over bucket failed", slog.Any("err", err))
+			logger.Fatal("Scanning files over bucket failed", slog.Any("err", err))
 		}
 
 		if err := s.streamAndPublish(ctx); err != nil {
-			logger.Fatal("stream and publish failed", slog.Any("err", err))
+			logger.Fatal("Stream and publish failed", slog.Any("err", err))
 		}
 
 		slog.Info("Finished snapshotting all the files")
diff --git a/sources/dynamodb/shard.go b/sources/dynamodb/shard.go
index 0dd53176..3ec46a46 100644
--- a/sources/dynamodb/shard.go
+++ b/sources/dynamodb/shard.go
@@ -31,11 +31,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 	// If no one is processing it, let's mark it as being processed.
 	s.storage.SetShardProcessing(*shard.ShardId)
 	if s.storage.GetShardProcessed(*shard.ShardId) {
-		slog.With("shardId", *shard.ShardId).Info("shard has been processed, skipping...")
+		slog.Info("Shard has been processed, skipping...", slog.String("shardId", *shard.ShardId))
 		return
 	}
 
-	slog.With("shardId", *shard.ShardId).Info("processing shard...")
+	slog.Info("Processing shard...", slog.String("shardId", *shard.ShardId))
 	iteratorType := "TRIM_HORIZON"
 	var startingSequenceNumber string
 
@@ -56,11 +56,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 	iteratorOutput, err := s.streams.GetShardIterator(iteratorInput)
 	if err != nil {
-		slog.With(
+		slog.Warn("Failed to get shard iterator...",
 			slog.Any("err", err),
 			slog.String("streamArn", s.streamArn),
 			slog.String("shardId", *shard.ShardId),
-		).Warn("failed to get shard iterator...")
+		)
 		return
 	}
 
@@ -74,11 +74,11 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 		getRecordsOutput, err := s.streams.GetRecords(getRecordsInput)
 		if err != nil {
-			slog.With(
+			slog.Warn("Failed to get records from shard iterator...",
 				slog.Any("err", err),
 				slog.String("streamArn", s.streamArn),
 				slog.String("shardId", *shard.ShardId),
-			).Warn("failed to get records from shard iterator...")
+			)
 			break
 		}
 
@@ -86,7 +86,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 		for _, record := range getRecordsOutput.Records {
 			msg, err := dynamo.NewMessage(record, s.tableName)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB",
+				logger.Fatal("Failed to cast message from DynamoDB",
 					slog.Any("err", err),
 					slog.String("streamArn", s.streamArn),
 					slog.String("shardId", *shard.ShardId),
@@ -96,7 +96,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 
 			message, err := msg.KafkaMessage(ctx)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB",
+				logger.Fatal("Failed to cast message from DynamoDB",
 					slog.Any("err", err),
 					slog.String("streamArn", s.streamArn),
 					slog.String("shardId", *shard.ShardId),
@@ -108,7 +108,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 		}
 
 		if err = kafkalib.NewBatch(messages, s.batchSize).Publish(ctx); err != nil {
-			logger.Fatal("failed to publish messages, exiting...", slog.Any("err", err))
+			logger.Fatal("Failed to publish messages, exiting...", slog.Any("err", err))
 		}
 
 		if len(getRecordsOutput.Records) > 0 {
@@ -125,7 +125,7 @@ func (s *Store) processShard(ctx context.Context, shard *dynamodbstreams.Shard)
 		shardIterator = getRecordsOutput.NextShardIterator
 		if shardIterator == nil {
 			// This means this shard has been fully processed, let's add it to our processed list.
-			slog.With("shardId", *shard.ShardId).Info("shard has been fully processed, adding it to the processed list...")
+			slog.Info("Shard has been fully processed, adding it to the processed list...", slog.String("shardId", *shard.ShardId))
 			s.storage.SetShardProcessed(*shard.ShardId)
 		}
 	}
diff --git a/sources/dynamodb/snapshot.go b/sources/dynamodb/snapshot.go
index c4b8e8ee..66ac2e17 100644
--- a/sources/dynamodb/snapshot.go
+++ b/sources/dynamodb/snapshot.go
@@ -28,7 +28,7 @@ func (s *Store) scanFilesOverBucket() error {
 	}
 
 	for _, file := range files {
-		slog.With("fileName", *file.Key).Info("discovered file, adding to the processing queue...")
+		slog.Info("Discovered file, adding to the processing queue...", slog.String("fileName", *file.Key))
 	}
 
 	s.cfg.SnapshotSettings.SpecifiedFiles = files
@@ -46,7 +46,7 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
 			slog.String("fileName", *file.Key),
 		}
 
-		slog.With(logFields...).Info("processing file...")
+		slog.Info("Processing file...", logFields...)
 		ch := make(chan dynamodb.ItemResponse)
 		go func() {
 			if err := s.s3Client.StreamJsonGzipFile(file, ch); err != nil {
@@ -58,22 +58,22 @@ func (s *Store) streamAndPublish(ctx context.Context) error {
 		for msg := range ch {
 			dynamoMsg, err := dynamo.NewMessageFromExport(msg, keys, s.tableName)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB", slog.Any("err", err), slog.Any("msg", msg))
+				logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err), slog.Any("msg", msg))
 			}
 
 			kafkaMsg, err := dynamoMsg.KafkaMessage(ctx)
 			if err != nil {
-				logger.Fatal("failed to cast message from DynamoDB", slog.Any("err", err))
+				logger.Fatal("Failed to cast message from DynamoDB", slog.Any("err", err))
 			}
 
 			kafkaMsgs = append(kafkaMsgs, kafkaMsg)
 		}
 
 		if err = kafkalib.NewBatch(kafkaMsgs, s.batchSize).Publish(ctx); err != nil {
-			logger.Fatal("failed to publish messages, exiting...", slog.Any("err", err))
+			logger.Fatal("Failed to publish messages, exiting...", slog.Any("err", err))
 		}
 
-		slog.With(logFields...).Info("successfully processed file...")
+		slog.Info("Successfully processed file...", logFields...)
 	}
 
 	return nil