Skip to content

Commit

Permalink
Increase sync watchers period to 1 second based on experiments with w…
Browse files Browse the repository at this point in the history
…atch starvation

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 29, 2024
1 parent 8f91eb1 commit 98540ba
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
16 changes: 13 additions & 3 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ var (

// maxWatchersPerSync is the number of watchers to sync in a single batch
maxWatchersPerSync = 512
// watchersSyncLoopPeriod is number od seconds we attempt to resync unsynched watchers.
// To frequent resyncing allocates a lot of memory. Value picked based on the following benchmark results:
// object size|streams| sync duration[s] | response throughput[MB/s] | etcd memory [GB] | 50%ile latency[s] | 90%ile latency[s] | 99%ile latency[s] | 99.9%ile latency[s] |
// -----------|-------|------------------|---------------------------|------------------|-------------------|-------------------|-------------------|---------------------|
// 1MB | 20 | 0.1 | 1579 | 14.9 | 3.47 | 6.32 | 8.22 | 9.4 |
// 30KB | 1000 | 0.1 | 2006 | 5.5 | 4.26 | 12.74 | 18.81 | 22.69 |
// 3KB | 10000 | 0.1 | 2061 | 1.8 | 4.78 | 12.82 | 19.33 | 23.68 |
// 1MB | 20 | 1 | 1634 | 9 | 3.45 | 5.63 | 7.98 | 9.34 |
// 30KB | 1000 | 1 | 2257 | 1.4 | 4.07 | 11.76 | 17.8 | 21.87 |
// 3KB | 10000 | 1 | 2154 | 1.1 | 4.29 | 11.74 | 18.01 | 22.36 |
watchersSyncLoopPeriod = time.Second
)

func ChanBufLen() int { return chanBufLen }
Expand Down Expand Up @@ -215,8 +226,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()

waitDuration := 100 * time.Millisecond
delayTicker := time.NewTicker(waitDuration)
delayTicker := time.NewTicker(watchersSyncLoopPeriod)
defer delayTicker.Stop()

for {
Expand All @@ -231,7 +241,7 @@ func (s *watchableStore) syncWatchersLoop() {
}
syncDuration := time.Since(st)

delayTicker.Reset(waitDuration)
delayTicker.Reset(watchersSyncLoopPeriod)
// more work pending?
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
// be fair to other store operations by yielding time taken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client
t.Fatalf("could not release barrier (%v)", err)
}

timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
timerC := time.After(time.Second)
for i := 0; i < waiters; i++ {
select {
case <-timerC:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3election_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestV3ElectionCampaign(t *testing.T) {
}

select {
case <-time.After(200 * time.Millisecond):
case <-time.After(time.Second):
t.Fatalf("campaigner unelected after resign")
case <-campaignc:
}
Expand Down

0 comments on commit 98540ba

Please sign in to comment.