diff --git a/server/config/config.go b/server/config/config.go index 32b7a26191b..b4a8f61a575 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -34,6 +34,10 @@ import ( "go.etcd.io/etcd/server/v3/storage/datadir" ) +const ( + grpcOverheadBytes = 512 * 1024 +) + // ServerConfig holds the configuration of etcd as taken from the command line or discovery. type ServerConfig struct { Name string @@ -358,3 +362,7 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration { } func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) } + +func (c *ServerConfig) MaxRequestBytesWithOverhead() uint { + return c.MaxRequestBytes + grpcOverheadBytes +} diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index a20a9cd3f58..32949207805 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -30,8 +30,7 @@ import ( ) const ( - grpcOverheadBytes = 512 * 1024 - maxSendBytes = math.MaxInt32 + maxSendBytes = math.MaxInt32 ) func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server { @@ -62,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...)) opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...)) - opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) + opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead()))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 49bf81fcae6..b0a7e4a1926 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -43,7 +43,7 @@ type watchServer struct { clusterID int64 memberID int64 - maxRequestBytes int + maxRequestBytes uint sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -58,7 +58,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { clusterID: int64(s.Cluster().ID()), memberID: int64(s.MemberID()), - maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes), + maxRequestBytes: s.Cfg.MaxRequestBytesWithOverhead(), sg: s, watchable: s.Watchable(), @@ -126,7 +126,7 @@ type serverWatchStream struct { clusterID int64 memberID int64 - maxRequestBytes int + maxRequestBytes uint sg apply.RaftStatusGetter watchable mvcc.WatchableKV @@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool { func sendFragments( wr *pb.WatchResponse, - maxRequestBytes int, + maxRequestBytes uint, sendFunc func(*pb.WatchResponse) error, ) error { // no need to fragment if total request size is smaller // than max request limit or response contains only one event - if wr.Size() < maxRequestBytes || len(wr.Events) < 2 { + if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 { return sendFunc(wr) } @@ -562,7 +562,7 @@ func sendFragments( cur := ow for _, ev := range wr.Events[idx:] { cur.Events = append(cur.Events, ev) - if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes { + if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes { cur.Events = cur.Events[:len(cur.Events)-1] break } diff --git a/server/etcdserver/api/v3rpc/watch_test.go b/server/etcdserver/api/v3rpc/watch_test.go index e7868ddf8d2..caa86f91ad7 100644 --- a/server/etcdserver/api/v3rpc/watch_test.go +++ b/server/etcdserver/api/v3rpc/watch_test.go @@ -27,7 +27,7 @@ import ( func TestSendFragment(t *testing.T) { tt := []struct { wr *pb.WatchResponse - maxRequestBytes int + maxRequestBytes uint fragments int werr error }{