diff --git a/raft.go b/raft.go index d1048294..775d8e9a 100644 --- a/raft.go +++ b/raft.go @@ -346,7 +346,11 @@ type raft struct { // Messages in this list have the type MsgAppResp, MsgVoteResp, or // MsgPreVoteResp. See the comment in raft.send for details. msgsAfterAppend []pb.Message - + // msgsAfterAppendPool is a pool for msgsAfterAppend to avoid frequently + // reallocating the underlying array. When assigning a new slice to msgsAfterAppend, + // we should try to get it from the pool first; before releasing the msgsAfterAppend + // slice, we should put the underlying array into the pool. + msgsAfterAppendPool *sync.Pool // the leader id lead uint64 // leadTransferee is id of the leader transfer target when its value is not zero. @@ -424,6 +428,11 @@ func newRaft(c *Config) *raft { preVote: c.PreVote, readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, + msgsAfterAppendPool: &sync.Pool{ + New: func() interface{} { + return make([]pb.Message, 0, 8) + }, + }, } cfg, prs, err := confchange.Restore(confchange.Changer{ @@ -545,6 +554,9 @@ func (r *raft) send(m pb.Message) { // because the safety of such behavior has not been formally verified, // we err on the side of safety and omit a `&& !m.Reject` condition // above. + if cap(r.msgsAfterAppend) == 0 { + r.msgsAfterAppend = r.msgsAfterAppendPool.Get().([]pb.Message) + } r.msgsAfterAppend = append(r.msgsAfterAppend, m) } else { if m.To == r.id { diff --git a/rawnode.go b/rawnode.go index 0f3807c6..8a225554 100644 --- a/rawnode.go +++ b/rawnode.go @@ -252,9 +252,19 @@ func newStorageAppendMsg(r *raft, rd Ready) pb.Message { // be contained in msgsAfterAppend). This ordering allows the MsgAppResp // handling to use a fast-path in r.raftLog.term() before the newly appended // entries are removed from the unstable log. - m.Responses = r.msgsAfterAppend + + // msgsAfterAppend is a pooled slice, so we need to make a deep copy + msgsCount, needResp := len(r.msgsAfterAppend), false if needStorageAppendRespMsg(r, rd) { - m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + msgsCount++ + needResp = true + } + if msgsCount != 0 { + // NB: allocating precisely as much as needed. + m.Responses = append(make([]pb.Message, 0, msgsCount), r.msgsAfterAppend...) + if needResp { + m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + } } return m } @@ -428,6 +438,11 @@ func (rn *RawNode) acceptReady(rd Ready) { } } rn.raft.msgs = nil + // Release the msgsAfterAppend slice and put it back into the msgsAfterAppendPool. + // This clears the slice but reserves the underlying array, which we can pool and reuse. + if cap(rn.raft.msgsAfterAppend) > 0 { + rn.raft.msgsAfterAppendPool.Put(rn.raft.msgsAfterAppend[:0]) + } rn.raft.msgsAfterAppend = nil rn.raft.raftLog.acceptUnstable() if len(rd.CommittedEntries) > 0 {