From 0a54362ccacd980e07840cf613b2fc5a1bfbd15f Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 25 Mar 2024 23:06:08 +0000 Subject: [PATCH] add key dedupe when a write buffer writeback to an empty read buffer bucket. Signed-off-by: Siyuan Zhang --- server/storage/backend/tx_buffer.go | 12 +++++- server/storage/backend/tx_buffer_test.go | 50 ++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index c6907e7e6b5..821b300bfef 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -97,6 +97,9 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb, ok := txr.buckets[k] if !ok { delete(txw.buckets, k) + if seq, ok := txw.bucket2seq[k]; ok && !seq { + wb.dedupe() + } txr.buckets[k] = wb continue } @@ -218,10 +221,15 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { return } + bb.dedupe() +} +// dedupe removes duplicates, using only newest update +func (bb *bucketBuffer) dedupe() { + if bb.used <= 1 { + return + } sort.Stable(bb) - - // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { diff --git a/server/storage/backend/tx_buffer_test.go b/server/storage/backend/tx_buffer_test.go index 745b2a7ec9d..3bcabde256d 100644 --- a/server/storage/backend/tx_buffer_test.go +++ b/server/storage/backend/tx_buffer_test.go @@ -90,3 +90,53 @@ func Test_bucketBuffer_CopyUsed(t *testing.T) { }) } } + +func TestDedupe(t *testing.T) { + tests := []struct { + name string + keys, vals, expectedKeys, expectedVals []string + }{ + { + name: "empty", + keys: []string{}, + vals: []string{}, + expectedKeys: []string{}, + expectedVals: []string{}, + }, + { + name: "single kv", + keys: []string{"key1"}, + vals: []string{"val1"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val1"}, + }, + { + name: "duplicate key", + keys: []string{"key1", "key1"}, + vals: []string{"val1", "val2"}, + expectedKeys: []string{"key1"}, + expectedVals: []string{"val2"}, + }, + { + name: "unordered keys", + keys: []string{"key3", "key1", "key4", "key2", "key1", "key4"}, + vals: []string{"val1", "val5", "val3", "val4", "val2", "val6"}, + expectedKeys: []string{"key1", "key2", "key3", "key4"}, + expectedVals: []string{"val2", "val4", "val1", "val6"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := &bucketBuffer{buf: make([]kv, 10), used: 0} + for i := 0; i < len(tt.keys); i++ { + bb.add([]byte(tt.keys[i]), []byte(tt.vals[i])) + } + bb.dedupe() + assert.Equal(t, bb.used, len(tt.expectedKeys)) + for i := 0; i < bb.used; i++ { + assert.Equal(t, bb.buf[i].key, []byte(tt.expectedKeys[i])) + assert.Equal(t, bb.buf[i].val, []byte(tt.expectedVals[i])) + } + }) + } +}