diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ad17b2be7ace..81eafc2e911d 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -37,6 +37,17 @@ var ( // maxWatchersPerSync is the number of watchers to sync in a single batch maxWatchersPerSync = 512 + // watchersSyncLoopPeriod is number od seconds we attempt to resync unsynched watchers. + // To frequent resyncing allocates a lot of memory. Value picked based on the following benchmark results: + // object size|streams| sync duration[s] | response throughput[MB/s] | etcd memory [GB] | 50%ile latency[s] | 90%ile latency[s] | 99%ile latency[s] | 99.9%ile latency[s] | + // -----------|-------|------------------|---------------------------|------------------|-------------------|-------------------|-------------------|---------------------| + // 1MB | 20 | 0.1 | 1579 | 14.9 | 3.47 | 6.32 | 8.22 | 9.4 | + // 30KB | 1000 | 0.1 | 2006 | 5.5 | 4.26 | 12.74 | 18.81 | 22.69 | + // 3KB | 10000 | 0.1 | 2061 | 1.8 | 4.78 | 12.82 | 19.33 | 23.68 | + // 1MB | 20 | 1 | 1634 | 9 | 3.45 | 5.63 | 7.98 | 9.34 | + // 30KB | 1000 | 1 | 2257 | 1.4 | 4.07 | 11.76 | 17.8 | 21.87 | + // 3KB | 10000 | 1 | 2154 | 1.1 | 4.29 | 11.74 | 18.01 | 22.36 | + watchersSyncLoopPeriod = time.Second ) func ChanBufLen() int { return chanBufLen } @@ -215,8 +226,7 @@ func (s *watchableStore) Restore(b backend.Backend) error { func (s *watchableStore) syncWatchersLoop() { defer s.wg.Done() - waitDuration := 100 * time.Millisecond - delayTicker := time.NewTicker(waitDuration) + delayTicker := time.NewTicker(watchersSyncLoopPeriod) defer delayTicker.Stop() for { @@ -231,7 +241,7 @@ func (s *watchableStore) syncWatchersLoop() { } syncDuration := time.Since(st) - delayTicker.Reset(waitDuration) + delayTicker.Reset(watchersSyncLoopPeriod) // more work pending? if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers { // be fair to other store operations by yielding time taken diff --git a/tests/integration/clientv3/experimental/recipes/v3_barrier_test.go b/tests/integration/clientv3/experimental/recipes/v3_barrier_test.go index b6dfef385a7a..d9dca4dd4309 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_barrier_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_barrier_test.go @@ -79,7 +79,7 @@ func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client t.Fatalf("could not release barrier (%v)", err) } - timerC := time.After(time.Duration(waiters*100) * time.Millisecond) + timerC := time.After(time.Second) for i := 0; i < waiters; i++ { select { case <-timerC: diff --git a/tests/integration/v3election_grpc_test.go b/tests/integration/v3election_grpc_test.go index d0ca72b42555..d95e6e555501 100644 --- a/tests/integration/v3election_grpc_test.go +++ b/tests/integration/v3election_grpc_test.go @@ -74,7 +74,7 @@ func TestV3ElectionCampaign(t *testing.T) { } select { - case <-time.After(200 * time.Millisecond): + case <-time.After(time.Second): t.Fatalf("campaigner unelected after resign") case <-campaignc: }