From 2d3c5db5b391766bd6a362b69a87d9f42f974b05 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 18 Mar 2024 15:45:37 -0700 Subject: [PATCH] [rdbms] Detect if `Scanner` is stuck in a loop (#313) --- lib/rdbms/primary_key/primary_keys.go | 8 ++- lib/rdbms/primary_key/primary_keys_test.go | 68 +++++++++++++--------- lib/rdbms/scan/scan.go | 13 ++++- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/lib/rdbms/primary_key/primary_keys.go b/lib/rdbms/primary_key/primary_keys.go index 54b1a52f..69bd74ca 100644 --- a/lib/rdbms/primary_key/primary_keys.go +++ b/lib/rdbms/primary_key/primary_keys.go @@ -60,14 +60,16 @@ func (k *Keys) LoadValues(startingValues, endingValues []any) error { return nil } -func (k *Keys) UpdateStartingValue(keyName string, startingVal any) error { +// UpdateStartingValue sets the starting value for a primary key and returns whether the value changed. +func (k *Keys) UpdateStartingValue(keyName string, startingVal any) (bool, error) { idx := slices.IndexFunc(k.keys, func(x Key) bool { return x.Name == keyName }) if idx < 0 { - return fmt.Errorf("no key named %s", keyName) + return false, fmt.Errorf("no key named %s", keyName) } + changed := !equal(k.keys[idx].StartingValue, startingVal) k.keys[idx].StartingValue = startingVal - return nil + return changed, nil } func (k *Keys) KeyNames() []string { diff --git a/lib/rdbms/primary_key/primary_keys_test.go b/lib/rdbms/primary_key/primary_keys_test.go index 1a54b0f1..feb4b257 100644 --- a/lib/rdbms/primary_key/primary_keys_test.go +++ b/lib/rdbms/primary_key/primary_keys_test.go @@ -8,15 +8,15 @@ import ( func TestNewKeys(t *testing.T) { // ensure upsert doesn't mutate original arguments to `NewKeys` - { - keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}} - keys := NewKeys(keysArray) - assert.NoError(t, keys.UpdateStartingValue("foo", "new starting value")) - assert.Equal(t, "foo", keys.keys[0].Name) - assert.Equal(t, "new starting value", keys.keys[0].StartingValue) - assert.Equal(t, "foo", keysArray[0].Name) - assert.Equal(t, 20, keysArray[0].StartingValue) - } + keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}} + keys := NewKeys(keysArray) + changed, err := keys.UpdateStartingValue("foo", "new starting value") + assert.NoError(t, err) + assert.True(t, changed) + assert.Equal(t, "foo", keys.keys[0].Name) + assert.Equal(t, "new starting value", keys.keys[0].StartingValue) + assert.Equal(t, "foo", keysArray[0].Name) + assert.Equal(t, 20, keysArray[0].StartingValue) } func TestPrimaryKeys_LoadValues(t *testing.T) { @@ -94,12 +94,13 @@ func TestPrimaryKeys_LoadValues(t *testing.T) { func TestKeys_UpdateStartingValue(t *testing.T) { type _tc struct { name string - keys *Keys + keys []Key keyName string startingVal any - expectedKeys []Key - expectedErr string + expectedChanged bool + expectedKeys []Key + expectedErr string } startVal2 := "Start2" @@ -107,37 +108,50 @@ func TestKeys_UpdateStartingValue(t *testing.T) { tcs := []_tc{ { name: "Key doesn't exist", - keys: &Keys{ - keys: []Key{ - {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, - }, + keys: []Key{ + {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, }, keyName: "Key2", startingVal: startVal2, expectedErr: "no key named Key2", }, { - name: "Update existing key", - keys: &Keys{ - keys: []Key{ - {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, - }, + name: "Update existing key with existing start value", + keys: []Key{ + {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, + {Name: "Key2", StartingValue: 2, EndingValue: 2}, }, - keyName: "Key1", - startingVal: startVal2, + keyName: "Key1", + startingVal: "Start1", + expectedChanged: false, + expectedKeys: []Key{ + {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, + {Name: "Key2", StartingValue: 2, EndingValue: 2}, + }, + }, + { + name: "Update existing key with new value", + keys: []Key{ + {Name: "Key1", StartingValue: "Start1", EndingValue: "End1"}, + }, + keyName: "Key1", + startingVal: startVal2, + expectedChanged: true, expectedKeys: []Key{ - {Name: "Key1", StartingValue: "Start2", EndingValue: "End1"}, + {Name: "Key1", StartingValue: startVal2, EndingValue: "End1"}, }, }, } for _, tc := range tcs { - err := tc.keys.UpdateStartingValue(tc.keyName, tc.startingVal) + keys := &Keys{tc.keys} + changed, err := keys.UpdateStartingValue(tc.keyName, tc.startingVal) if tc.expectedErr != "" { assert.ErrorContains(t, err, tc.expectedErr, tc.name) } else { - assert.NoError(t, err) - assert.Equal(t, tc.expectedKeys, tc.keys.keys, tc.name) + assert.NoError(t, err, tc.name) + assert.Equal(t, tc.expectedKeys, keys.keys, tc.name) + assert.Equal(t, tc.expectedChanged, changed, tc.name) } } } diff --git a/lib/rdbms/scan/scan.go b/lib/rdbms/scan/scan.go index 5b161f34..947af1a1 100644 --- a/lib/rdbms/scan/scan.go +++ b/lib/rdbms/scan/scan.go @@ -87,6 +87,7 @@ func (s *Scanner) Next() ([]map[string]any, error) { return nil, err } + wasFirstBatch := s.isFirstBatch s.isFirstBatch = false if len(rows) == 0 || s.primaryKeys.IsExhausted() { @@ -94,10 +95,20 @@ func (s *Scanner) Next() ([]map[string]any, error) { } else { // Update the starting keys so that the next scan will pick off where we last left off. lastRow := rows[len(rows)-1] + var startingValuesChanged bool for _, pk := range s.primaryKeys.Keys() { - if err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name]); err != nil { + changed, err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name]) + if err != nil { + s.done = true return nil, err } + startingValuesChanged = startingValuesChanged || changed + } + + if !wasFirstBatch && !startingValuesChanged { + // Detect situations where the scanner is stuck in a loop. + // Typically the second batch will use a > comparison instead of a >= comparison for the lower bound. + return nil, fmt.Errorf("primarky key start values did not change, scanner is likely stuck in a loop") } }