From f84e20daa30b865f4bdfc626258879c08166108e Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 12 Dec 2024 10:20:36 +0100 Subject: [PATCH] Initialize TargetPaginationKeys in constructor --- data_iterator.go | 9 ++++----- ferry.go | 3 ++- test/go/data_iterator_sorter_test.go | 2 ++ test/go/data_iterator_test.go | 1 + 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/data_iterator.go b/data_iterator.go index 6b8b3597..5621a24e 100644 --- a/data_iterator.go +++ b/data_iterator.go @@ -20,7 +20,7 @@ type DataIterator struct { StateTracker *StateTracker TableSorter DataIteratorSorter - targetPaginationKeys *sync.Map + TargetPaginationKeys *sync.Map batchListeners []func(*RowBatch) error doneListeners []func() error logger *logrus.Entry @@ -33,7 +33,6 @@ type TableMaxPaginationKey struct { func (d *DataIterator) Run(tables []*TableSchema) { d.logger = logrus.WithField("tag", "data_iterator") - d.targetPaginationKeys = &sync.Map{} // If a state tracker is not provided, then the caller doesn't care about // tracking state. However, some methods are still useful so we initialize @@ -59,7 +58,7 @@ func (d *DataIterator) Run(tables []*TableSchema) { // We don't need to reiterate those tables as it has already been done. delete(tablesWithData, table) } else { - d.targetPaginationKeys.Store(tableName, maxPaginationKey) + d.TargetPaginationKeys.Store(tableName, maxPaginationKey) } } @@ -79,9 +78,9 @@ func (d *DataIterator) Run(tables []*TableSchema) { logger := d.logger.WithField("table", table.String()) - targetPaginationKeyInterface, found := d.targetPaginationKeys.Load(table.String()) + targetPaginationKeyInterface, found := d.TargetPaginationKeys.Load(table.String()) if !found { - err := fmt.Errorf("%s not found in targetPaginationKeys, this is likely a programmer error", table.String()) + err := fmt.Errorf("%s not found in TargetPaginationKeys, this is likely a programmer error", table.String()) logger.WithError(err).Error("this is definitely a bug") d.ErrorHandler.Fatal("data_iterator", err) return diff --git a/ferry.go b/ferry.go index c933443e..3718931b 100644 --- a/ferry.go +++ b/ferry.go @@ -111,6 +111,7 @@ func (f *Ferry) NewDataIterator() *DataIterator { ReadRetries: f.Config.DBReadRetries, }, StateTracker: f.StateTracker, + TargetPaginationKeys: &sync.Map{}, } if f.CopyFilter != nil { @@ -993,7 +994,7 @@ func (f *Ferry) Progress() *Progress { s.Tables = make(map[string]TableProgress) targetPaginationKeys := make(map[string]uint64) - f.DataIterator.targetPaginationKeys.Range(func(k, v interface{}) bool { + f.DataIterator.TargetPaginationKeys.Range(func(k, v interface{}) bool { targetPaginationKeys[k.(string)] = v.(uint64) return true }) diff --git a/test/go/data_iterator_sorter_test.go b/test/go/data_iterator_sorter_test.go index 78199285..8a32f9a5 100644 --- a/test/go/data_iterator_sorter_test.go +++ b/test/go/data_iterator_sorter_test.go @@ -3,6 +3,7 @@ package test import ( "fmt" "sort" + "sync" "testing" "github.com/stretchr/testify/suite" @@ -58,6 +59,7 @@ func (t *DataIteratorSorterTestSuite) SetupTest() { t.dataIterator = &ghostferry.DataIterator{ DB: t.Ferry.SourceDB, ErrorHandler: t.Ferry.ErrorHandler, + TargetPaginationKeys: &sync.Map{}, } } diff --git a/test/go/data_iterator_test.go b/test/go/data_iterator_test.go index 6d510423..f30d0cfb 100644 --- a/test/go/data_iterator_test.go +++ b/test/go/data_iterator_test.go @@ -57,6 +57,7 @@ func (this *DataIteratorTestSuite) SetupTest() { ReadRetries: config.DBReadRetries, }, StateTracker: ghostferry.NewStateTracker(config.DataIterationConcurrency * 10), + TargetPaginationKeys: &sync.Map{}, } this.receivedRows = make(map[string][]ghostferry.RowData, 0)