feat(manager): block gossiping misbehavior detection #1117

Status: Open
Wants to merge 29 commits into base: srene/fix-test-p2p-validation
5 changes: 5 additions & 0 deletions block/block.go
@@ -92,6 +92,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("save block responses: %w", err)
}

_, err = m.Store.SaveBlockSource(block.Header.Height, blockMetaData.Source.String(), nil)
if err != nil {
return fmt.Errorf("save block source: %w", err)
}

// Commit block to app
appHash, retainHeight, err = m.Executor.Commit(m.State, block, responses)
if err != nil {
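For context on the new SaveBlockSource call: the recorded source is presumably read back later when a gossiped block is cross-checked against the copy retrieved from DA, which is the point of this PR's misbehavior detection. A minimal, self-contained sketch of that read path, using a stand-in map store and a hypothetical LoadBlockSource accessor (only SaveBlockSource appears in this diff):

```go
// Sketch only: illustrates the save/load round trip for a per-height block
// source record. sourceStore is a stand-in map, not dymint's Store, and
// LoadBlockSource is hypothetical.
package main

import "fmt"

type blockSource string

const (
	sourceGossip blockSource = "gossip"
	sourceDA     blockSource = "da"
)

type sourceStore map[uint64]blockSource

// SaveBlockSource records where the block at the given height came from.
func (s sourceStore) SaveBlockSource(height uint64, src blockSource) { s[height] = src }

// LoadBlockSource returns the recorded source, if any.
func (s sourceStore) LoadBlockSource(height uint64) (blockSource, bool) {
	src, ok := s[height]
	return src, ok
}

func main() {
	store := sourceStore{}
	store.SaveBlockSource(42, sourceGossip)

	// During state-update validation, a block that was first applied from
	// gossip can be cross-checked against the copy retrieved from the DA layer.
	if src, ok := store.LoadBlockSource(42); ok && src == sourceGossip {
		fmt.Println("block 42 was gossiped; compare it against the DA copy")
	}
}
```

In dymint itself the record would live in the Store alongside the block and commit; the map here only illustrates the save/load round trip.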
86 changes: 66 additions & 20 deletions block/manager.go
@@ -30,6 +30,8 @@
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"

uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

// Manager is responsible for aggregating transactions into blocks.
@@ -67,9 +69,8 @@
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
// Protect against syncing twice from DA in case new batch is posted but it did not finish to sync yet.
syncFromDaMu sync.Mutex
Retriever da.BatchRetriever

Retriever da.BatchRetriever
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache *Cache
@@ -85,6 +86,14 @@

// indexer
indexerService *txindex.IndexerService

syncingC chan struct{}

validateC chan struct{}

synced *uchannel.Nudger

validator *StateUpdateValidator
}

// NewManager creates new block Manager.
@@ -128,6 +137,9 @@
},
FraudHandler: nil, // TODO: create a default handler
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
syncingC: make(chan struct{}, 1),
validateC: make(chan struct{}, 1),
synced: uchannel.NewNudger(),
}

err = m.LoadStateOnInit(store, genesis, logger)
@@ -146,6 +158,8 @@
return nil, err
}

m.validator = NewStateUpdateValidator(m.logger, m)

return m, nil
}

@@ -175,21 +189,28 @@
return m.PruningLoop(ctx)
})

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SyncLoop(ctx)
})

err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
go func() {
err := m.syncFromSettlement()
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.ValidateLoop(ctx)
})
go uevent.MustSubscribe(ctx, m.Pubsub, "syncLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)

Check notice (Code scanning / CodeQL): Spawning a Go routine may be a possible source of non-determinism
go uevent.MustSubscribe(ctx, m.Pubsub, "validateLoop", settlement.EventQueryNewSettlementBatchFinalized, m.onNewStateUpdateFinalized, m.logger)

Check notice (Code scanning / CodeQL): Spawning a Go routine may be a possible source of non-determinism

// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)

return nil
}

@@ -198,11 +219,10 @@
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
m.DAClient.WaitForSyncing()

// Sequencer must wait until the node is synced up to the last submitted height, in case it is not
m.waitForSyncing()
// check if sequencer in the middle of rotation
nextSeqAddr, missing, err := m.MissingLastBatch()
if err != nil {
@@ -265,9 +285,10 @@

// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
// Update sequencers list from SL
err := m.UpdateSequencerSetFromSL()
if err != nil {
return fmt.Errorf("update bonded sequencer set: %w", err)
m.logger.Error("update bonded sequencer set", "error", err)
}

res, err := m.SLClient.GetLatestBatch()
@@ -282,14 +303,21 @@
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}

m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)

// get the latest finalized height to know from where to start validating
err = m.UpdateFinalizedHeight()
if err != nil {
return err
}

m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
// try to sync to last state update submitted on startup
m.triggerStateUpdateSyncing()
// try to validate all pending state updates on startup
m.triggerStateUpdateValidation()

return nil
}

@@ -306,6 +334,24 @@
}
}

// UpdateFinalizedHeight retrieves the latest finalized batch and updates validation height with it
func (m *Manager) UpdateFinalizedHeight() error {
res, err := m.SLClient.GetLatestFinalizedBatch()
if err != nil && !errors.Is(err, gerrc.ErrNotFound) {
// An unexpected error occurred while querying the SL (not just missing batches).
return fmt.Errorf("getting finalized height. err: %w", err)
}
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No finalized batches for chain found in SL.")
m.State.SetLastValidatedHeight(0)
} else {
// update validation height with latest finalized height (it will be updated only if the finalized height is higher)
m.State.SetLastValidatedHeight(res.EndHeight)
}
return nil
}

// ValidateConfigWithRollappParams checks the configuration params are consistent with the params in the dymint state (e.g. DA and version)
func (m *Manager) ValidateConfigWithRollappParams() error {
if version.Commit != m.State.RollappParams.Version {
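The SyncLoop and ValidateLoop bodies aren't shown in this diff, but the size-1 buffered channels (syncingC, validateC) together with triggerStateUpdateSyncing / triggerStateUpdateValidation suggest a coalescing-trigger pattern: a non-blocking send wakes the loop, and redundant triggers are dropped while one is already pending. A minimal sketch of that pattern under those assumptions (the names worker, trigger and loop are illustrative, not from the PR):

```go
// Sketch only: the coalescing-trigger pattern implied by the size-1 buffered
// channels above.
package main

import (
	"context"
	"fmt"
	"time"
)

type worker struct {
	nudge chan struct{} // capacity 1, so pending triggers coalesce
}

// trigger requests a run without blocking the caller; if a run is already
// pending, the extra request is dropped (the pending run covers it).
func (w *worker) trigger() {
	select {
	case w.nudge <- struct{}{}:
	default:
	}
}

// loop performs one unit of work per pending trigger until ctx is cancelled.
func (w *worker) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-w.nudge:
			fmt.Println("running one sync/validation pass")
		}
	}
}

func main() {
	w := &worker{nudge: make(chan struct{}, 1)}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	go w.loop(ctx)
	w.trigger()
	w.trigger() // coalesces with the first trigger if it hasn't been consumed yet
	<-ctx.Done()
}
```

With capacity 1 the caller (e.g. the settlement event handler) never blocks, and at most one wake-up is queued, so a burst of EventQueryNewSettlementBatchAccepted events collapses into a single sync pass.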
4 changes: 2 additions & 2 deletions block/manager_test.go
@@ -155,7 +155,6 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
require.NoError(t, err)

manager, err := testutil.GetManagerWithProposerKey(chainId, testutil.GetManagerConfig(), proposerKey, nil, 1, 1, 0, proxyApp, nil)
//manager, err := testutil.GetManager(chainId, testutil.GetManagerConfig(), nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
require.NotNil(t, manager)

@@ -387,7 +386,6 @@ func TestApplyLocalBlock_WithFraudCheck(t *testing.T) {
}()
<-ctx.Done()
assert.Equal(t, batchs[1].EndHeight(), manager.LastSubmittedHeight.Load())

mockExecutor.AssertExpectations(t)
mockFraudHandler.AssertExpectations(t)
}
@@ -404,6 +402,7 @@ func TestRetrieveDaBatchesFailed(t *testing.T) {
Client: da.Mock,
Height: 1,
}

err = manager.ProcessNextDABatch(daMetaData)
t.Log(err)
assert.ErrorIs(t, err, da.ErrBlobNotFound)
@@ -746,6 +745,7 @@ func TestDAFetch(t *testing.T) {
LastBlockHeight: int64(batch.EndHeight()),
LastBlockAppHash: commitHash[:],
})

err := manager.ProcessNextDABatch(c.daMetaData)
require.Equal(c.err, err)
})
8 changes: 4 additions & 4 deletions block/pruning.go
@@ -7,10 +7,10 @@ import (

// PruneBlocks prune all block related data from dymint store up to (but not including) retainHeight. It returns the number of blocks pruned, used for testing.
func (m *Manager) PruneBlocks(retainHeight uint64) (uint64, error) {
nextSubmissionHeight := m.NextHeightToSubmit()
if m.IsProposer() && nextSubmissionHeight < retainHeight { // do not delete anything that we might submit in future
m.logger.Debug("cannot prune blocks before they have been submitted. using height last submitted height for pruning", "retain_height", retainHeight, "height_to_submit", m.NextHeightToSubmit())
retainHeight = nextSubmissionHeight
if m.IsProposer() { // do not delete anything that we might submit in future
retainHeight = max(m.NextHeightToSubmit(), retainHeight)
} else { // do not delete anything that is not validated yet
retainHeight = max(m.State.NextValidationHeight(), retainHeight)
}

// prune blocks from blocksync store
75 changes: 3 additions & 72 deletions block/retriever.go
@@ -1,94 +1,25 @@
package block

import (
"context"
"errors"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewStateUpdate will try to sync to new height, if not already synced
func (m *Manager) onNewStateUpdate(event pubsub.Message) {
eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
if !ok {
m.logger.Error("onReceivedBatch", "err", "wrong event data received")
return
}
h := eventData.EndHeight
m.UpdateTargetHeight(h)
err := m.syncToTargetHeight(h)
if err != nil {
m.logger.Error("sync until target", "err", err)
}
}

// syncToTargetHeight syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
defer m.syncFromDaMu.Unlock()
m.syncFromDaMu.Lock()
for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() {
// if we have the block locally, we don't need to fetch it from the DA
err := m.applyLocalBlock(currH)
if err == nil {
m.logger.Info("Synced from local", "store height", currH, "target height", targetHeight)
continue
}
if !errors.Is(err, gerrc.ErrNotFound) {
m.logger.Error("Apply local block", "err", err)
}

err = m.syncFromDABatch()
if err != nil {
return fmt.Errorf("process next DA batch: %w", err)
}

// if height havent been updated, we are stuck
if m.State.NextHeight() == currH {
return fmt.Errorf("stuck at height %d", currH)
}
m.logger.Info("Synced from DA", "store height", m.State.Height(), "target height", targetHeight)

}

err := m.attemptApplyCachedBlocks()
if err != nil {
uevent.MustPublish(context.TODO(), m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.logger.Error("Attempt apply cached blocks.", "err", err)
}

return nil
}

func (m *Manager) syncFromDABatch() error {
// It's important that we query the state index before fetching the batch, rather
// than e.g. keep it and increment it, because we might be concurrently applying blocks
// and may require a higher index than expected.
res, err := m.SLClient.GetHeightState(m.State.NextHeight())
if err != nil {
return fmt.Errorf("retrieve state: %w", err)
}
stateIndex := res.State.StateIndex

settlementBatch, err := m.SLClient.GetBatchAtIndex(stateIndex)
settlementBatch, err := m.SLClient.GetBatchAtHeight(m.State.NextHeight())
if err != nil {
return fmt.Errorf("retrieve batch: %w", err)
}
m.logger.Info("Retrieved batch.", "state_index", stateIndex)
m.logger.Info("Retrieved batch.", "state_index", settlementBatch.StateIndex)

// update the proposer when syncing from the settlement layer
proposer := m.State.Sequencers.GetByAddress(settlementBatch.Batch.Sequencer)
if proposer == nil {
return fmt.Errorf("proposer not found: batch: %d: %s", stateIndex, settlementBatch.Batch.Sequencer)
return fmt.Errorf("proposer not found: batch: %d: %s", settlementBatch.StateIndex, settlementBatch.Batch.Sequencer)
}
m.State.Sequencers.SetProposer(proposer)

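The two-step lookup (GetHeightState for the state index, then GetBatchAtIndex) is folded into a single GetBatchAtHeight call, and the state index is read off the returned batch. A rough sketch of the equivalent lookup with stand-in types (not dymint's actual settlement client interface):

```go
// Sketch only: a one-call batch lookup by height, replacing the previous
// GetHeightState + GetBatchAtIndex pair. slClient is a stand-in type.
package main

import "fmt"

type Batch struct {
	StateIndex uint64
	Sequencer  string
	EndHeight  uint64
}

type slClient struct {
	batches []Batch // ordered by state index, contiguous height ranges
}

// GetBatchAtHeight returns the batch whose height range covers h.
func (c *slClient) GetBatchAtHeight(h uint64) (*Batch, error) {
	for i := range c.batches {
		if h <= c.batches[i].EndHeight {
			return &c.batches[i], nil
		}
	}
	return nil, fmt.Errorf("no batch found for height %d", h)
}

func main() {
	c := &slClient{batches: []Batch{
		{StateIndex: 1, Sequencer: "seqA", EndHeight: 100},
		{StateIndex: 2, Sequencer: "seqA", EndHeight: 250},
	}}

	b, err := c.GetBatchAtHeight(150)
	if err != nil {
		panic(err)
	}
	// The state index now comes from the batch itself, so the extra
	// GetHeightState round trip is no longer needed.
	fmt.Println("state index:", b.StateIndex, "sequencer:", b.Sequencer)
}
```

Querying by height serves the same purpose as the removed comment about not caching the index: the correct batch is resolved from the current NextHeight at call time, even if blocks are being applied concurrently.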