Skip to content

Commit

Permalink
Reuse grpcOverheadBytes calculation by creating a function for that
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 2, 2024
1 parent 6fa7342 commit c77ea65
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
8 changes: 8 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"go.etcd.io/etcd/server/v3/storage/datadir"
)

const (
	// grpcOverheadBytes is extra capacity (512 KiB) added on top of the
	// configured MaxRequestBytes to leave room for gRPC framing and
	// metadata when sizing message limits.
	grpcOverheadBytes = 512 * 1024
)

// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
type ServerConfig struct {
Name string
Expand Down Expand Up @@ -358,3 +362,7 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration {
}

func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) }

// MaxRequestBytesWithOverhead returns the effective maximum request size
// the server should accept: the configured MaxRequestBytes plus a fixed
// allowance for gRPC framing and metadata overhead.
func (c *ServerConfig) MaxRequestBytesWithOverhead() uint {
	return grpcOverheadBytes + c.MaxRequestBytes
}
5 changes: 2 additions & 3 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (
)

const (
grpcOverheadBytes = 512 * 1024
maxSendBytes = math.MaxInt32
maxSendBytes = math.MaxInt32
)

func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
Expand Down Expand Up @@ -62,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...))

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead())))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

Expand Down
12 changes: 6 additions & 6 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type watchServer struct {
clusterID int64
memberID int64

maxRequestBytes int
maxRequestBytes uint

sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
Expand All @@ -58,7 +58,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.MemberID()),

maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),
maxRequestBytes: s.Cfg.MaxRequestBytesWithOverhead(),

sg: s,
watchable: s.Watchable(),
Expand Down Expand Up @@ -126,7 +126,7 @@ type serverWatchStream struct {
clusterID int64
memberID int64

maxRequestBytes int
maxRequestBytes uint

sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
Expand Down Expand Up @@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool {

func sendFragments(
wr *pb.WatchResponse,
maxRequestBytes int,
maxRequestBytes uint,
sendFunc func(*pb.WatchResponse) error,
) error {
// no need to fragment if total request size is smaller
// than max request limit or response contains only one event
if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 {
return sendFunc(wr)
}

Expand All @@ -562,7 +562,7 @@ func sendFragments(
cur := ow
for _, ev := range wr.Events[idx:] {
cur.Events = append(cur.Events, ev)
if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes {
cur.Events = cur.Events[:len(cur.Events)-1]
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func TestSendFragment(t *testing.T) {
tt := []struct {
wr *pb.WatchResponse
maxRequestBytes int
maxRequestBytes uint
fragments int
werr error
}{
Expand Down

0 comments on commit c77ea65

Please sign in to comment.