Merge branch 'main' into mtsitrin/941-sequencer-rotation
mtsitrin committed Aug 8, 2024
2 parents 07d5d70 + 01fc2d5 commit 16d2b6b
Showing 47 changed files with 1,141 additions and 305 deletions.
8 changes: 4 additions & 4 deletions block/block.go
@@ -127,23 +127,23 @@ func (m *Manager) attemptApplyCachedBlocks() error {
for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight)
cachedBlock, blockExists := m.blockCache.Get(expectedHeight)
if !blockExists {
break
}
if err := m.validateBlockBeforeApply(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Info("Block applied", "height", expectedHeight)

m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
}

return nil
12 changes: 6 additions & 6 deletions block/block_cache.go
@@ -9,23 +9,23 @@ type Cache struct {
cache map[uint64]types.CachedBlock
}

func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c}
func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) DeleteBlockFromCache(h uint64) {
func (m *Cache) Delete(h uint64) {
delete(m.cache, h)
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) {
func (m *Cache) Get(h uint64) (types.CachedBlock, bool) {
ret, found := m.cache[h]
return ret, found
}

func (m *Cache) HasBlockInCache(h uint64) bool {
_, found := m.GetBlockFromCache(h)
func (m *Cache) Has(h uint64) bool {
_, found := m.Get(h)
return found
}

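For orientation, a minimal, self-contained sketch (not part of this commit) of how the renamed cache API and the sequential-apply loop in attemptApplyCachedBlocks fit together. The stub types below are hypothetical stand-ins for types.Block, types.Commit, types.CachedBlock, and types.BlockSource:

package main

import "fmt"

// Hypothetical stand-ins for types.Block, types.Commit, and types.BlockSource.
type Block struct{ Height uint64 }

type Commit struct{}

type BlockSource int

const (
	Gossiped BlockSource = iota
	BlockSync
)

type CachedBlock struct {
	Block  *Block
	Commit *Commit
	Source BlockSource
}

// Cache mirrors the renamed API above: Add, Get, Has, Delete.
type Cache struct {
	cache map[uint64]CachedBlock
}

func (c *Cache) Add(h uint64, b *Block, cm *Commit, s BlockSource) {
	c.cache[h] = CachedBlock{Block: b, Commit: cm, Source: s}
}

func (c *Cache) Get(h uint64) (CachedBlock, bool) {
	cb, ok := c.cache[h]
	return cb, ok
}

func (c *Cache) Has(h uint64) bool {
	_, ok := c.cache[h]
	return ok
}

func (c *Cache) Delete(h uint64) {
	delete(c.cache, h)
}

func main() {
	c := &Cache{cache: make(map[uint64]CachedBlock)}
	c.Add(5, &Block{Height: 5}, &Commit{}, Gossiped)
	c.Add(6, &Block{Height: 6}, &Commit{}, BlockSync)

	// Apply cached blocks strictly in height order, as attemptApplyCachedBlocks does,
	// deleting each one once it has been applied.
	for next := uint64(5); ; next++ {
		cb, ok := c.Get(next)
		if !ok {
			break
		}
		fmt.Println("apply height", cb.Block.Height, "source", cb.Source)
		c.Delete(next)
	}
}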
82 changes: 37 additions & 45 deletions block/manager.go
@@ -7,7 +7,6 @@ import (
"sync"
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

@@ -66,15 +65,15 @@ type Manager struct {
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// Protect against syncing twice from DA in case new batch is posted but it did not finish to sync yet.
syncFromDaMu sync.Mutex
Retriever da.BatchRetriever
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache *Cache

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64
}

// NewManager creates new block Manager.
@@ -102,18 +101,17 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
logger: logger,
Pubsub: pubsub,
P2PClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
@@ -150,40 +148,32 @@ func (m *Manager) Start(ctx context.Context) error {
isSequencer := m.IsSequencer()
m.logger.Info("sequencer mode", "isSequencer", isSequencer)

eg, ctx := errgroup.WithContext(ctx)

/* ----------------------------- full node mode ----------------------------- */
if !isSequencer {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)

err := m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.RetrieveLoop(ctx)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SyncToTargetHeightLoop(ctx)
})
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
go func() {
_ = eg.Wait()
m.logger.Info("Block manager err group finished.")
err := m.syncFromSettlement()
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()

// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
return nil
}

/* ----------------------------- sequencer mode ----------------------------- */
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

eg, ctx := errgroup.WithContext(ctx)
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()

err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
// check if sequencer in the middle of rotation
next, err := m.SLClient.IsRotationInProgress()
if err != nil {
@@ -248,8 +238,8 @@ func (m *Manager) NextHeightToSubmit() uint64 {
return m.LastSubmittedHeight.Load() + 1
}

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
err := m.UpdateBondedSequencerSetFromSL()
if err != nil {
return fmt.Errorf("update bonded sequencer set: %w", err)
@@ -262,12 +252,14 @@ func (m *Manager) syncBlockManager() error {
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
return err
}
10 changes: 6 additions & 4 deletions block/manager_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -58,10 +59,11 @@ func TestInitialState(t *testing.T) {
// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, logger)
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)

42 changes: 33 additions & 9 deletions block/gossip.go → block/p2p.go
@@ -4,21 +4,42 @@ import (
"context"
"fmt"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData, _ := event.Data().(p2p.GossipedBlock)
// onReceivedBlock receives a block received event from P2P, saves the block to a cache and tries to apply the blocks from the cache.
func (m *Manager) onReceivedBlock(event pubsub.Message) {
eventData, ok := event.Data().(p2p.P2PBlockEvent)
if !ok {
m.logger.Error("onReceivedBlock", "err", "wrong event data received")
return
}
var source types.BlockSource

if len(event.Events()[p2p.EventTypeKey]) != 1 {
m.logger.Error("onReceivedBlock", "err", "wrong number of event types received with the event", "received", len(event.Events()[p2p.EventTypeKey]))
return
}

switch event.Events()[p2p.EventTypeKey][0] {
case p2p.EventNewBlockSyncBlock:
source = types.BlockSync
case p2p.EventNewGossipedBlock:
source = types.Gossiped
default:
m.logger.Error("onReceivedBlock", "err", "wrong event type received", "type", event.Events()[p2p.EventTypeKey][0])
return
}

block := eventData.Block
commit := eventData.Commit
m.retrieverMu.Lock() // needed to protect blockCache access
height := block.Header.Height
m.retrieverMu.Lock() // needed to protect blockCache access

// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if m.blockCache.HasBlockInCache(height) {
if m.blockCache.Has(height) {
m.retrieverMu.Unlock()
return
}
@@ -30,7 +51,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {

nextHeight := m.State.NextHeight()
if height >= nextHeight {
m.blockCache.AddBlockToCache(height, &block, &commit)
m.blockCache.Add(height, &block, &commit, source)
}
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

@@ -40,8 +61,10 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
}

// gossipBlock sends created blocks by the sequencer to full-nodes using P2P gossipSub
func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
m.logger.Info("Gossipping block", "height", block.Header.Height)
gossipedBlock := p2p.P2PBlockEvent{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
@@ -51,5 +74,6 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}

return nil
}
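A small sketch (again, not from this commit) of the dispatch idea in onReceivedBlock: exactly one event type is expected on the pubsub message, and it determines whether the cached block is tagged as gossiped or received via block-sync. The constant values and the event-type slice here are hypothetical simplifications of p2p.EventTypeKey handling:

package main

import "fmt"

// Hypothetical stand-ins for the p2p event type values and types.BlockSource.
const (
	EventNewGossipedBlock  = "NewGossipedBlock"
	EventNewBlockSyncBlock = "NewBlockSyncBlock"
)

type BlockSource int

const (
	Gossiped BlockSource = iota
	BlockSync
)

// sourceFromEventTypes mirrors the switch in onReceivedBlock: reject anything other
// than exactly one event type, then map it to a block source.
func sourceFromEventTypes(eventTypes []string) (BlockSource, error) {
	if len(eventTypes) != 1 {
		return 0, fmt.Errorf("wrong number of event types received: %d", len(eventTypes))
	}
	switch eventTypes[0] {
	case EventNewBlockSyncBlock:
		return BlockSync, nil
	case EventNewGossipedBlock:
		return Gossiped, nil
	default:
		return 0, fmt.Errorf("wrong event type received: %s", eventTypes[0])
	}
}

func main() {
	src, err := sourceFromEventTypes([]string{EventNewGossipedBlock})
	fmt.Println(src, err)
}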
3 changes: 2 additions & 1 deletion block/produce.go
@@ -72,6 +72,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))
select {
case <-ctx.Done():
return nil
@@ -120,7 +121,7 @@ func (m *Manager) ProduceApplyGossipBlock(ctx context.Context, allowEmpty bool,
return nil, nil, fmt.Errorf("produce block: %w", err)
}

if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil {
if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced}); err != nil {
return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
}

5 changes: 5 additions & 0 deletions block/pruning.go
@@ -1,6 +1,7 @@
package block

import (
"context"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
@@ -14,6 +15,10 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error {
gerrc.ErrInvalidArgument)
}

err := m.P2PClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err)
}
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
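Lastly, a hedged sketch of the pruning pattern added above: removing blocks from the block-sync (P2P) store is treated as best-effort and only logged, while a failure to prune the local block store is returned as an error. The receiver types are hypothetical stand-ins for the real P2P client and store:

package main

import (
	"context"
	"fmt"
	"log"
)

// Hypothetical stand-ins for the P2P client and the local block store.
type p2pClient struct{}

func (c *p2pClient) RemoveBlocks(ctx context.Context, from, to uint64) error { return nil }

type blockStore struct{}

func (s *blockStore) PruneBlocks(from, to uint64) (uint64, error) { return to - from, nil }

// pruneBlocks mirrors the order of operations in PruneBlocks above: best-effort
// block-sync store cleanup first, then the local store prune that can fail hard.
func pruneBlocks(ctx context.Context, p2p *p2pClient, store *blockStore, base, retain uint64) error {
	if err := p2p.RemoveBlocks(ctx, base, retain); err != nil {
		log.Printf("pruning block-sync store: retain_height=%d err=%v", retain, err)
	}
	pruned, err := store.PruneBlocks(base, retain)
	if err != nil {
		return fmt.Errorf("prune block store: %w", err)
	}
	log.Printf("pruned %d blocks", pruned)
	return nil
}

func main() {
	_ = pruneBlocks(context.Background(), &p2pClient{}, &blockStore{}, 1, 100)
}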