diff --git a/pkg/storage/pebble_key_schema.go b/pkg/storage/pebble_key_schema.go index e3a03de2ee4d..f527db3c6add 100644 --- a/pkg/storage/pebble_key_schema.go +++ b/pkg/storage/pebble_key_schema.go @@ -471,15 +471,6 @@ func (ks *cockroachKeySeeker) seekGEOnSuffix(index int, seekSuffix []byte) (row // Invariant: f(l-1) == false, f(u) == true. l := index u := ks.roachKeyChanged.SeekSetBitGE(index + 1) - if ks.suffixTypes&hasEmptySuffixes != 0 { - // Check if the key at index has an empty suffix. Since empty suffixes sort - // first, this is the only key in the range [index, u) which could have an - // empty suffix. - if len(ks.untypedVersions.At(index)) == 0 && ks.mvccWallTimes.At(index) == 0 && ks.mvccLogical.At(index) == 0 { - // Our seek suffix is not empty, so it must come after the empty suffix. - l = index + 1 - } - } for l < u { m := int(uint(l+u) >> 1) // avoid overflow when computing m @@ -488,9 +479,10 @@ func (ks *cockroachKeySeeker) seekGEOnSuffix(index int, seekSuffix []byte) (row if len(mVer) == 0 { wallTime := ks.mvccWallTimes.At(m) logicalTime := uint32(ks.mvccLogical.At(m)) - if buildutil.CrdbTestBuild && wallTime == 0 && logicalTime == 0 { - // This can only happen for row at index. - panic(errors.AssertionFailedf("unexpected empty suffix at %d (l=%d, u=%d)", m, l, u)) + if wallTime == 0 && logicalTime == 0 { + // This row has an empty suffix. Since the seek suffix is not empty, it comes after. + l = m + 1 + continue } // Note: this path is not very performance sensitive: blocks that mix MVCC diff --git a/pkg/storage/pebble_key_schema_test.go b/pkg/storage/pebble_key_schema_test.go index 32b1af7e585f..b842a3795c26 100644 --- a/pkg/storage/pebble_key_schema_test.go +++ b/pkg/storage/pebble_key_schema_test.go @@ -340,6 +340,8 @@ func TestKeySchema_RandomKeys(t *testing.T) { var it colblk.DataBlockIter it.InitOnce(&keySchema, EngineKeyCompare, EngineKeySplit, nil) require.NoError(t, it.Init(&dec, block.NoTransforms)) + // Ensure that a scan across the block finds all the relevant keys. + var valBuf []byte for k, kv := 0, it.First(); kv != nil; k, kv = k+1, it.Next() { require.True(t, EngineKeyEqual(keys[k], kv.K.UserKey)) require.Zero(t, EngineKeyCompare(keys[k], kv.K.UserKey)) @@ -352,6 +354,42 @@ func TestKeySchema_RandomKeys(t *testing.T) { t.Fatalf("key %q is longer than original key %q", kv.K.UserKey, keys[k]) } checkEngineKey(kv.K.UserKey) + + // We write keys[k] as the value too, so check that it's verbatim equal. + value, callerOwned, err := kv.V.Value(valBuf) + require.NoError(t, err) + require.Equal(t, keys[k], value) + if callerOwned { + valBuf = value + } + } + // Ensure that seeking to each key finds the key. + for i := range keys { + kv := it.SeekGE(keys[i], 0) + require.True(t, EngineKeyEqual(keys[i], kv.K.UserKey)) + require.Zero(t, EngineKeyCompare(keys[i], kv.K.UserKey)) + } + // Ensure seeking to just the prefix of each key finds a key with the same + // prefix. + for i := range keys { + si := EngineKeySplit(keys[i]) + kv := it.SeekGE(keys[i][:si], 0) + require.True(t, EngineKeyEqual(keys[i][:si], pebble.Split(EngineKeySplit).Prefix(kv.K.UserKey))) } + // Ensure seeking to the key but in random order finds the key. + for _, i := range rng.Perm(len(keys)) { + kv := it.SeekGE(keys[i], 0) + require.True(t, EngineKeyEqual(keys[i], kv.K.UserKey)) + require.Zero(t, EngineKeyCompare(keys[i], kv.K.UserKey)) + + // We write keys[k] as the value too, so check that it's verbatim equal. + value, callerOwned, err := kv.V.Value(valBuf) + require.NoError(t, err) + require.Equal(t, keys[i], value) + if callerOwned { + valBuf = value + } + } + require.NoError(t, it.Close()) }