Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory #75

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func TestConnect(t *testing.T) {
const n = 4
ctrl := gomock.NewController(t)
td := setup(t, ctrl, n)
builder := modules.NewConsensusBuilder(1, td.keys[0])
builder := modules.NewBuilder(1, td.keys[0])
testutil.TestModules(t, ctrl, 1, td.keys[0], &builder)
teardown := createServers(t, td, ctrl)
defer teardown()
td.builders.Build()

cfg := NewConfig(td.creds, gorums.WithDialTimeout(time.Second))

builder.Register(cfg)
builder.Add(cfg)
builder.Build()

err := cfg.Connect(td.replicas)
Expand All @@ -58,7 +58,7 @@ func testBase(t *testing.T, typ any, send func(modules.Configuration), handle ev
defer serverTeardown()

cfg := NewConfig(td.creds, gorums.WithDialTimeout(time.Second))
td.builders[0].Register(cfg)
td.builders[0].Add(cfg)
hl := td.builders.Build()

err := cfg.Connect(td.replicas)
Expand All @@ -69,8 +69,14 @@ func testBase(t *testing.T, typ any, send func(modules.Configuration), handle ev

ctx, cancel := context.WithCancel(context.Background())
for _, hs := range hl[1:] {
hs.EventLoop().RegisterHandler(typ, handle)
go hs.Run(ctx)
var (
eventLoop *eventloop.EventLoop
synchronizer modules.Synchronizer
)
hs.GetAll(&eventLoop, &synchronizer)
eventLoop.RegisterHandler(typ, handle)
synchronizer.Start(ctx)
go eventLoop.Run(ctx)
}
send(cfg)
cancel()
Expand Down Expand Up @@ -219,7 +225,7 @@ func createServers(t *testing.T, td testData, ctrl *gomock.Controller) (teardown
for i := range servers {
servers[i] = NewServer(gorums.WithGRPCServerOptions(grpc.Creds(td.creds)))
servers[i].StartOnListener(td.listeners[i])
td.builders[i].Register(servers[i])
td.builders[i].Add(servers[i])
}
return func() {
for _, srv := range servers {
Expand Down
97 changes: 55 additions & 42 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"fmt"
"strings"

"github.com/relab/hotstuff/eventloop"
"github.com/relab/hotstuff/logging"
"github.com/relab/hotstuff/modules"
"github.com/relab/hotstuff/synchronizer"

"github.com/relab/gorums"
"github.com/relab/hotstuff"
Expand All @@ -21,12 +24,11 @@ import (

// Replica provides methods used by hotstuff to send messages to replicas.
type Replica struct {
node *hotstuffpb.Node
id hotstuff.ID
pubKey hotstuff.PublicKey
voteCancel context.CancelFunc
newViewCancel context.CancelFunc
md map[string]string
eventLoop *eventloop.EventLoop
node *hotstuffpb.Node
id hotstuff.ID
pubKey hotstuff.PublicKey
md map[string]string
}

// ID returns the replica's ID.
Expand All @@ -44,22 +46,20 @@ func (r *Replica) Vote(cert hotstuff.PartialCert) {
if r.node == nil {
return
}
var ctx context.Context
r.voteCancel()
ctx, r.voteCancel = context.WithCancel(context.Background())
ctx, cancel := synchronizer.TimeoutContext(r.eventLoop.Context(), r.eventLoop)
defer cancel()
pCert := hotstuffpb.PartialCertToProto(cert)
r.node.Vote(ctx, pCert, gorums.WithNoSendWaiting())
r.node.Vote(ctx, pCert)
}

// NewView sends the quorum certificate to the other replica.
func (r *Replica) NewView(msg hotstuff.SyncInfo) {
if r.node == nil {
return
}
var ctx context.Context
r.newViewCancel()
ctx, r.newViewCancel = context.WithCancel(context.Background())
r.node.NewView(ctx, hotstuffpb.SyncInfoToProto(msg), gorums.WithNoSendWaiting())
ctx, cancel := synchronizer.TimeoutContext(r.eventLoop.Context(), r.eventLoop)
defer cancel()
r.node.NewView(ctx, hotstuffpb.SyncInfoToProto(msg))
}

// Metadata returns the gRPC metadata from this replica's connection.
Expand All @@ -78,20 +78,26 @@ type Config struct {
}

type subConfig struct {
mods *modules.ConsensusCore
eventLoop *eventloop.EventLoop
logger logging.Logger
opts *modules.Options

cfg *hotstuffpb.Configuration
replicas map[hotstuff.ID]modules.Replica
}

// InitModule gives the module a reference to the ConsensusCore object.
// It also allows the module to set module options using the OptionsBuilder.
func (cfg *Config) InitModule(mods *modules.ConsensusCore, _ *modules.OptionsBuilder) {
cfg.mods = mods
// InitModule initializes the configuration.
func (cfg *Config) InitModule(mods *modules.Core) {
mods.GetAll(
&cfg.eventLoop,
&cfg.logger,
&cfg.subConfig.opts,
)

// We delay processing `replicaConnected` events until after the configurations `connected` event has occurred.
cfg.mods.EventLoop().RegisterHandler(replicaConnected{}, func(event any) {
cfg.eventLoop.RegisterHandler(replicaConnected{}, func(event any) {
if !cfg.connected {
cfg.mods.EventLoop().DelayUntil(connected{}, event)
cfg.eventLoop.DelayUntil(ConnectedEvent{}, event)
return
}
cfg.replicaConnected(event.(replicaConnected))
Expand Down Expand Up @@ -129,19 +135,19 @@ func (cfg *Config) replicaConnected(c replicaConnected) {

id, err := GetPeerIDFromContext(c.ctx, cfg)
if err != nil {
cfg.mods.Logger().Warnf("Failed to get id for %v: %v", info.Addr, err)
cfg.logger.Warnf("Failed to get id for %v: %v", info.Addr, err)
return
}

replica, ok := cfg.replicas[id]
if !ok {
cfg.mods.Logger().Warnf("Replica with id %d was not found", id)
cfg.logger.Warnf("Replica with id %d was not found", id)
return
}

replica.(*Replica).md = readMetadata(md)

cfg.mods.Logger().Debugf("Replica %d connected from address %v", id, info.Addr)
cfg.logger.Debugf("Replica %d connected from address %v", id, info.Addr)
}

const keyPrefix = "hotstuff-"
Expand Down Expand Up @@ -181,10 +187,10 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
opts := cfg.opts
cfg.opts = nil // options are not needed beyond this point, so we delete them.

md := mapToMetadata(cfg.mods.Options().ConnectionMetadata())
md := mapToMetadata(cfg.subConfig.opts.ConnectionMetadata())

// embed own ID to allow other replicas to identify messages from this replica
md.Set("id", fmt.Sprintf("%d", cfg.mods.ID()))
md.Set("id", fmt.Sprintf("%d", cfg.subConfig.opts.ID()))

opts = append(opts, gorums.WithMetadata(md))

Expand All @@ -195,14 +201,13 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
for _, replica := range replicas {
// also initialize Replica structures
cfg.replicas[replica.ID] = &Replica{
id: replica.ID,
pubKey: replica.PubKey,
newViewCancel: func() {},
voteCancel: func() {},
md: make(map[string]string),
eventLoop: cfg.eventLoop,
id: replica.ID,
pubKey: replica.PubKey,
md: make(map[string]string),
}
// we do not want to connect to ourself
if replica.ID != cfg.mods.ID() {
if replica.ID != cfg.subConfig.opts.ID() {
idMapping[replica.Address] = uint32(replica.ID)
}
}
Expand All @@ -225,7 +230,7 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
cfg.connected = true

// this event is sent so that any delayed `replicaConnected` events can be processed.
cfg.mods.EventLoop().AddEvent(connected{})
cfg.eventLoop.AddEvent(ConnectedEvent{})

return nil
}
Expand Down Expand Up @@ -254,9 +259,11 @@ func (cfg *Config) SubConfig(ids []hotstuff.ID) (sub modules.Configuration, err
return nil, err
}
return &subConfig{
mods: cfg.mods,
cfg: newCfg,
replicas: replicas,
eventLoop: cfg.eventLoop,
logger: cfg.logger,
opts: cfg.subConfig.opts,
cfg: newCfg,
replicas: replicas,
}, nil
}

Expand All @@ -279,10 +286,11 @@ func (cfg *subConfig) Propose(proposal hotstuff.ProposeMsg) {
if cfg.cfg == nil {
return
}
ctx, cancel := synchronizer.TimeoutContext(cfg.eventLoop.Context(), cfg.eventLoop)
defer cancel()
cfg.cfg.Propose(
cfg.mods.Synchronizer().ViewContext(),
ctx,
hotstuffpb.ProposalToProto(proposal),
gorums.WithNoSendWaiting(),
)
}

Expand All @@ -291,10 +299,14 @@ func (cfg *subConfig) Timeout(msg hotstuff.TimeoutMsg) {
if cfg.cfg == nil {
return
}

// will wait until the second timeout before cancelling
ctx, cancel := synchronizer.TimeoutContext(cfg.eventLoop.Context(), cfg.eventLoop)
defer cancel()

cfg.cfg.Timeout(
cfg.mods.Synchronizer().ViewContext(),
ctx,
hotstuffpb.TimeoutMsgToProto(msg),
gorums.WithNoSendWaiting(),
)
}

Expand All @@ -305,7 +317,7 @@ func (cfg *subConfig) Fetch(ctx context.Context, hash hotstuff.Hash) (*hotstuff.
qcErr, ok := err.(gorums.QuorumCallError)
// filter out context errors
if !ok || (qcErr.Reason != context.Canceled.Error() && qcErr.Reason != context.DeadlineExceeded.Error()) {
cfg.mods.Logger().Infof("Failed to fetch block: %v", err)
cfg.logger.Infof("Failed to fetch block: %v", err)
}
return nil, false
}
Expand Down Expand Up @@ -335,4 +347,5 @@ func (q qspec) FetchQF(in *hotstuffpb.BlockHash, replies map[uint32]*hotstuffpb.
return nil, false
}

type connected struct{}
// ConnectedEvent is sent when the configuration has connected to the other replicas.
type ConnectedEvent struct{}
Loading