use atomic type for syncTarget
danwt committed Apr 15, 2024
1 parent 01ff0a4 commit d54ef67
Showing 6 changed files with 53 additions and 58 deletions.
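For readers skimming the diff: the commit replaces a plain uint64 field guarded by sync/atomic package calls with Go 1.19's typed atomic.Uint64. A minimal sketch of the before/after pattern — a hypothetical counter struct standing in for the dymint Manager, not the actual API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Before: a plain uint64 field, safe only if every access goes
// through the sync/atomic package functions.
type counterOld struct {
	target uint64
}

func (c *counterOld) set(v uint64) { atomic.StoreUint64(&c.target, v) }
func (c *counterOld) get() uint64  { return atomic.LoadUint64(&c.target) }

// After: atomic.Uint64 makes atomic access the only way to touch the
// value; a stray plain read, write, or c.target++ no longer compiles.
type counterNew struct {
	target atomic.Uint64
}

func (c *counterNew) set(v uint64) { c.target.Store(v) }
func (c *counterNew) get() uint64  { return c.target.Load() }

func main() {
	var old counterOld
	old.set(7)

	var cur counterNew
	cur.set(7)
	fmt.Println(old.get(), cur.get()) // 7 7
}
```

The typed wrapper makes every access atomic by construction, which is why the sync/atomic import disappears from several of the files below.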
19 changes: 9 additions & 10 deletions block/manager.go
@@ -52,8 +52,9 @@ type Manager struct {

// Synchronization
syncTargetDiode diodes.Diode
syncTarget uint64
isSyncedCond sync.Cond

syncTarget atomic.Uint64
isSyncedCond sync.Cond

// Block production
shouldProduceBlocksCh chan bool
@@ -86,7 +87,6 @@ func NewManager(
p2pClient *p2p.Client,
logger types.Logger,
) (*Manager, error) {

proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
@@ -135,7 +135,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
m.logger.Info("Starting the block manager")

if isAggregator {
//make sure local signing key is the registered on the hub
// make sure local signing key is the registered on the hub
slProposerKey := m.settlementClient.GetProposer().PublicKey.Bytes()
localProposerKey, _ := m.proposerKey.GetPublic().Raw()
if !bytes.Equal(slProposerKey, localProposerKey) {
@@ -176,31 +176,31 @@ func (m *Manager) syncBlockManager(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
// Set the syncTarget according to the result
if err != nil {
//TODO: separate between fresh rollapp and non-registred rollapp
// TODO: separate between fresh rollapp and non-registred rollapp
if err == settlement.ErrBatchNotFound {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1))
return nil
}
return err
}
atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight)
m.syncTarget.Store(resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load())
return nil
}

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(endHeight uint64) {
types.RollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
atomic.StoreUint64(&m.syncTarget, endHeight)
m.syncTarget.Store(endHeight)
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
}

@@ -218,7 +218,6 @@ func (m *Manager) EventListener(ctx context.Context, isAggregator bool) {
if !isAggregator {
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
}

}

func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
32 changes: 17 additions & 15 deletions block/manager_test.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"errors"
"sync/atomic"
"testing"
"time"

@@ -56,7 +55,8 @@ func TestInitialState(t *testing.T) {
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestChain", logger)
BoostrapTime: 30 * time.Second,
}, privKey, "TestChain", logger)
assert.NoError(err)
assert.NotNil(p2pClient)

@@ -94,7 +94,6 @@ func TestInitialState(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

dalc := getMockDALC(logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
@@ -118,7 +117,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
require.NotNil(t, manager)

t.Log("Taking the manager out of sync by submitting a batch")
syncTarget := atomic.LoadUint64(&manager.syncTarget)

syncTarget := manager.syncTarget.Load()
numBatchesToAdd := 2
nextBatchStartHeight := syncTarget + 1
var batch *types.Batch
@@ -134,11 +134,11 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
time.Sleep(time.Millisecond * 500)
}

//Initially sync target is 0
assert.True(t, manager.syncTarget == 0)
// Initially sync target is 0
assert.Zero(t, manager.syncTarget.Load())
assert.True(t, manager.store.Height() == 0)

//enough time to sync and produce blocks
// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
// Capture the error returned by manager.Start.
@@ -150,8 +150,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.NoError(t, err, "Manager start should not produce an error")
}()
<-ctx.Done()
assert.True(t, manager.syncTarget == batch.EndHeight)
//validate that we produced blocks
assert.Equal(t, batch.EndHeight, manager.syncTarget.Load())
// validate that we produced blocks
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}

@@ -376,8 +376,10 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: tc.AppCommitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{LastBlockHeight: tc.LastAppBlockHeight,
LastBlockAppHash: tc.LastAppCommitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{
LastBlockHeight: tc.LastAppBlockHeight,
LastBlockAppHash: tc.LastAppCommitHash[:],
}).Once()
mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
_ = manager.produceBlock(context.Background(), true)
@@ -405,7 +407,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
// Init manager
managerConfig := getManagerConfig()
managerConfig.BlockBatchSize = 1000
managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes //enough for 2 block, not enough for 10 blocks
managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

@@ -438,7 +440,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

// Call createNextDABatch function
startHeight := atomic.LoadUint64(&manager.syncTarget) + 1
startHeight := manager.syncTarget.Load() + 1
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.createNextDABatch(startHeight, endHeight)
assert.NoError(err)
@@ -452,8 +454,8 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
assert.Less(batch.EndHeight, endHeight)

//validate next added block to batch would have been actually too big
//First relax the byte limit so we could proudce larger batch
// validate next added block to batch would have been actually too big
// First relax the byte limit so we could proudce larger batch
manager.conf.BlockBatchMaxSizeBytes = 10 * manager.conf.BlockBatchMaxSizeBytes
newBatch, err := manager.createNextDABatch(startHeight, batch.EndHeight+1)
assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
5 changes: 2 additions & 3 deletions block/pruning.go
@@ -2,11 +2,10 @@ package block

import (
"fmt"
"sync/atomic"
)

func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) {
syncTarget := atomic.LoadUint64(&m.syncTarget)
syncTarget := m.syncTarget.Load()

if retainHeight > int64(syncTarget) {
return 0, fmt.Errorf("cannot prune uncommitted blocks")
@@ -17,7 +16,7 @@ func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}

//TODO: prune state/indexer and state/txindexer??
// TODO: prune state/indexer and state/txindexer??

return pruned, nil
}
8 changes: 4 additions & 4 deletions block/retriever.go
@@ -28,7 +28,7 @@ func (m *Manager) RetriveLoop(ctx context.Context) {
}
// Check if after we sync we are synced or a new syncTarget was already set.
// If we are synced then signal all goroutines waiting on isSyncedCond.
if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) {
if m.store.Height() >= m.syncTarget.Load() {
m.logger.Info("Synced at height", "height", m.store.Height())
m.isSyncedCond.L.Lock()
m.isSyncedCond.Signal()
@@ -115,9 +115,9 @@ func (m *Manager) fetchBatch(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
},
}
}
//batchRes.MetaData includes proofs necessary to open disputes with the Hub
// batchRes.MetaData includes proofs necessary to open disputes with the Hub
batchRes := m.retriever.RetrieveBatches(daMetaData)
//TODO(srene) : for invalid transactions there is no specific error code since it will need to be validated somewhere else for fraud proving.
//NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
// TODO(srene) : for invalid transactions there is no specific error code since it will need to be validated somewhere else for fraud proving.
// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
return batchRes
}
15 changes: 7 additions & 8 deletions block/submit.go
@@ -3,7 +3,6 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/dymensionxyz/dymint/da"
@@ -14,10 +13,10 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.conf.BatchSubmitMaxTime)
defer ticker.Stop()

//TODO: add submission trigger by batch size (should be signaled from the the block production)
// TODO: add submission trigger by batch size (should be signaled from the the block production)
for {
select {
//Context canceled
// Context canceled
case <-ctx.Done():
return
// trigger by time
@@ -29,9 +28,9 @@ func (m *Manager) SubmitLoop(ctx context.Context) {

func (m *Manager) handleSubmissionTrigger(ctx context.Context) {
// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
syncTarget := m.syncTarget.Load()
height := m.store.Height()
//no new blocks produced yet
// no new blocks produced yet
if height <= syncTarget {
return
}
@@ -65,7 +64,7 @@ func (m *Manager) handleSubmissionTrigger(ctx context.Context) {

func (m *Manager) submitNextBatch() (uint64, error) {
// Get the batch start and end height
startHeight := atomic.LoadUint64(&m.syncTarget) + 1
startHeight := m.syncTarget.Load() + 1
endHeight := uint64(m.store.Height())

// Create the batch
@@ -112,7 +111,7 @@ func (m *Manager) submitNextBatch() (uint64, error) {
}

func (m *Manager) validateBatch(batch *types.Batch) error {
syncTarget := atomic.LoadUint64(&m.syncTarget)
syncTarget := m.syncTarget.Load()
if batch.StartHeight != syncTarget+1 {
return fmt.Errorf("batch start height != syncTarget + 1. StartHeight %d, m.syncTarget %d", batch.StartHeight, syncTarget)
}
@@ -149,7 +148,7 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*types.Batch, error) {
batch.Blocks = append(batch.Blocks, block)
batch.Commits = append(batch.Commits, commit)

//Check if the batch size is too big
// Check if the batch size is too big
totalSize := batch.ToProto().Size()
if totalSize > int(m.conf.BlockBatchMaxSizeBytes) {
// Nil out the last block and commit
32 changes: 14 additions & 18 deletions block/submit_test.go
@@ -2,6 +2,7 @@ package block

import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"sync"
@@ -13,8 +14,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/proxy"

"crypto/ed25519"

cosmosed25519 "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
"github.com/libp2p/go-libp2p/core/crypto"

@@ -24,9 +23,7 @@ import (
"github.com/dymensionxyz/dymint/types"
)

var (
ctx = context.Background()
)
var ctx = context.Background()

func TestBatchSubmissionHappyFlow(t *testing.T) {
require := require.New(t)
@@ -40,21 +37,21 @@ func TestBatchSubmissionHappyFlow(t *testing.T) {
manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

//Check initial assertions
// Check initial assertions
initialHeight := uint64(0)
require.Zero(manager.store.Height())
require.True(manager.batchInProcess.Load() == false)
require.Zero(manager.syncTarget)
require.Zero(manager.syncTarget.Load())

// Produce block and validate that we produced blocks
err = manager.produceBlock(ctx, true)
require.NoError(err)
assert.Greater(t, manager.store.Height(), initialHeight)
assert.Zero(t, manager.syncTarget)
assert.Zero(t, manager.syncTarget.Load())

// submit and validate sync target
manager.handleSubmissionTrigger(ctx)
assert.EqualValues(t, 1, manager.syncTarget)
assert.EqualValues(t, 1, manager.syncTarget.Load())
}

func TestBatchSubmissionFailedSubmission(t *testing.T) {
@@ -87,28 +84,27 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
manager, err := getManagerWithProposerKey(getManagerConfig(), lib2pPrivKey, mockLayerI, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

//Check initial assertions
// Check initial assertions
initialHeight := uint64(0)
require.Zero(manager.store.Height())
require.True(manager.batchInProcess.Load() == false)
require.Zero(manager.syncTarget)
require.Zero(manager.syncTarget.Load())

// Produce block and validate that we produced blocks
err = manager.produceBlock(ctx, true)
require.NoError(err)
assert.Greater(t, manager.store.Height(), initialHeight)
assert.Zero(t, manager.syncTarget)
assert.Zero(t, manager.syncTarget.Load())

// try to submit, we expect failure
mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("Failed to submit batch")).Once()
manager.handleSubmissionTrigger(ctx)
assert.EqualValues(t, 0, manager.syncTarget)
assert.EqualValues(t, 0, manager.syncTarget.Load())

// try to submit again, we expect success
mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
manager.handleSubmissionTrigger(ctx)
assert.EqualValues(t, 1, manager.syncTarget)

assert.EqualValues(t, 1, manager.syncTarget.Load())
}

func TestBatchSubmissionAfterTimeout(t *testing.T) {
@@ -141,12 +137,12 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

//Check initial height
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.store.Height())
require.True(manager.batchInProcess.Load() == false)

require.True(manager.syncTarget == 0)
require.Zero(manager.syncTarget.Load())

var wg sync.WaitGroup
mCtx, cancel := context.WithTimeout(context.Background(), runTime)
@@ -166,5 +162,5 @@

<-mCtx.Done()
wg.Wait() // Wait for all goroutines to finish
require.True(manager.syncTarget > 0)
require.True(manager.syncTarget.Load() > 0)
}

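A closing note on the concurrency shape this change protects: the sync target is stored by the sync and submission paths (syncBlockManager, updateSyncParams, handleSubmissionTrigger) and loaded by the retrieval, pruning, batching, and test code, typically from different goroutines. A self-contained sketch of that writer/reader pattern — hypothetical names, not the dymint API — which stays clean under the race detector:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var syncTarget atomic.Uint64
	var wg sync.WaitGroup

	// Writer: plays the role of the submission path advancing the target.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for h := uint64(1); h <= 1000; h++ {
			syncTarget.Store(h)
		}
	}()

	// Reader: plays the role of the retrieval loop polling the target.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var last uint64
		for last < 1000 {
			last = syncTarget.Load()
		}
	}()

	wg.Wait()
	fmt.Println("final syncTarget:", syncTarget.Load())
}
```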