Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reuse of slices exposed through Ready #51

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,11 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message

// msgsAfterAppendPool is a pool for msgsAfterAppend to avoid frequently
// reallocating the underlying array. When assigning a new slice to msgsAfterAppend,
// we should try to get it from the pool first; before releasing the msgsAfterAppend
// slice, we should put the underlying array into the pool.
msgsAfterAppendPool *sync.Pool
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -424,6 +428,11 @@ func newRaft(c *Config) *raft {
preVote: c.PreVote,
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
msgsAfterAppendPool: &sync.Pool{
New: func() interface{} {
return make([]pb.Message, 0, 8)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the number used in the example given by #15 (comment) . Actually I don't really know what might be a good number, so adopted that directly.

},
},
}

cfg, prs, err := confchange.Restore(confchange.Changer{
Expand Down Expand Up @@ -545,6 +554,9 @@ func (r *raft) send(m pb.Message) {
// because the safety of such behavior has not been formally verified,
// we err on the side of safety and omit a `&& !m.Reject` condition
// above.
if cap(r.msgsAfterAppend) == 0 {
r.msgsAfterAppend = r.msgsAfterAppendPool.Get().([]pb.Message)
}
r.msgsAfterAppend = append(r.msgsAfterAppend, m)
} else {
if m.To == r.id {
Expand Down
19 changes: 17 additions & 2 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,19 @@ func newStorageAppendMsg(r *raft, rd Ready) pb.Message {
// be contained in msgsAfterAppend). This ordering allows the MsgAppResp
// handling to use a fast-path in r.raftLog.term() before the newly appended
// entries are removed from the unstable log.
m.Responses = r.msgsAfterAppend

// msgsAfterAppend is a pooled slice, so we need to make a deep copy
msgsCount, needResp := len(r.msgsAfterAppend), false
if needStorageAppendRespMsg(r, rd) {
m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd))
msgsCount++
needResp = true
}
if msgsCount != 0 {
// NB: allocating precisely as much as needed.
m.Responses = append(make([]pb.Message, 0, msgsCount), r.msgsAfterAppend...)
if needResp {
m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd))
}
}
return m
}
Expand Down Expand Up @@ -428,6 +438,11 @@ func (rn *RawNode) acceptReady(rd Ready) {
}
}
rn.raft.msgs = nil
// Release the msgsAfterAppend slice and put it back into the msgsAfterAppendPool.
// This clears the slice but reserves the underlying array, which we can pool and reuse.
if cap(rn.raft.msgsAfterAppend) > 0 {
rn.raft.msgsAfterAppendPool.Put(rn.raft.msgsAfterAppend[:0])
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a comment explaining the logic of this optimization. Similar to what you've said (#51 (comment))

}
rn.raft.msgsAfterAppend = nil
rn.raft.raftLog.acceptUnstable()
if len(rd.CommittedEntries) > 0 {
Expand Down