Skip to content

Commit

Permalink
[mysql,postgres] Update error messages (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 6, 2024
1 parent e229140 commit cc71d3e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
5 changes: 2 additions & 3 deletions sources/mysql/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter,
logger.Info("Table does not contain any rows, skipping...")
return nil
} else {
return fmt.Errorf("failed to build scanner for table %s: %w", tableCfg.Name, err)
return fmt.Errorf("failed to build Debezium transformer for table %s: %w", tableCfg.Name, err)
}
}

logger.Info("Scanning table", slog.Any("batchSize", tableCfg.BatchSize))
logger.Info("Scanning table...", slog.Any("batchSize", tableCfg.GetBatchSize()))
count, err := writer.WriteIterator(ctx, dbzTransformer)
if err != nil {
return fmt.Errorf("failed to snapshot for table %s: %w", tableCfg.Name, err)
Expand All @@ -75,6 +75,5 @@ func (s Source) snapshotTable(ctx context.Context, writer kafkalib.BatchWriter,
slog.Int("scannedTotal", count),
slog.Duration("totalDuration", time.Since(snapshotStartTime)),
)

return nil
}
4 changes: 2 additions & 2 deletions sources/postgres/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func (s *Source) Run(ctx context.Context, writer kafkalib.BatchWriter) error {
logger.Info("Table does not contain any rows, skipping...")
continue
} else {
return fmt.Errorf("failed to build scanner for table %s: %w", tableCfg.Name, err)
return fmt.Errorf("failed to build Debezium transformer for table %s: %w", tableCfg.Name, err)
}
}

logger.Info("Scanning table", slog.Any("batchSize", tableCfg.GetBatchSize()))
logger.Info("Scanning table...", slog.Any("batchSize", tableCfg.GetBatchSize()))
count, err := writer.WriteIterator(ctx, dbzTransformer)
if err != nil {
return fmt.Errorf("failed to snapshot for table %s: %w", tableCfg.Name, err)
Expand Down

0 comments on commit cc71d3e

Please sign in to comment.