Skip to content

Commit

Permalink
Merge pull request #73 from relab/module-refactor
Browse files Browse the repository at this point in the history
Refactor module system
  • Loading branch information
johningve authored Aug 23, 2022
2 parents eb133bb + 040b049 commit 5196107
Show file tree
Hide file tree
Showing 50 changed files with 1,519 additions and 1,098 deletions.
18 changes: 12 additions & 6 deletions backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func TestConnect(t *testing.T) {
const n = 4
ctrl := gomock.NewController(t)
td := setup(t, ctrl, n)
builder := modules.NewConsensusBuilder(1, td.keys[0])
builder := modules.NewBuilder(1, td.keys[0])
testutil.TestModules(t, ctrl, 1, td.keys[0], &builder)
teardown := createServers(t, td, ctrl)
defer teardown()
td.builders.Build()

cfg := NewConfig(td.creds, gorums.WithDialTimeout(time.Second))

builder.Register(cfg)
builder.Add(cfg)
builder.Build()

err := cfg.Connect(td.replicas)
Expand All @@ -58,7 +58,7 @@ func testBase(t *testing.T, typ any, send func(modules.Configuration), handle ev
defer serverTeardown()

cfg := NewConfig(td.creds, gorums.WithDialTimeout(time.Second))
td.builders[0].Register(cfg)
td.builders[0].Add(cfg)
hl := td.builders.Build()

err := cfg.Connect(td.replicas)
Expand All @@ -69,8 +69,14 @@ func testBase(t *testing.T, typ any, send func(modules.Configuration), handle ev

ctx, cancel := context.WithCancel(context.Background())
for _, hs := range hl[1:] {
hs.EventLoop().RegisterHandler(typ, handle)
go hs.Run(ctx)
var (
eventLoop *eventloop.EventLoop
synchronizer modules.Synchronizer
)
hs.Get(&eventLoop, &synchronizer)
eventLoop.RegisterHandler(typ, handle)
synchronizer.Start(ctx)
go eventLoop.Run(ctx)
}
send(cfg)
cancel()
Expand Down Expand Up @@ -219,7 +225,7 @@ func createServers(t *testing.T, td testData, ctrl *gomock.Controller) (teardown
for i := range servers {
servers[i] = NewServer(gorums.WithGRPCServerOptions(grpc.Creds(td.creds)))
servers[i].StartOnListener(td.listeners[i])
td.builders[i].Register(servers[i])
td.builders[i].Add(servers[i])
}
return func() {
for _, srv := range servers {
Expand Down
56 changes: 35 additions & 21 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"strings"

"github.com/relab/hotstuff/eventloop"
"github.com/relab/hotstuff/logging"
"github.com/relab/hotstuff/modules"

"github.com/relab/gorums"
Expand Down Expand Up @@ -78,20 +80,28 @@ type Config struct {
}

type subConfig struct {
mods *modules.ConsensusCore
eventLoop *eventloop.EventLoop
logger logging.Logger
opts *modules.Options
synchronizer modules.Synchronizer

cfg *hotstuffpb.Configuration
replicas map[hotstuff.ID]modules.Replica
}

// InitModule gives the module a reference to the ConsensusCore object.
// It also allows the module to set module options using the OptionsBuilder.
func (cfg *Config) InitModule(mods *modules.ConsensusCore, _ *modules.OptionsBuilder) {
cfg.mods = mods
// InitModule initializes the configuration.
func (cfg *Config) InitModule(mods *modules.Core) {
mods.Get(
&cfg.eventLoop,
&cfg.logger,
&cfg.subConfig.opts,
&cfg.synchronizer,
)

// We delay processing `replicaConnected` events until after the configurations `connected` event has occurred.
cfg.mods.EventLoop().RegisterHandler(replicaConnected{}, func(event any) {
cfg.eventLoop.RegisterHandler(replicaConnected{}, func(event any) {
if !cfg.connected {
cfg.mods.EventLoop().DelayUntil(connected{}, event)
cfg.eventLoop.DelayUntil(ConnectedEvent{}, event)
return
}
cfg.replicaConnected(event.(replicaConnected))
Expand Down Expand Up @@ -129,19 +139,19 @@ func (cfg *Config) replicaConnected(c replicaConnected) {

id, err := GetPeerIDFromContext(c.ctx, cfg)
if err != nil {
cfg.mods.Logger().Warnf("Failed to get id for %v: %v", info.Addr, err)
cfg.logger.Warnf("Failed to get id for %v: %v", info.Addr, err)
return
}

replica, ok := cfg.replicas[id]
if !ok {
cfg.mods.Logger().Warnf("Replica with id %d was not found", id)
cfg.logger.Warnf("Replica with id %d was not found", id)
return
}

replica.(*Replica).md = readMetadata(md)

cfg.mods.Logger().Debugf("Replica %d connected from address %v", id, info.Addr)
cfg.logger.Debugf("Replica %d connected from address %v", id, info.Addr)
}

const keyPrefix = "hotstuff-"
Expand Down Expand Up @@ -181,10 +191,10 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
opts := cfg.opts
cfg.opts = nil // options are not needed beyond this point, so we delete them.

md := mapToMetadata(cfg.mods.Options().ConnectionMetadata())
md := mapToMetadata(cfg.subConfig.opts.ConnectionMetadata())

// embed own ID to allow other replicas to identify messages from this replica
md.Set("id", fmt.Sprintf("%d", cfg.mods.ID()))
md.Set("id", fmt.Sprintf("%d", cfg.subConfig.opts.ID()))

opts = append(opts, gorums.WithMetadata(md))

Expand All @@ -202,7 +212,7 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
md: make(map[string]string),
}
// we do not want to connect to ourself
if replica.ID != cfg.mods.ID() {
if replica.ID != cfg.subConfig.opts.ID() {
idMapping[replica.Address] = uint32(replica.ID)
}
}
Expand All @@ -225,7 +235,7 @@ func (cfg *Config) Connect(replicas []ReplicaInfo) (err error) {
cfg.connected = true

// this event is sent so that any delayed `replicaConnected` events can be processed.
cfg.mods.EventLoop().AddEvent(connected{})
cfg.eventLoop.AddEvent(ConnectedEvent{})

return nil
}
Expand Down Expand Up @@ -254,9 +264,12 @@ func (cfg *Config) SubConfig(ids []hotstuff.ID) (sub modules.Configuration, err
return nil, err
}
return &subConfig{
mods: cfg.mods,
cfg: newCfg,
replicas: replicas,
eventLoop: cfg.eventLoop,
logger: cfg.logger,
opts: cfg.subConfig.opts,
synchronizer: cfg.synchronizer,
cfg: newCfg,
replicas: replicas,
}, nil
}

Expand All @@ -280,7 +293,7 @@ func (cfg *subConfig) Propose(proposal hotstuff.ProposeMsg) {
return
}
cfg.cfg.Propose(
cfg.mods.Synchronizer().ViewContext(),
cfg.synchronizer.ViewContext(),
hotstuffpb.ProposalToProto(proposal),
gorums.WithNoSendWaiting(),
)
Expand All @@ -292,7 +305,7 @@ func (cfg *subConfig) Timeout(msg hotstuff.TimeoutMsg) {
return
}
cfg.cfg.Timeout(
cfg.mods.Synchronizer().ViewContext(),
cfg.synchronizer.ViewContext(),
hotstuffpb.TimeoutMsgToProto(msg),
gorums.WithNoSendWaiting(),
)
Expand All @@ -305,7 +318,7 @@ func (cfg *subConfig) Fetch(ctx context.Context, hash hotstuff.Hash) (*hotstuff.
qcErr, ok := err.(gorums.QuorumCallError)
// filter out context errors
if !ok || (qcErr.Reason != context.Canceled.Error() && qcErr.Reason != context.DeadlineExceeded.Error()) {
cfg.mods.Logger().Infof("Failed to fetch block: %v", err)
cfg.logger.Infof("Failed to fetch block: %v", err)
}
return nil, false
}
Expand Down Expand Up @@ -335,4 +348,5 @@ func (q qspec) FetchQF(in *hotstuffpb.BlockHash, replies map[uint32]*hotstuffpb.
return nil, false
}

type connected struct{}
// ConnectedEvent is sent when the configuration has connected to the other replicas.
type ConnectedEvent struct{}
55 changes: 33 additions & 22 deletions backend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package backend
import (
"context"
"fmt"
"github.com/relab/hotstuff/modules"
"net"
"strconv"

"github.com/relab/hotstuff/eventloop"
"github.com/relab/hotstuff/logging"
"github.com/relab/hotstuff/modules"

"github.com/relab/gorums"
"github.com/relab/hotstuff"
"github.com/relab/hotstuff/internal/proto/hotstuffpb"
Expand All @@ -20,22 +23,30 @@ import (
// Server is the Server-side of the gorums backend.
// It is responsible for calling handler methods on the consensus instance.
type Server struct {
mods *modules.ConsensusCore
blockChain modules.BlockChain
configuration modules.Configuration
eventLoop *eventloop.EventLoop
logger logging.Logger

gorumsSrv *gorums.Server
}

// InitModule gives the module a reference to the ConsensusCore object.
// It also allows the module to set module options using the OptionsBuilder.
func (srv *Server) InitModule(mods *modules.ConsensusCore, _ *modules.OptionsBuilder) {
srv.mods = mods
// InitModule initializes the Server.
func (srv *Server) InitModule(mods *modules.Core) {
mods.Get(
&srv.eventLoop,
&srv.configuration,
&srv.blockChain,
&srv.logger,
)
}

// NewServer creates a new Server.
func NewServer(opts ...gorums.ServerOption) *Server {
srv := &Server{}

opts = append(opts, gorums.WithConnectCallback(func(ctx context.Context) {
srv.mods.EventLoop().AddEvent(replicaConnected{ctx})
srv.eventLoop.AddEvent(replicaConnected{ctx})
}))

srv.gorumsSrv = gorums.NewServer(opts...)
Expand Down Expand Up @@ -64,7 +75,7 @@ func (srv *Server) StartOnListener(listener net.Listener) {
go func() {
err := srv.gorumsSrv.Serve(listener)
if err != nil {
srv.mods.Logger().Errorf("An error occurred while serving: %v", err)
srv.logger.Errorf("An error occurred while serving: %v", err)
}
}()
}
Expand Down Expand Up @@ -123,42 +134,42 @@ type serviceImpl struct {

// Propose handles a replica's response to the Propose QC from the leader.
func (impl *serviceImpl) Propose(ctx gorums.ServerCtx, proposal *hotstuffpb.Proposal) {
id, err := GetPeerIDFromContext(ctx, impl.srv.mods.Configuration())
id, err := GetPeerIDFromContext(ctx, impl.srv.configuration)
if err != nil {
impl.srv.mods.Logger().Infof("Failed to get client ID: %v", err)
impl.srv.logger.Infof("Failed to get client ID: %v", err)
return
}

proposal.Block.Proposer = uint32(id)
proposeMsg := hotstuffpb.ProposalFromProto(proposal)
proposeMsg.ID = id

impl.srv.mods.EventLoop().AddEvent(proposeMsg)
impl.srv.eventLoop.AddEvent(proposeMsg)
}

// Vote handles an incoming vote message.
func (impl *serviceImpl) Vote(ctx gorums.ServerCtx, cert *hotstuffpb.PartialCert) {
id, err := GetPeerIDFromContext(ctx, impl.srv.mods.Configuration())
id, err := GetPeerIDFromContext(ctx, impl.srv.configuration)
if err != nil {
impl.srv.mods.Logger().Infof("Failed to get client ID: %v", err)
impl.srv.logger.Infof("Failed to get client ID: %v", err)
return
}

impl.srv.mods.EventLoop().AddEvent(hotstuff.VoteMsg{
impl.srv.eventLoop.AddEvent(hotstuff.VoteMsg{
ID: id,
PartialCert: hotstuffpb.PartialCertFromProto(cert),
})
}

// NewView handles the leader's response to receiving a NewView rpc from a replica.
func (impl *serviceImpl) NewView(ctx gorums.ServerCtx, msg *hotstuffpb.SyncInfo) {
id, err := GetPeerIDFromContext(ctx, impl.srv.mods.Configuration())
id, err := GetPeerIDFromContext(ctx, impl.srv.configuration)
if err != nil {
impl.srv.mods.Logger().Infof("Failed to get client ID: %v", err)
impl.srv.logger.Infof("Failed to get client ID: %v", err)
return
}

impl.srv.mods.EventLoop().AddEvent(hotstuff.NewViewMsg{
impl.srv.eventLoop.AddEvent(hotstuff.NewViewMsg{
ID: id,
SyncInfo: hotstuffpb.SyncInfoFromProto(msg),
})
Expand All @@ -169,12 +180,12 @@ func (impl *serviceImpl) Fetch(ctx gorums.ServerCtx, pb *hotstuffpb.BlockHash) (
var hash hotstuff.Hash
copy(hash[:], pb.GetHash())

block, ok := impl.srv.mods.BlockChain().LocalGet(hash)
block, ok := impl.srv.blockChain.LocalGet(hash)
if !ok {
return nil, status.Errorf(codes.NotFound, "requested block was not found")
}

impl.srv.mods.Logger().Debugf("OnFetch: %.8s", hash)
impl.srv.logger.Debugf("OnFetch: %.8s", hash)

return hotstuffpb.BlockToProto(block), nil
}
Expand All @@ -183,11 +194,11 @@ func (impl *serviceImpl) Fetch(ctx gorums.ServerCtx, pb *hotstuffpb.BlockHash) (
func (impl *serviceImpl) Timeout(ctx gorums.ServerCtx, msg *hotstuffpb.TimeoutMsg) {
var err error
timeoutMsg := hotstuffpb.TimeoutMsgFromProto(msg)
timeoutMsg.ID, err = GetPeerIDFromContext(ctx, impl.srv.mods.Configuration())
timeoutMsg.ID, err = GetPeerIDFromContext(ctx, impl.srv.configuration)
if err != nil {
impl.srv.mods.Logger().Infof("Could not get ID of replica: %v", err)
impl.srv.logger.Infof("Could not get ID of replica: %v", err)
}
impl.srv.mods.EventLoop().AddEvent(timeoutMsg)
impl.srv.eventLoop.AddEvent(timeoutMsg)
}

type replicaConnected struct {
Expand Down
Loading

0 comments on commit 5196107

Please sign in to comment.