diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 667400248363..c26653ac3cfa 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -50,7 +50,8 @@ func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) { } func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) { - // TODO: Add (in tests?) verification whether k>b[len(b)] + // putSeq is only be called for the data in the Key bucket. The keys + // in the Key bucket should be monotonically increasing revisions. txw.putInternal(bucket, k, v) } @@ -80,6 +81,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb, ok := txr.buckets[k] if !ok { delete(txw.buckets, k) + if seq, ok := txw.bucket2seq[k]; ok && !seq { + wb.dedupe() + } txr.buckets[k] = wb continue } @@ -124,7 +128,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer { bufVersion: 0, } for bucketName, bucket := range txr.txBuffer.buckets { - txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() + txrCopy.txBuffer.buckets[bucketName] = bucket.CopyUsed() } return txrCopy } @@ -148,7 +152,7 @@ func newBucketBuffer() *bucketBuffer { func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 } idx := sort.Search(bb.used, f) - if idx < 0 { + if idx < 0 || idx >= bb.used { return nil, nil } if len(endKey) == 0 { @@ -201,10 +205,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { return } + bb.dedupe() +} +// dedupe removes duplicates, using only newest update +func (bb *bucketBuffer) dedupe() { + if bb.used <= 1 { + return + } sort.Stable(bb) - - // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { @@ -221,11 +230,11 @@ func (bb *bucketBuffer) Less(i, j int) bool { } func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] } -func (bb *bucketBuffer) Copy() *bucketBuffer { +func (bb *bucketBuffer) CopyUsed() *bucketBuffer { bbCopy := bucketBuffer{ - buf: make([]kv, len(bb.buf)), + buf: make([]kv, bb.used), used: bb.used, } - copy(bbCopy.buf, bb.buf) + copy(bbCopy.buf, bb.buf[:bb.used]) return &bbCopy } diff --git a/server/mvcc/backend/tx_buffer_test.go b/server/mvcc/backend/tx_buffer_test.go new file mode 100644 index 000000000000..f194a3574eb4 --- /dev/null +++ b/server/mvcc/backend/tx_buffer_test.go @@ -0,0 +1,142 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_bucketBuffer_CopyUsed_After_Add(t *testing.T) { + bb := &bucketBuffer{buf: make([]kv, 10), used: 0} + for i := 0; i < 20; i++ { + k := fmt.Sprintf("key%d", i) + v := fmt.Sprintf("val%d", i) + bb.add([]byte(k), []byte(v)) + bbCopy := bb.CopyUsed() + assert.Equal(t, bb.used, bbCopy.used) + assert.Len(t, bbCopy.buf, bbCopy.used) + assert.GreaterOrEqual(t, len(bb.buf), len(bbCopy.buf)) + } +} + +func Test_bucketBuffer_CopyUsed(t *testing.T) { + tests := []struct { + name string + bufLen int + used int + wantPanic bool + wantUsed int + wantBufLen int + }{ + { + name: "used is 0", + bufLen: 10, + used: 0, + wantPanic: false, + wantUsed: 0, + wantBufLen: 0, + }, + { + name: "used is greater than 0 and less than len(buf)", + bufLen: 10, + used: 5, + wantPanic: false, + wantUsed: 5, + wantBufLen: 5, + }, + { + name: "used is equal to len(buf)", + bufLen: 10, + used: 10, + wantPanic: false, + wantUsed: 10, + wantBufLen: 10, + }, + { + name: "used is greater than len(buf)", + bufLen: 10, + used: 11, + wantPanic: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := &bucketBuffer{buf: make([]kv, tt.bufLen), used: tt.used} + if tt.wantPanic { + assert.Panicsf(t, func() { + bb.CopyUsed() + }, "expected panic when used (%d) and the length of buf (%d)", tt.used, tt.bufLen) + } else { + bbCopy := bb.CopyUsed() + assert.Equal(t, tt.wantUsed, bbCopy.used) + assert.Len(t, bbCopy.buf, tt.wantBufLen) + } + }) + } +} + +func TestDedupe(t *testing.T) { + tests := []struct { + name string + keys, vals, expectedKeys, expectedVals []string + }{ + { + name: "empty", + keys: []string{}, + vals: []string{}, + expectedKeys: []string{}, + expectedVals: []string{}, + }, + { + name: "single kv", + keys: []string{"key1"}, + vals: []string{"val1"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val1"}, + }, + { + name: "duplicate key", + keys: []string{"key1", "key1"}, + vals: []string{"val1", "val2"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val2"}, + }, + { + name: "unordered keys", + keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"}, + vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"}, + expectedKeys: []string{"key1", "key2", "key3", "key4"}, + expectedVals: []string{"val2", "val4", "val1", "val6"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := &bucketBuffer{buf: make([]kv, 10), used: 0} + for i := 0; i < len(tt.keys); i++ { + bb.add([]byte(tt.keys[i]), []byte(tt.vals[i])) + } + bb.dedupe() + assert.Len(t, tt.expectedKeys, bb.used) + for i := 0; i < bb.used; i++ { + assert.Equal(t, bb.buf[i].key, []byte(tt.expectedKeys[i])) + assert.Equal(t, bb.buf[i].val, []byte(tt.expectedVals[i])) + } + }) + } +}