Skip to content

Commit

Permalink
[rdbms] Detect if Scanner is stuck in a loop (#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Mar 18, 2024
1 parent 7e6d149 commit 2d3c5db
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 31 deletions.
8 changes: 5 additions & 3 deletions lib/rdbms/primary_key/primary_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ func (k *Keys) LoadValues(startingValues, endingValues []any) error {
return nil
}

func (k *Keys) UpdateStartingValue(keyName string, startingVal any) error {
// UpdateStartingValue sets the starting value for a primary key and returns whether the value changed.
func (k *Keys) UpdateStartingValue(keyName string, startingVal any) (bool, error) {
idx := slices.IndexFunc(k.keys, func(x Key) bool { return x.Name == keyName })
if idx < 0 {
return fmt.Errorf("no key named %s", keyName)
return false, fmt.Errorf("no key named %s", keyName)
}

changed := !equal(k.keys[idx].StartingValue, startingVal)
k.keys[idx].StartingValue = startingVal
return nil
return changed, nil
}

func (k *Keys) KeyNames() []string {
Expand Down
68 changes: 41 additions & 27 deletions lib/rdbms/primary_key/primary_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

func TestNewKeys(t *testing.T) {
// ensure upsert doesn't mutate original arguments to `NewKeys`
{
keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}}
keys := NewKeys(keysArray)
assert.NoError(t, keys.UpdateStartingValue("foo", "new starting value"))
assert.Equal(t, "foo", keys.keys[0].Name)
assert.Equal(t, "new starting value", keys.keys[0].StartingValue)
assert.Equal(t, "foo", keysArray[0].Name)
assert.Equal(t, 20, keysArray[0].StartingValue)
}
keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}}
keys := NewKeys(keysArray)
changed, err := keys.UpdateStartingValue("foo", "new starting value")
assert.NoError(t, err)
assert.True(t, changed)
assert.Equal(t, "foo", keys.keys[0].Name)
assert.Equal(t, "new starting value", keys.keys[0].StartingValue)
assert.Equal(t, "foo", keysArray[0].Name)
assert.Equal(t, 20, keysArray[0].StartingValue)
}

func TestPrimaryKeys_LoadValues(t *testing.T) {
Expand Down Expand Up @@ -94,50 +94,64 @@ func TestPrimaryKeys_LoadValues(t *testing.T) {
func TestKeys_UpdateStartingValue(t *testing.T) {
type _tc struct {
name string
keys *Keys
keys []Key
keyName string
startingVal any

expectedKeys []Key
expectedErr string
expectedChanged bool
expectedKeys []Key
expectedErr string
}

startVal2 := "Start2"

tcs := []_tc{
{
name: "Key doesn't exist",
keys: &Keys{
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
keyName: "Key2",
startingVal: startVal2,
expectedErr: "no key named Key2",
},
{
name: "Update existing key",
keys: &Keys{
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
name: "Update existing key with existing start value",
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
{Name: "Key2", StartingValue: 2, EndingValue: 2},
},
keyName: "Key1",
startingVal: startVal2,
keyName: "Key1",
startingVal: "Start1",
expectedChanged: false,
expectedKeys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
{Name: "Key2", StartingValue: 2, EndingValue: 2},
},
},
{
name: "Update existing key with new value",
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
keyName: "Key1",
startingVal: startVal2,
expectedChanged: true,
expectedKeys: []Key{
{Name: "Key1", StartingValue: "Start2", EndingValue: "End1"},
{Name: "Key1", StartingValue: startVal2, EndingValue: "End1"},
},
},
}

for _, tc := range tcs {
err := tc.keys.UpdateStartingValue(tc.keyName, tc.startingVal)
keys := &Keys{tc.keys}
changed, err := keys.UpdateStartingValue(tc.keyName, tc.startingVal)
if tc.expectedErr != "" {
assert.ErrorContains(t, err, tc.expectedErr, tc.name)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedKeys, tc.keys.keys, tc.name)
assert.NoError(t, err, tc.name)
assert.Equal(t, tc.expectedKeys, keys.keys, tc.name)
assert.Equal(t, tc.expectedChanged, changed, tc.name)
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion lib/rdbms/scan/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,28 @@ func (s *Scanner) Next() ([]map[string]any, error) {
return nil, err
}

wasFirstBatch := s.isFirstBatch
s.isFirstBatch = false

if len(rows) == 0 || s.primaryKeys.IsExhausted() {
s.done = true
} else {
// Update the starting keys so that the next scan will pick off where we last left off.
lastRow := rows[len(rows)-1]
var startingValuesChanged bool
for _, pk := range s.primaryKeys.Keys() {
if err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name]); err != nil {
changed, err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name])
if err != nil {
s.done = true
return nil, err
}
startingValuesChanged = startingValuesChanged || changed
}

if !wasFirstBatch && !startingValuesChanged {
// Detect situations where the scanner is stuck in a loop.
// Typically the second batch will use a > comparison instead of a >= comparison for the lower bound.
return nil, fmt.Errorf("primarky key start values did not change, scanner is likely stuck in a loop")
}
}

Expand Down

0 comments on commit 2d3c5db

Please sign in to comment.