Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reuse of slices exposed through Ready #51

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message

// msgsAfterAppendPool is a pool for msgsAfterAppend to avoid frequently
// reallocating the underlying array
msgsAfterAppendPool *sync.Pool
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -424,6 +426,12 @@ func newRaft(c *Config) *raft {
preVote: c.PreVote,
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
msgsAfterAppendPool: &sync.Pool{
New: func() interface{} {
sl := make([]pb.Message, 0, 8)
return sl
CaojiamingAlan marked this conversation as resolved.
Show resolved Hide resolved
},
},
}

cfg, prs, err := confchange.Restore(confchange.Changer{
Expand Down Expand Up @@ -545,6 +553,9 @@ func (r *raft) send(m pb.Message) {
// because the safety of such behavior has not been formally verified,
// we err on the side of safety and omit a `&& !m.Reject` condition
// above.
if cap(r.msgsAfterAppend) == 0 {
r.msgsAfterAppend = r.msgsAfterAppendPool.Get().([]pb.Message)
}
r.msgsAfterAppend = append(r.msgsAfterAppend, m)
} else {
if m.To == r.id {
Expand Down
9 changes: 8 additions & 1 deletion rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ func newStorageAppendMsg(r *raft, rd Ready) pb.Message {
// be contained in msgsAfterAppend). This ordering allows the MsgAppResp
// handling to use a fast-path in r.raftLog.term() before the newly appended
// entries are removed from the unstable log.
m.Responses = r.msgsAfterAppend

//msgsAfterAppend is a pooled slice, so we need to make a deep copy
CaojiamingAlan marked this conversation as resolved.
Show resolved Hide resolved
m.Responses = make([]pb.Message, len(r.msgsAfterAppend))
copy(m.Responses, r.msgsAfterAppend)
if needStorageAppendRespMsg(r, rd) {
m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd))
}
CaojiamingAlan marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -428,6 +431,10 @@ func (rn *RawNode) acceptReady(rd Ready) {
}
}
rn.raft.msgs = nil
//release the msgsAfterAppend slice and put it into the msgsAfterAppendPool
CaojiamingAlan marked this conversation as resolved.
Show resolved Hide resolved
if cap(rn.raft.msgsAfterAppend) > 0 {
rn.raft.msgsAfterAppendPool.Put(rn.raft.msgsAfterAppend[:0])
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a comment explaining the logic of this optimization. Similar to what you've said (#51 (comment))

}
rn.raft.msgsAfterAppend = nil
rn.raft.raftLog.acceptUnstable()
if len(rd.CommittedEntries) > 0 {
Expand Down