Follow up
zale144 committed Apr 15, 2024
2 parents 2abf5e6 + b766728 commit f84fff2
Showing 88 changed files with 739 additions and 540 deletions.
25 changes: 12 additions & 13 deletions block/block.go
@@ -37,13 +37,13 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
     // Start applying the block assuming no inconsistency was found.
     _, err = m.store.SaveBlock(block, commit, nil)
     if err != nil {
-        m.logger.Error("Failed to save block", "error", err)
+        m.logger.Error("save block", "error", err)
         return err
     }

     responses, err := m.executeBlock(ctx, block, commit)
     if err != nil {
-        m.logger.Error("Failed to execute block", "error", err)
+        m.logger.Error("execute block", "error", err)
         return err
     }

@@ -74,22 +74,22 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty

     err = batch.Commit()
     if err != nil {
-        m.logger.Error("Failed to persist batch to disk", "error", err)
+        m.logger.Error("persist batch to disk", "error", err)
         return err
     }

     // Commit block to app
     retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
     if err != nil {
-        m.logger.Error("Failed to commit to the block", "error", err)
+        m.logger.Error("commit to the block", "error", err)
         return err
     }

     // Prune old heights, if requested by ABCI app.
     if retainHeight > 0 {
         pruned, err := m.pruneBlocks(retainHeight)
         if err != nil {
-            m.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
+            m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
         } else {
             m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
         }
@@ -103,7 +103,7 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty

     _, err = m.store.UpdateState(newState, nil)
     if err != nil {
-        m.logger.Error("Failed to update state", "error", err)
+        m.logger.Error("update state", "error", err)
         return err
     }
     m.lastState = newState
@@ -124,7 +124,7 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {

         err := m.applyBlock(ctx, prevCachedBlock, m.prevCommit[m.store.Height()+1], blockMetaData{source: gossipedBlock})
         if err != nil {
-            m.logger.Debug("Failed to apply previously cached block", "err", err)
+            m.logger.Debug("apply previously cached block", "err", err)
             return err
         }
         prevCachedBlock, exists = m.prevBlock[m.store.Height()+1]
@@ -145,7 +145,7 @@ func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bo
     // Validate incosistency in height wasn't caused by a crash and if so handle it.
     proxyAppInfo, err := m.executor.GetAppInfo()
     if err != nil {
-        return isRequired, errors.Wrap(err, "failed to get app info")
+        return isRequired, errors.Wrap(err, "get app info")
     }
     if uint64(proxyAppInfo.LastBlockHeight) != block.Header.Height {
         return isRequired, nil
@@ -160,13 +160,13 @@ func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bo

     resp, err := m.store.LoadBlockResponses(block.Header.Height)
     if err != nil {
-        return isRequired, errors.Wrap(err, "failed to load block responses")
+        return isRequired, errors.Wrap(err, "load block responses")
     }
     copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

     _, err = m.store.UpdateState(m.lastState, nil)
     if err != nil {
-        return isRequired, errors.Wrap(err, "failed to update state")
+        return isRequired, errors.Wrap(err, "update state")
     }
     m.store.SetHeight(block.Header.Height)
     return isRequired, nil
@@ -193,13 +193,12 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
     gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
     gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
     if err != nil {
-        m.logger.Error("Failed to marshal block", "error", err)
+        m.logger.Error("marshal block", "error", err)
         return err
     }
     if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
-        m.logger.Error("Failed to gossip block", "error", err)
+        m.logger.Error("gossip block", "error", err)
         return err
     }
     return nil
-
 }
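
A note on the pattern above: the block.go log changes drop the "Failed to" prefix and name only the operation ("save block", "execute block"), since the Error level and the error chain already say that something failed. A small standalone Go sketch of the idea follows; it is not part of this commit and the helper names are illustrative.

package main

import (
    "errors"
    "fmt"
)

var errDiskFull = errors.New("disk full")

// saveBlock stands in for a store call that can fail.
func saveBlock() error {
    return errDiskFull
}

func applyBlock() error {
    // Wrap with the operation name only; a "failed to" prefix would
    // repeat itself at every level of the chain.
    if err := saveBlock(); err != nil {
        return fmt.Errorf("save block: %w", err)
    }
    return nil
}

func main() {
    // Prints "apply block: save block: disk full" rather than
    // "failed to apply block: failed to save block: disk full".
    if err := applyBlock(); err != nil {
        fmt.Println(fmt.Errorf("apply block: %w", err))
    }
}
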
6 changes: 3 additions & 3 deletions block/initchain.go
@@ -8,21 +8,21 @@ import (
 )

 func (m *Manager) RunInitChain(ctx context.Context) error {
-    //get the proposer's consensus pubkey
+    // get the proposer's consensus pubkey
     proposer := m.settlementClient.GetProposer()
     tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposer.PublicKey)
     if err != nil {
         return err
     }
     gensisValSet := []*tmtypes.Validator{tmtypes.NewValidator(tmPubKey, 1)}

-    //call initChain with both addresses
+    // call initChain with both addresses
     res, err := m.executor.InitChain(m.genesis, gensisValSet)
     if err != nil {
         return err
     }

-    //update the state with only the consensus pubkey
+    // update the state with only the consensus pubkey
     m.executor.UpdateStateAfterInitChain(&m.lastState, res, gensisValSet)
     if _, err := m.store.UpdateState(m.lastState, nil); err != nil {
         return err
41 changes: 19 additions & 22 deletions block/manager.go
@@ -3,6 +3,7 @@ package block
 import (
     "bytes"
     "context"
+    "errors"
     "fmt"
     "sync"
     "sync/atomic"
@@ -52,14 +53,15 @@ type Manager struct {

     // Synchronization
     syncTargetDiode diodes.Diode
-    syncTarget      uint64
-    isSyncedCond    sync.Cond
+
+    syncTarget   atomic.Uint64
+    isSyncedCond sync.Cond

     // Block production
     shouldProduceBlocksCh chan bool
     produceEmptyBlockCh   chan bool
-    lastSubmissionTime    int64
-    batchInProcess        atomic.Value
+    lastSubmissionTime    atomic.Int64
+    batchInProcess        sync.Mutex
     produceBlockMutex     sync.Mutex
     applyCachedBlockMutex sync.Mutex

@@ -86,24 +88,20 @@ func NewManager(
     p2pClient *p2p.Client,
     logger types.Logger,
 ) (*Manager, error) {
-
     proposerAddress, err := getAddress(proposerKey)
     if err != nil {
         return nil, err
     }

     exec, err := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
     if err != nil {
-        return nil, fmt.Errorf("failed to create block executor: %w", err)
+        return nil, fmt.Errorf("create block executor: %w", err)
     }
     s, err := getInitialState(store, genesis, logger)
     if err != nil {
-        return nil, fmt.Errorf("failed to get initial state: %w", err)
+        return nil, fmt.Errorf("get initial state: %w", err)
     }

-    batchInProcess := atomic.Value{}
-    batchInProcess.Store(false)
-
     agg := &Manager{
         pubsub: pubsub,
         p2pClient: p2pClient,
@@ -119,7 +117,6 @@ func NewManager(
         // channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
         syncTargetDiode: diodes.NewOneToOne(1, nil),
         isSyncedCond: *sync.NewCond(new(sync.Mutex)),
-        batchInProcess: batchInProcess,
         shouldProduceBlocksCh: make(chan bool, 1),
         produceEmptyBlockCh: make(chan bool, 1),
         logger: logger,
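
The two manager.go hunks above retire the atomic.Value flag that guarded batch submission: the struct field becomes a sync.Mutex and its explicit initialization disappears from NewManager. The diff does not show how the new mutex is used elsewhere in the commit; the standalone sketch below only illustrates one common way a mutex can serve as a "single submission in flight" guard, using TryLock, with illustrative names rather than the repo's code.

package main

import (
    "fmt"
    "sync"
    "time"
)

type manager struct {
    batchInProcess sync.Mutex // replaces an atomic.Value holding a bool flag
}

// submitBatch skips the work if another submission is already in flight.
func (m *manager) submitBatch(id int) {
    if !m.batchInProcess.TryLock() {
        fmt.Printf("submission %d skipped: batch already in process\n", id)
        return
    }
    defer m.batchInProcess.Unlock()
    fmt.Printf("submission %d running\n", id)
    time.Sleep(50 * time.Millisecond) // stand-in for the actual submission work
}

func main() {
    var m manager
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            m.submitBatch(id)
        }(i)
    }
    wg.Wait()
}
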
@@ -154,7 +151,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {

     err := m.syncBlockManager(ctx)
     if err != nil {
-        err = fmt.Errorf("failed to sync block manager: %w", err)
+        err = fmt.Errorf("sync block manager: %w", err)
         return err
     }

@@ -176,32 +173,32 @@ func (m *Manager) syncBlockManager(ctx context.Context) error {
     resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
     // Set the syncTarget according to the result
     if err != nil {
-        // TODO: separate between fresh rollapp and non-registred rollapp
-        if err == settlement.ErrBatchNotFound {
+        // TODO: separate between fresh rollapp and non-registered rollapp
+        if errors.Is(err, settlement.ErrBatchNotFound) {
             // Since we requested the latest batch and got batch not found it means
             // the SL still hasn't got any batches for this chain.
             m.logger.Info("No batches for chain found in SL. Start writing first batch")
-            atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
+            m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1))
             return nil
         }
         return err
     }
-    atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight)
+    m.syncTarget.Store(resultRetrieveBatch.EndHeight)
     err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight)
     if err != nil {
         return err
     }

-    m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
+    m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load())
     return nil
 }

 // updateSyncParams updates the sync target and state index if necessary
 func (m *Manager) updateSyncParams(endHeight uint64) {
     types.RollappHubHeightGauge.Set(float64(endHeight))
     m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
-    atomic.StoreUint64(&m.syncTarget, endHeight)
-    atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
+    m.syncTarget.Store(endHeight)
+    m.lastSubmissionTime.Store(time.Now().UnixNano())
 }

 func getAddress(key crypto.PrivKey) ([]byte, error) {
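
The syncBlockManager and updateSyncParams hunks above replace calls to the free functions in sync/atomic (atomic.StoreUint64, atomic.LoadUint64, atomic.StoreInt64) with the typed atomic.Uint64 and atomic.Int64 wrappers added in Go 1.19, so a load or store on the shared field can no longer be done non-atomically by accident. A minimal standalone sketch of the pattern, with illustrative type and field names:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// manager mirrors the pattern: typed atomics document intent in the
// struct definition and force every access through Load/Store.
type manager struct {
    syncTarget         atomic.Uint64
    lastSubmissionTime atomic.Int64
}

func main() {
    var m manager
    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(h uint64) {
            defer wg.Done()
            m.syncTarget.Store(h) // replaces atomic.StoreUint64(&m.syncTarget, h)
        }(uint64(i))
    }
    wg.Wait()
    fmt.Println("sync target:", m.syncTarget.Load()) // replaces atomic.LoadUint64
}
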
@@ -242,12 +239,12 @@ func (m *Manager) applyBlockCallback(event pubsub.Message) {
     } else {
         err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})
         if err != nil {
-            m.logger.Debug("Failed to apply block", "err", err)
+            m.logger.Debug("apply block", "err", err)
         }
     }
     err := m.attemptApplyCachedBlocks(context.Background())
     if err != nil {
-        m.logger.Debug("Failed to apply previous cached blocks", "err", err)
+        m.logger.Debug("apply previous cached blocks", "err", err)
     }
 }

@@ -259,7 +256,7 @@ func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultR
 // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
 func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger types.Logger) (types.State, error) {
     s, err := store.LoadState()
-    if err == types.ErrNoStateFound {
+    if errors.Is(err, types.ErrNoStateFound) {
         logger.Info("failed to find state in the store, creating new state from genesis")
         return types.NewFromGenesisDoc(genesis)
     }
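
getInitialState (and syncBlockManager above) now compare sentinel errors with errors.Is instead of ==, which keeps matching when the sentinel is wrapped with extra context on the way up. A small self-contained illustration, using a made-up sentinel rather than the repo's settlement or types packages:

package main

import (
    "errors"
    "fmt"
)

// errBatchNotFound stands in for a sentinel such as settlement.ErrBatchNotFound.
var errBatchNotFound = errors.New("batch not found")

func latestBatch() error {
    // Callers often receive the sentinel wrapped with extra context.
    return fmt.Errorf("query settlement layer: %w", errBatchNotFound)
}

func main() {
    err := latestBatch()
    fmt.Println(err == errBatchNotFound)          // false: wrapping breaks ==
    fmt.Println(errors.Is(err, errBatchNotFound)) // true: errors.Is unwraps
}
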
32 changes: 17 additions & 15 deletions block/manager_test.go
@@ -4,7 +4,6 @@ import (
     "context"
     "crypto/rand"
     "errors"
-    "sync/atomic"
     "testing"
     "time"

@@ -56,7 +55,8 @@ func TestInitialState(t *testing.T) {
     privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
     p2pClient, err := p2p.NewClient(config.P2PConfig{
         GossipCacheSize: 50,
-        BoostrapTime:    30 * time.Second}, privKey, "TestChain", logger)
+        BoostrapTime:    30 * time.Second,
+    }, privKey, "TestChain", logger)
     assert.NoError(err)
     assert.NotNil(p2pClient)

@@ -94,7 +94,6 @@ func TestInitialState(t *testing.T) {

     for _, c := range cases {
         t.Run(c.name, func(t *testing.T) {
-
             dalc := getMockDALC(logger)
             agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
                 nil, pubsubServer, p2pClient, logger)
@@ -118,7 +117,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
     require.NotNil(t, manager)

     t.Log("Taking the manager out of sync by submitting a batch")
-    syncTarget := atomic.LoadUint64(&manager.syncTarget)
+
+    syncTarget := manager.syncTarget.Load()
     numBatchesToAdd := 2
     nextBatchStartHeight := syncTarget + 1
     var batch *types.Batch
@@ -134,11 +134,11 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
         time.Sleep(time.Millisecond * 500)
     }

-    //Initially sync target is 0
-    assert.True(t, manager.syncTarget == 0)
+    // Initially sync target is 0
+    assert.Zero(t, manager.syncTarget.Load())
     assert.True(t, manager.store.Height() == 0)

-    //enough time to sync and produce blocks
+    // enough time to sync and produce blocks
     ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
     defer cancel()
     // Capture the error returned by manager.Start.
@@ -150,8 +150,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
         assert.NoError(t, err, "Manager start should not produce an error")
     }()
     <-ctx.Done()
-    assert.True(t, manager.syncTarget == batch.EndHeight)
-    //validate that we produced blocks
+    assert.Equal(t, batch.EndHeight, manager.syncTarget.Load())
+    // validate that we produced blocks
     assert.Greater(t, manager.store.Height(), batch.EndHeight)
 }

@@ -376,8 +376,10 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
     for _, tc := range cases {
         t.Run(tc.name, func(t *testing.T) {
             app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: tc.AppCommitHash[:]}).Once()
-            app.On("Info", mock.Anything).Return(abci.ResponseInfo{LastBlockHeight: tc.LastAppBlockHeight,
-                LastBlockAppHash: tc.LastAppCommitHash[:]}).Once()
+            app.On("Info", mock.Anything).Return(abci.ResponseInfo{
+                LastBlockHeight:  tc.LastAppBlockHeight,
+                LastBlockAppHash: tc.LastAppCommitHash[:],
+            }).Once()
             mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
             mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
             _ = manager.produceBlock(context.Background(), true)
@@ -405,7 +407,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
     // Init manager
     managerConfig := getManagerConfig()
     managerConfig.BlockBatchSize = 1000
-    managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes //enough for 2 block, not enough for 10 blocks
+    managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
     manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
     require.NoError(err)

@@ -438,7 +440,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
             }

             // Call createNextDABatch function
-            startHeight := atomic.LoadUint64(&manager.syncTarget) + 1
+            startHeight := manager.syncTarget.Load() + 1
             endHeight := startHeight + uint64(tc.blocksToProduce) - 1
             batch, err := manager.createNextDABatch(startHeight, endHeight)
             assert.NoError(err)
@@ -452,8 +454,8 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
             assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
             assert.Less(batch.EndHeight, endHeight)

-            //validate next added block to batch would have been actually too big
-            //First relax the byte limit so we could proudce larger batch
+            // validate next added block to batch would have been actually too big
+            // First relax the byte limit so we could proudce larger batch
             manager.conf.BlockBatchMaxSizeBytes = 10 * manager.conf.BlockBatchMaxSizeBytes
             newBatch, err := manager.createNextDABatch(startHeight, batch.EndHeight+1)
             assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
