diff --git a/sources/mysql/snapshot.go b/sources/mysql/snapshot.go index dcf63340..ac650c08 100644 --- a/sources/mysql/snapshot.go +++ b/sources/mysql/snapshot.go @@ -61,11 +61,11 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, logger.Info("Table does not contain any rows, skipping...") return nil } else { - return fmt.Errorf("failed to build scanner for table %s: %w", tableCfg.Name, err) + return fmt.Errorf("failed to build Debezium transformer for table %s: %w", tableCfg.Name, err) } } - logger.Info("Scanning table", slog.Any("batchSize", tableCfg.BatchSize)) + logger.Info("Scanning table...", slog.Any("batchSize", tableCfg.GetBatchSize())) count, err := writer.WriteIterator(ctx, dbzTransformer) if err != nil { return fmt.Errorf("failed to snapshot for table %s: %w", tableCfg.Name, err) @@ -75,6 +75,5 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter, slog.Int("scannedTotal", count), slog.Duration("totalDuration", time.Since(snapshotStartTime)), ) - return nil } diff --git a/sources/postgres/snapshot.go b/sources/postgres/snapshot.go index f732d309..bf79a22d 100644 --- a/sources/postgres/snapshot.go +++ b/sources/postgres/snapshot.go @@ -54,11 +54,11 @@ func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error { logger.Info("Table does not contain any rows, skipping...") continue } else { - return fmt.Errorf("failed to build scanner for table %s: %w", tableCfg.Name, err) + return fmt.Errorf("failed to build Debezium transformer for table %s: %w", tableCfg.Name, err) } } - logger.Info("Scanning table", slog.Any("batchSize", tableCfg.GetBatchSize())) + logger.Info("Scanning table...", slog.Any("batchSize", tableCfg.GetBatchSize())) count, err := writer.WriteIterator(ctx, dbzTransformer) if err != nil { return fmt.Errorf("failed to snapshot for table %s: %w", tableCfg.Name, err)