feat(p2p): block sync protocol (#915)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Michael Tsitrin <[email protected]>
Co-authored-by: Omri <[email protected]>
Co-authored-by: Daniel T <[email protected]>
5 people committed Aug 13, 2024
1 parent 5e141d5 commit ee9c621
Showing 46 changed files with 1,121 additions and 280 deletions.
3 changes: 1 addition & 2 deletions block/block.go
@@ -90,7 +90,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.PruneBlocks(uint64(retainHeight))
@@ -132,7 +131,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
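The change above makes applyBlock record the cached block's own origin instead of assuming every cached block was gossiped. Only the names of the source constants appear in this diff (types.BlockSource, types.Produced, types.Gossiped, types.BlockSync, types.LocalDb, types.DA, types.BlockMetaData); a minimal sketch of how they could be defined, with the underlying representation and comments being assumptions:

```go
package types

// BlockSource records how a block reached this node.
type BlockSource uint64

const (
	Produced  BlockSource = iota // built locally by the sequencer
	Gossiped                     // received through P2P gossipsub
	BlockSync                    // fetched with the P2P block-sync protocol
	LocalDb                      // replayed from the local store
	DA                           // retrieved from the data availability layer
)

// BlockMetaData travels with a block into applyBlock.
type BlockMetaData struct {
	Source   BlockSource
	DAHeight uint64 // only meaningful when Source == DA
}
```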
4 changes: 2 additions & 2 deletions block/block_cache.go
@@ -9,8 +9,8 @@ type Cache struct {
cache map[uint64]types.CachedBlock
}

func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c}
func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

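For readability, the source-aware cache then looks roughly as follows. AddBlockToCache matches the hunk above; HasBlockInCache and Size are referenced elsewhere in this commit, and their bodies here are assumptions:

```go
package block

import "github.com/dymensionxyz/dymint/types"

// Cache holds blocks that arrived out of order (via gossip or block-sync)
// until they can be applied sequentially.
type Cache struct {
	cache map[uint64]types.CachedBlock
}

// AddBlockToCache stores a block with its commit and the source it arrived from.
func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
	m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source}
	types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

// HasBlockInCache reports whether a block for the given height is already cached.
func (m *Cache) HasBlockInCache(h uint64) bool {
	_, ok := m.cache[h]
	return ok
}

// Size returns the number of cached blocks.
func (m *Cache) Size() int {
	return len(m.cache)
}
```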
76 changes: 39 additions & 37 deletions block/manager.go
@@ -8,7 +8,6 @@ import (
"sync"
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

@@ -66,15 +65,15 @@ type Manager struct {
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// Protect against syncing twice from DA in case a new batch is posted before the previous sync has finished.
syncFromDaMu sync.Mutex
Retriever da.BatchRetriever
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache *Cache

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64
}

// NewManager creates new block Manager.
@@ -102,18 +101,17 @@ func NewManager(
}

m := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
logger: logger,
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
@@ -148,37 +146,39 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

if !isSequencer {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

eg, ctx := errgroup.WithContext(ctx)

if isSequencer {
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- nBytes
return m.ProduceBlockLoop(ctx, bytesProducedC)
})

} else {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.RetrieveLoop(ctx)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SyncToTargetHeightLoop(ctx)
})
// Full nodes can sync from DA, but there is no need to wait for it, since they can also sync from P2P in parallel.
go func() {
err := m.syncFromSettlement()
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()

// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
}

go func() {
@@ -207,21 +207,23 @@ func (m *Manager) NextHeightToSubmit() uint64 {
return m.LastSubmittedHeight.Load() + 1
}

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
// syncFromSettlement ensures the node is synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
res, err := m.SLClient.GetLatestBatch()
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
return err
}
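Because removed and added lines are interleaved above, here is the sequencer/full-node split of Start pieced together from the added lines (a sketch of the function body only; preceding setup, error handling, and the trailing goroutines are elided):

```go
eg, ctx := errgroup.WithContext(ctx)

if isSequencer {
	// Sequencer must wait until DA is synced before submitting blobs.
	<-m.DAClient.Synced()
	nBytes := m.GetUnsubmittedBytes()
	bytesProducedC := make(chan int)
	if err := m.syncFromSettlement(); err != nil {
		return fmt.Errorf("sync block manager from settlement: %w", err)
	}
	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
		return m.SubmitLoop(ctx, bytesProducedC)
	})
	uerrors.ErrGroupGoLog(eg, m.logger, func() error {
		bytesProducedC <- nBytes
		return m.ProduceBlockLoop(ctx, bytesProducedC)
	})
} else {
	// Full nodes sync from the settlement layer and DA in the background;
	// there is no need to block on it because P2P can deliver blocks in parallel.
	go func() {
		if err := m.syncFromSettlement(); err != nil {
			m.logger.Error("sync block manager from settlement", "err", err)
		}
		// DA sync: follow new settlement batch-accepted events.
		go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
	}()

	// P2P sync: gossiped blocks and block-sync blocks funnel into the same handler.
	go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
	go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
}
```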
10 changes: 6 additions & 4 deletions block/manager_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -55,10 +56,11 @@ func TestInitialState(t *testing.T) {
// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, logger)
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)

40 changes: 32 additions & 8 deletions block/gossip.go → block/p2p.go
@@ -4,19 +4,40 @@ import (
"context"
"fmt"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData, _ := event.Data().(p2p.GossipedBlock)
// onReceivedBlock handles a block received from P2P, saves it to the cache, and tries to apply blocks from the cache.
func (m *Manager) onReceivedBlock(event pubsub.Message) {
eventData, ok := event.Data().(p2p.P2PBlockEvent)
if !ok {
m.logger.Error("onReceivedBlock", "err", "wrong event data received")
return
}
var source types.BlockSource

if len(event.Events()[p2p.EventTypeKey]) != 1 {
m.logger.Error("onReceivedBlock", "err", "wrong number of event types received with the event", "received", len(event.Events()[p2p.EventTypeKey]))
return
}

switch event.Events()[p2p.EventTypeKey][0] {
case p2p.EventNewBlockSyncBlock:
source = types.BlockSync
case p2p.EventNewGossipedBlock:
source = types.Gossiped
default:
m.logger.Error("onReceivedBlock", "err", "wrong event type received", "type", event.Events()[p2p.EventTypeKey][0])
return
}

block := eventData.Block
commit := eventData.Commit
m.retrieverMu.Lock() // needed to protect blockCache access
height := block.Header.Height
m.retrieverMu.Lock() // needed to protect blockCache access

// It is not strictly necessary to return early for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if m.blockCache.HasBlockInCache(height) {
m.retrieverMu.Unlock()
@@ -30,7 +51,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {

nextHeight := m.State.NextHeight()
if height >= nextHeight {
m.blockCache.AddBlockToCache(height, &block, &commit)
m.blockCache.AddBlockToCache(height, &block, &commit, source)
}
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

@@ -40,8 +61,10 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
}

// gossipBlock sends blocks created by the sequencer to full nodes using P2P gossipsub
func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
m.logger.Info("Gossipping block", "height", block.Header.Height)
gossipedBlock := p2p.P2PBlockEvent{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
@@ -51,5 +74,6 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}

return nil
}
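onReceivedBlock above relies on a small amount of event plumbing in the p2p package that is not part of this excerpt. A sketch limited to the identifiers referenced here; the concrete string values, and the omitted MarshalBinary/UnmarshalBinary on P2PBlockEvent, are assumptions:

```go
package p2p

import "github.com/dymensionxyz/dymint/types"

// P2PBlockEvent carries a block and its commit, whether it arrived via
// gossipsub or via the block-sync protocol. (Un)MarshalBinary omitted.
type P2PBlockEvent struct {
	Block  types.Block
	Commit types.Commit
}

const (
	// EventTypeKey is the pubsub attribute that tags P2P block events.
	EventTypeKey = "p2p.event"

	// Values of EventTypeKey; onReceivedBlock maps them to types.Gossiped
	// and types.BlockSync respectively.
	EventNewGossipedBlock  = "NewGossipedBlock"
	EventNewBlockSyncBlock = "NewBlockSyncBlock"
)

// EventQueryNewGossipedBlock and EventQueryNewBlockSyncBlock, used in manager.go,
// would be pubsub queries matching these two values.
```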
3 changes: 2 additions & 1 deletion block/produce.go
@@ -71,6 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))
select {
case <-ctx.Done():
return nil
@@ -181,7 +182,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
}
}

if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil {
if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced}); err != nil {
return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
}

5 changes: 5 additions & 0 deletions block/pruning.go
@@ -1,6 +1,7 @@
package block

import (
"context"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
@@ -14,6 +15,10 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error {
gerrc.ErrInvalidArgument)
}

err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err)
}
pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
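The design choice in the hunk above: failure to remove pruned heights from the P2P block-sync store is logged rather than returned, so it never blocks pruning of the canonical block store. Consolidated into context (a sketch; the guard clause and the tail of the function are abridged from what this diff shows):

```go
func (m *Manager) PruneBlocks(retainHeight uint64) error {
	if retainHeight <= m.State.BaseHeight {
		return fmt.Errorf("retain height must be greater than base height: %w", gerrc.ErrInvalidArgument)
	}

	// Best effort: the block-sync (P2P) store is an auxiliary copy, so a failure
	// here is only logged and does not abort pruning of the canonical store.
	err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight)
	if err != nil {
		m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err)
	}

	pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
	if err != nil {
		return fmt.Errorf("prune block store: %w", err)
	}

	// ...update State.BaseHeight and persist the new state (not shown in this diff).
	_ = pruned
	return nil
}
```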
41 changes: 19 additions & 22 deletions block/retriever.go
@@ -1,41 +1,38 @@
package block

import (
"context"
"errors"
"fmt"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// RetrieveLoop listens for new target sync heights and then syncs the chain by
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) (err error) {
m.logger.Info("Started retrieve loop.")
p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

for {
targetHeight := p.Next() // We only care about the latest one
if targetHeight == nil {
return
}

if err = m.syncToTargetHeight(*(*uint64)(targetHeight)); err != nil {
err = fmt.Errorf("sync until target: %w", err)
return
}
// onNewStateUpdate tries to sync to the new height, if not already synced
func (m *Manager) onNewStateUpdate(event pubsub.Message) {
eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
if !ok {
m.logger.Error("onReceivedBatch", "err", "wrong event data received")
return
}
h := eventData.EndHeight
m.UpdateTargetHeight(h)
err := m.syncToTargetHeight(h)
if err != nil {
m.logger.Error("sync until target", "err", err)
}
}

// syncToTargetHeight syncs blocks until the target height is reached.
// It fetches the batches from the settlement layer, gets the DA height,
// and fetches the actual blocks from the DA.
func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
defer m.syncFromDaMu.Unlock()
m.syncFromDaMu.Lock()
for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() {
// if we have the block locally, we don't need to fetch it from the DA
err := m.applyLocalBlock(currH)
@@ -100,7 +97,7 @@ func (m *Manager) applyLocalBlock(height uint64) error {
}

m.retrieverMu.Lock()
err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDbBlock})
err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDb})
if err != nil {
return fmt.Errorf("apply block from local store: height: %d: %w", height, err)
}
@@ -113,7 +110,6 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daMetaData.Height)
batchResp := m.fetchBatch(daMetaData)
if batchResp.Code != da.StatusSuccess {
m.logger.Error("fetching batch from DA", batchResp.Message)
return batchResp.Error
}

@@ -132,7 +128,8 @@ m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err)
m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err)
continue
}
err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DABlock, DAHeight: daMetaData.Height})

err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DA, DAHeight: daMetaData.Height})
if err != nil {
return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
}
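Both onNewStateUpdate and onReceivedBlock call m.UpdateTargetHeight, which is not shown in this excerpt. Since TargetHeight can be raised concurrently from P2P and settlement events, a plausible implementation is a compare-and-swap loop over the atomic value that only ever increases it (a sketch; the real method may differ):

```go
// UpdateTargetHeight raises TargetHeight to h if h is higher than the current
// value, retrying on concurrent updates; it never lowers the target.
func (m *Manager) UpdateTargetHeight(h uint64) {
	for {
		currentHeight := m.TargetHeight.Load()
		if h <= currentHeight {
			return
		}
		if m.TargetHeight.CompareAndSwap(currentHeight, h) {
			return
		}
	}
}
```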
2 changes: 1 addition & 1 deletion block/state.go
@@ -86,7 +86,7 @@ func (m *Manager) UpdateStateFromApp() error {
}
vals, err := m.Store.LoadValidators(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
return errorsmod.Wrap(err, "load validators")
}

// update the state with the hash, last store height and last validators.