Skip to content

Commit

Permalink
Unify engines
Browse files Browse the repository at this point in the history
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Oct 10, 2024
1 parent dcfc4ab commit 04a18db
Show file tree
Hide file tree
Showing 28 changed files with 926 additions and 698 deletions.
282 changes: 197 additions & 85 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/api/health"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/engine/snowman/syncer"
"github.com/ava-labs/avalanchego/snow/engine/unified"
"github.com/ava-labs/avalanchego/snow/networking/handler"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
Expand Down Expand Up @@ -63,7 +65,6 @@ import (

p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p"
smcon "github.com/ava-labs/avalanchego/snow/consensus/snowman"
aveng "github.com/ava-labs/avalanchego/snow/engine/avalanche"
avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap"
avagetter "github.com/ava-labs/avalanchego/snow/engine/avalanche/getter"
smeng "github.com/ava-labs/avalanchego/snow/engine/snowman"
Expand Down Expand Up @@ -933,23 +934,13 @@ func (m *manager) createAvalancheChain(
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
ConnectedValidators: connectedValidators,
Params: consensusParams,
Consensus: snowmanConsensus,
}
var snowmanEngine common.Engine
snowmanEngine, err = smeng.New(snowmanEngineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
snowmanEngine = common.TraceEngine(snowmanEngine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Expand All @@ -968,18 +959,6 @@ func (m *manager) createAvalancheChain(
DB: blockBootstrappingDB,
VM: vmWrappingProposerVM,
}
var snowmanBootstrapper common.BootstrapableEngine
snowmanBootstrapper, err = smbootstrap.New(
bootstrapCfg,
snowmanEngine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
snowmanBootstrapper = common.TraceBootstrapableEngine(snowmanBootstrapper, m.Tracer)
}

avaGetHandler, err := avagetter.New(
vtxManager,
Expand All @@ -993,12 +972,6 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err)
}

// create engine gear
avalancheEngine := aveng.New(ctx, avaGetHandler, linearizableVM)
if m.TracingEnabled {
avalancheEngine = common.TraceEngine(avalancheEngine, m.Tracer)
}

// create bootstrap gear
avalancheBootstrapperConfig := avbootstrap.Config{
AllGetsServer: avaGetHandler,
Expand All @@ -1016,32 +989,33 @@ func (m *manager) createAvalancheChain(
avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID
}

var avalancheBootstrapper common.BootstrapableEngine
avalancheBootstrapper, err = avbootstrap.New(
avalancheBootstrapperConfig,
snowmanBootstrapper.Start,
avalancheMetrics,
)
ef := &engineFactory{
tracingEnabled: m.TracingEnabled,
getServer: snowGetHandler,
avaAncestorGetter: avaGetHandler,
avaMetrics: avalancheMetrics,
tracer: m.Tracer,
bootConfig: bootstrapCfg,
avaBootConfig: avalancheBootstrapperConfig,
snowmanConfig: snowmanEngineConfig,
logger: ctx.Log,
}

ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE,
State: snow.Bootstrapping,
})

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err)
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
avalancheBootstrapper = common.TraceBootstrapableEngine(avalancheBootstrapper, m.Tracer)
engine = common.TraceEngine(ue, m.Tracer)
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: &handler.Engine{
StateSyncer: nil,
Bootstrapper: avalancheBootstrapper,
Consensus: avalancheEngine,
},
Snowman: &handler.Engine{
StateSyncer: nil,
Bootstrapper: snowmanBootstrapper,
Consensus: snowmanEngine,
},
})
h.SetEngine(engine)

// Register health check for this chain
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
Expand Down Expand Up @@ -1327,7 +1301,6 @@ func (m *manager) createSnowmanChain(
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
Expand All @@ -1336,15 +1309,6 @@ func (m *manager) createSnowmanChain(
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
var engine common.Engine
engine, err = smeng.New(engineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
engine = common.TraceEngine(engine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Expand All @@ -1364,18 +1328,6 @@ func (m *manager) createSnowmanChain(
VM: vm,
Bootstrapped: bootstrapFunc,
}
var bootstrapper common.BootstrapableEngine
bootstrapper, err = smbootstrap.New(
bootstrapCfg,
engine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
bootstrapper = common.TraceBootstrapableEngine(bootstrapper, m.Tracer)
}

// create state sync gear
stateSyncCfg, err := syncer.NewConfig(
Expand All @@ -1392,24 +1344,42 @@ func (m *manager) createSnowmanChain(
if err != nil {
return nil, fmt.Errorf("couldn't initialize state syncer configuration: %w", err)
}
stateSyncer := syncer.New(
stateSyncCfg,
bootstrapper.Start,
)

if m.TracingEnabled {
stateSyncer = common.TraceStateSyncer(stateSyncer, m.Tracer)
ef := &engineFactory{
tracingEnabled: m.TracingEnabled,
getServer: snowGetHandler,
avaAncestorGetter: invalidEngineAncestorsGetter{},
hasStateSync: hasStateSync(stateSyncCfg),
tracer: m.Tracer,
bootConfig: bootstrapCfg,
snowmanConfig: engineConfig,
stateSyncConfig: stateSyncCfg,
logger: ctx.Log,
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: nil,
Snowman: &handler.Engine{
StateSyncer: stateSyncer,
Bootstrapper: bootstrapper,
Consensus: engine,
},
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.StateSyncing,
})

if !ef.HasStateSync() {
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.Bootstrapping,
})
}

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
engine = common.TraceEngine(ue, m.Tracer)
}
h.SetEngine(engine)

// Register health checks
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err)
Expand All @@ -1423,6 +1393,17 @@ func (m *manager) createSnowmanChain(
}, nil
}

func hasStateSync(stateSyncCfg syncer.Config) bool {
var hasStateSync bool
ssVM, isStateSyncable := stateSyncCfg.VM.(block.StateSyncableVM)
if isStateSyncable {
if enabled, err := ssVM.StateSyncEnabled(context.Background()); err == nil && enabled {
hasStateSync = true
}
}
return hasStateSync
}

func (m *manager) IsBootstrapped(id ids.ID) bool {
m.chainsLock.Lock()
chain, exists := m.chains[id]
Expand Down Expand Up @@ -1581,3 +1562,134 @@ func (m *manager) getOrMakeVMRegisterer(vmID ids.ID, chainAlias string) (metrics
)
return chainReg, err
}

type engineFactory struct {
tracingEnabled bool
tracer trace.Tracer
hasStateSync bool
logger logging.Logger
// stateSync
stateSyncConfig syncer.Config
// bootstrap
bootConfig smbootstrap.Config
// snowman
snowmanConfig smeng.Config
getServer common.AllGetsServer
// avalanche
avaBootConfig avbootstrap.Config
avaAncestorGetter common.GetAncestorsHandler
// avalanche metrics
avaMetrics prometheus.Registerer
}

func (ef *engineFactory) ClearBootstrapDB() error {
return database.AtomicClear(ef.bootConfig.DB, ef.bootConfig.DB)
}

func (ef *engineFactory) NewAvalancheAncestorsGetter() common.GetAncestorsHandler {
return ef.avaAncestorGetter
}

func (ef *engineFactory) GetServer() common.AllGetsServer {
return ef.getServer
}

func (ef *engineFactory) HasStateSync() bool {
return ef.hasStateSync
}

func (ef *engineFactory) NewStateSyncer(f unified.OnFinishedFunc) (common.StateSyncer, error) {
stateSyncer := syncer.New(ef.stateSyncConfig, f)

/* if ef.tracingEnabled {
stateSyncer = &tracedStateSyncer{
Enabler: common.NewTracedIsEnabled(stateSyncer, ef.tracer),
StateSyncer: stateSyncer,
}
}*/

return stateSyncer, nil
}

func (ef *engineFactory) NewAvalancheSyncer(f unified.OnFinishedFunc) (common.AvalancheBootstrapableEngine, error) {
var avalancheBootstrapper common.AvalancheBootstrapableEngine
var err error

avalancheBootstrapper, err = avbootstrap.New(
ef.avaBootConfig,
f,
ef.avaMetrics,
)
if err != nil {
ef.logger.Fatal("error initializing avalanche bootstrapper:", zap.Error(err))
return nil, err
}

/* if ef.tracingEnabled {
avalancheBootstrapper = &tracedClearer{
Clearer: common.NewTracedClearer(avalancheBootstrapper, ef.tracer),
AvalancheBootstrapableEngine: avalancheBootstrapper,
}
}*/

return avalancheBootstrapper, nil
}

func (ef *engineFactory) NewSnowman() (common.ConsensusEngine, error) {
snowmanEngine, err := smeng.New(ef.snowmanConfig)
if err != nil {
ef.logger.Fatal("error initializing snowman engine:", zap.Error(err))
return nil, err
}
return snowmanEngine, nil
}

func (ef *engineFactory) NewSnowBootstrapper(f unified.OnFinishedFunc) (common.BootstrapableEngine, error) {
var bootstrapper common.BootstrapableEngine
var err error
bootstrapper, err = smbootstrap.New(
ef.bootConfig,
f,
)
if err != nil {
ef.logger.Fatal("error initializing snowman bootstrapper:", zap.Error(err))
return nil, err
}

/* if ef.tracingEnabled {
bootstrapper = &tracedClearer{
Clearer: common.NewTracedClearer(bootstrapper, ef.tracer),
AvalancheBootstrapableEngine: bootstrapper,
AcceptedHandler: bootstrapper,
AcceptedFrontierHandler: bootstrapper,
}
}*/

return bootstrapper, nil
}

type invalidEngineAncestorsGetter struct{}

func (invalidEngineAncestorsGetter) GetAncestors(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, engineType p2ppb.EngineType) error {
return fmt.Errorf("this engine does not run %s", engineType)
}

type tracedStateSyncer struct {
common.Enabler
common.StateSyncer
}

func (tss *tracedStateSyncer) IsEnabled(ctx context.Context) (bool, error) {
return tss.Enabler.IsEnabled(ctx)
}

type tracedClearer struct {
common.Clearer
common.AvalancheBootstrapableEngine
common.AcceptedFrontierHandler
common.AcceptedHandler
}

func (tc *tracedClearer) Clear(ctx context.Context) error {
return tc.Clearer.Clear(ctx)
}
Loading

0 comments on commit 04a18db

Please sign in to comment.