From e5ac368b27c26d5a18a2fe9f025abdbcbe9b8119 Mon Sep 17 00:00:00 2001 From: caojiamingalan Date: Thu, 11 May 2023 21:19:45 -0500 Subject: [PATCH 1/2] Allow reuse of slices exposed through Ready Signed-off-by: caojiamingalan --- raft.go | 13 ++++++++++++- rawnode.go | 9 ++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/raft.go b/raft.go index d1048294..53082cae 100644 --- a/raft.go +++ b/raft.go @@ -346,7 +346,9 @@ type raft struct { // Messages in this list have the type MsgAppResp, MsgVoteResp, or // MsgPreVoteResp. See the comment in raft.send for details. msgsAfterAppend []pb.Message - + // msgsAfterAppendPool is a pool for msgsAfterAppend to avoid frequently + // reallocating the underlying array + msgsAfterAppendPool *sync.Pool // the leader id lead uint64 // leadTransferee is id of the leader transfer target when its value is not zero. @@ -424,6 +426,12 @@ func newRaft(c *Config) *raft { preVote: c.PreVote, readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, + msgsAfterAppendPool: &sync.Pool{ + New: func() interface{} { + sl := make([]pb.Message, 0, 8) + return sl + }, + }, } cfg, prs, err := confchange.Restore(confchange.Changer{ @@ -545,6 +553,9 @@ func (r *raft) send(m pb.Message) { // because the safety of such behavior has not been formally verified, // we err on the side of safety and omit a `&& !m.Reject` condition // above. + if cap(r.msgsAfterAppend) == 0 { + r.msgsAfterAppend = r.msgsAfterAppendPool.Get().([]pb.Message) + } r.msgsAfterAppend = append(r.msgsAfterAppend, m) } else { if m.To == r.id { diff --git a/rawnode.go b/rawnode.go index 0f3807c6..6663b9a1 100644 --- a/rawnode.go +++ b/rawnode.go @@ -252,7 +252,10 @@ func newStorageAppendMsg(r *raft, rd Ready) pb.Message { // be contained in msgsAfterAppend). This ordering allows the MsgAppResp // handling to use a fast-path in r.raftLog.term() before the newly appended // entries are removed from the unstable log. - m.Responses = r.msgsAfterAppend + + //msgsAfterAppend is a pooled slice, so we need to make a deep copy + m.Responses = make([]pb.Message, len(r.msgsAfterAppend)) + copy(m.Responses, r.msgsAfterAppend) if needStorageAppendRespMsg(r, rd) { m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) } @@ -428,6 +431,10 @@ func (rn *RawNode) acceptReady(rd Ready) { } } rn.raft.msgs = nil + //release the msgsAfterAppend slice and put it into the msgsAfterAppendPool + if cap(rn.raft.msgsAfterAppend) > 0 { + rn.raft.msgsAfterAppendPool.Put(rn.raft.msgsAfterAppend[:0]) + } rn.raft.msgsAfterAppend = nil rn.raft.raftLog.acceptUnstable() if len(rd.CommittedEntries) > 0 { From 72e5c3d71328a9c6202a7a8bccd4c7f5408b6a89 Mon Sep 17 00:00:00 2001 From: caojiamingalan Date: Fri, 12 May 2023 14:02:42 -0500 Subject: [PATCH 2/2] Resolve some comments Signed-off-by: caojiamingalan --- raft.go | 7 ++++--- rawnode.go | 18 +++++++++++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/raft.go b/raft.go index 53082cae..775d8e9a 100644 --- a/raft.go +++ b/raft.go @@ -347,7 +347,9 @@ type raft struct { // MsgPreVoteResp. See the comment in raft.send for details. msgsAfterAppend []pb.Message // msgsAfterAppendPool is a pool for msgsAfterAppend to avoid frequently - // reallocating the underlying array + // reallocating the underlying array. When assigning a new slice to msgsAfterAppend, + // we should try to get it from the pool first; before releasing the msgsAfterAppend + // slice, we should put the underlying array into the pool. msgsAfterAppendPool *sync.Pool // the leader id lead uint64 @@ -428,8 +430,7 @@ func newRaft(c *Config) *raft { disableProposalForwarding: c.DisableProposalForwarding, msgsAfterAppendPool: &sync.Pool{ New: func() interface{} { - sl := make([]pb.Message, 0, 8) - return sl + return make([]pb.Message, 0, 8) }, }, } diff --git a/rawnode.go b/rawnode.go index 6663b9a1..8a225554 100644 --- a/rawnode.go +++ b/rawnode.go @@ -253,11 +253,18 @@ func newStorageAppendMsg(r *raft, rd Ready) pb.Message { // handling to use a fast-path in r.raftLog.term() before the newly appended // entries are removed from the unstable log. - //msgsAfterAppend is a pooled slice, so we need to make a deep copy - m.Responses = make([]pb.Message, len(r.msgsAfterAppend)) - copy(m.Responses, r.msgsAfterAppend) + // msgsAfterAppend is a pooled slice, so we need to make a deep copy + msgsCount, needResp := len(r.msgsAfterAppend), false if needStorageAppendRespMsg(r, rd) { - m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + msgsCount++ + needResp = true + } + if msgsCount != 0 { + // NB: allocating precisely as much as needed. + m.Responses = append(make([]pb.Message, 0, msgsCount), r.msgsAfterAppend...) + if needResp { + m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + } } return m } @@ -431,7 +438,8 @@ func (rn *RawNode) acceptReady(rd Ready) { } } rn.raft.msgs = nil - //release the msgsAfterAppend slice and put it into the msgsAfterAppendPool + // Release the msgsAfterAppend slice and put it back into the msgsAfterAppendPool. + // This clears the slice but reserves the underlying array, which we can pool and reuse. if cap(rn.raft.msgsAfterAppend) > 0 { rn.raft.msgsAfterAppendPool.Put(rn.raft.msgsAfterAppend[:0]) }