Skip to content

Commit

Permalink
backport fix watch event loss after compaction etcd-io#17555
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <[email protected]>
  • Loading branch information
chaochn47 committed Mar 19, 2024
1 parent f38dfe2 commit c3890fa
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
13 changes: 10 additions & 3 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package mvcc

import (
"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/clientv3"
"sync"
"time"

"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/clientv3"

"go.uber.org/zap"

"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)

// non-const so modifiable by tests
Expand Down Expand Up @@ -370,6 +372,11 @@ func (s *watchableStore) syncWatchers() int {
var victims watcherBatch
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
62 changes: 61 additions & 1 deletion mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -259,6 +262,63 @@ func TestWatchCompacted(t *testing.T) {
}
}

func TestWatchNoEventLossOnCompact(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
b, tmpPath := backend.NewDefaultTmpBackend()
lg := zaptest.NewLogger(t)
s := newWatchableStore(lg, b, &lease.FakeLessor{}, nil, nil, StoreConfig{})

defer func() {
s.store.Close()
os.Remove(tmpPath)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 4
testKey, testValue := []byte("foo"), []byte("bar")

maxRev := 10
compactRev := int64(5)
for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease)
}
_, err := s.Compact(traceutil.TODO(), compactRev)
require.NoErrorf(t, err, "failed to compact kv (%v)", err)

w := s.NewWatchStream()
defer w.Close()

watchers := map[WatchID]int64{
0: 1,
1: 1, // create unsyncd watchers with startRev < compactRev
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
}
for id, startRev := range watchers {
_, err := w.Watch(id, testKey, nil, startRev)
require.NoError(t, err)
}
// fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()

for len(watchers) > 0 {
resp := <-w.Chan()
if resp.CompactRevision != 0 {
require.Equal(t, resp.CompactRevision, compactRev)
require.Contains(t, watchers, resp.WatchID)
delete(watchers, resp.WatchID)
continue
}
nextRev := watchers[resp.WatchID]
for _, ev := range resp.Events {
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
nextRev++
}
if nextRev == s.rev()+1 {
delete(watchers, resp.WatchID)
}
}
}

func TestWatchFutureRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
Expand Down

0 comments on commit c3890fa

Please sign in to comment.