From 1628a5c23569b24b2ac11494eb90e79b4e2797cd Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Mon, 15 Apr 2024 13:34:14 +0100 Subject: [PATCH] fix(concurrency): use atomic specific types instead of atomic helpers (#682) --- block/manager.go | 17 +++++++++-------- block/manager_test.go | 10 +++++----- block/pruning.go | 3 +-- block/retriever.go | 2 +- block/submit.go | 7 +++---- block/submit_test.go | 18 +++++++++--------- da/local/local.go | 13 +++++++------ settlement/grpc/grpc.go | 16 ++++++++-------- store/store.go | 4 ++-- 9 files changed, 45 insertions(+), 45 deletions(-) diff --git a/block/manager.go b/block/manager.go index 043a2a75a..9961cda47 100644 --- a/block/manager.go +++ b/block/manager.go @@ -53,13 +53,14 @@ type Manager struct { // Synchronization syncTargetDiode diodes.Diode - syncTarget uint64 - isSyncedCond sync.Cond + + syncTarget atomic.Uint64 + isSyncedCond sync.Cond // Block production shouldProduceBlocksCh chan bool produceEmptyBlockCh chan bool - lastSubmissionTime int64 + lastSubmissionTime atomic.Int64 batchInProcess atomic.Value produceBlockMutex sync.Mutex applyCachedBlockMutex sync.Mutex @@ -181,18 +182,18 @@ func (m *Manager) syncBlockManager(ctx context.Context) error { // Since we requested the latest batch and got batch not found it means // the SL still hasn't got any batches for this chain. m.logger.Info("No batches for chain found in SL. Start writing first batch") - atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1)) + m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1)) return nil } return err } - atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight) + m.syncTarget.Store(resultRetrieveBatch.EndHeight) err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight) if err != nil { return err } - m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget)) + m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load()) return nil } @@ -200,8 +201,8 @@ func (m *Manager) syncBlockManager(ctx context.Context) error { func (m *Manager) updateSyncParams(endHeight uint64) { types.RollappHubHeightGauge.Set(float64(endHeight)) m.logger.Info("Received new syncTarget", "syncTarget", endHeight) - atomic.StoreUint64(&m.syncTarget, endHeight) - atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano()) + m.syncTarget.Store(endHeight) + m.lastSubmissionTime.Store(time.Now().UnixNano()) } func getAddress(key crypto.PrivKey) ([]byte, error) { diff --git a/block/manager_test.go b/block/manager_test.go index 605034944..487426d18 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "errors" - "sync/atomic" "testing" "time" @@ -118,7 +117,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) { require.NotNil(t, manager) t.Log("Taking the manager out of sync by submitting a batch") - syncTarget := atomic.LoadUint64(&manager.syncTarget) + + syncTarget := manager.syncTarget.Load() numBatchesToAdd := 2 nextBatchStartHeight := syncTarget + 1 var batch *types.Batch @@ -135,7 +135,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { } // Initially sync target is 0 - assert.True(t, manager.syncTarget == 0) + assert.Zero(t, manager.syncTarget.Load()) assert.True(t, manager.store.Height() == 0) // enough time to sync and produce blocks @@ -150,7 +150,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { assert.NoError(t, err, "Manager start should not produce an error") }() <-ctx.Done() - assert.True(t, manager.syncTarget == batch.EndHeight) + assert.Equal(t, batch.EndHeight, manager.syncTarget.Load()) // validate that we produced blocks assert.Greater(t, manager.store.Height(), batch.EndHeight) } @@ -440,7 +440,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) { } // Call createNextDABatch function - startHeight := atomic.LoadUint64(&manager.syncTarget) + 1 + startHeight := manager.syncTarget.Load() + 1 endHeight := startHeight + uint64(tc.blocksToProduce) - 1 batch, err := manager.createNextDABatch(startHeight, endHeight) assert.NoError(err) diff --git a/block/pruning.go b/block/pruning.go index a3109ed65..75a98043b 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -2,11 +2,10 @@ package block import ( "fmt" - "sync/atomic" ) func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) { - syncTarget := atomic.LoadUint64(&m.syncTarget) + syncTarget := m.syncTarget.Load() if retainHeight > int64(syncTarget) { return 0, fmt.Errorf("cannot prune uncommitted blocks") diff --git a/block/retriever.go b/block/retriever.go index 7be53e594..34ece1368 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -28,7 +28,7 @@ func (m *Manager) RetriveLoop(ctx context.Context) { } // Check if after we sync we are synced or a new syncTarget was already set. // If we are synced then signal all goroutines waiting on isSyncedCond. - if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) { + if m.store.Height() >= m.syncTarget.Load() { m.logger.Info("Synced at height", "height", m.store.Height()) m.isSyncedCond.L.Lock() m.isSyncedCond.Signal() diff --git a/block/submit.go b/block/submit.go index 21cb98bba..9cce5c3e8 100644 --- a/block/submit.go +++ b/block/submit.go @@ -3,7 +3,6 @@ package block import ( "context" "fmt" - "sync/atomic" "time" "github.com/dymensionxyz/dymint/da" @@ -29,7 +28,7 @@ func (m *Manager) SubmitLoop(ctx context.Context) { func (m *Manager) handleSubmissionTrigger(ctx context.Context) { // SyncTarget is the height of the last block in the last batch as seen by this node. - syncTarget := atomic.LoadUint64(&m.syncTarget) + syncTarget := m.syncTarget.Load() height := m.store.Height() // no new blocks produced yet if height <= syncTarget { @@ -65,7 +64,7 @@ func (m *Manager) handleSubmissionTrigger(ctx context.Context) { func (m *Manager) submitNextBatch() (uint64, error) { // Get the batch start and end height - startHeight := atomic.LoadUint64(&m.syncTarget) + 1 + startHeight := m.syncTarget.Load() + 1 endHeight := uint64(m.store.Height()) // Create the batch @@ -112,7 +111,7 @@ func (m *Manager) submitNextBatch() (uint64, error) { } func (m *Manager) validateBatch(batch *types.Batch) error { - syncTarget := atomic.LoadUint64(&m.syncTarget) + syncTarget := m.syncTarget.Load() if batch.StartHeight != syncTarget+1 { return fmt.Errorf("batch start height != syncTarget + 1. StartHeight %d, m.syncTarget %d", batch.StartHeight, syncTarget) } diff --git a/block/submit_test.go b/block/submit_test.go index da6401f37..059bce03e 100644 --- a/block/submit_test.go +++ b/block/submit_test.go @@ -41,17 +41,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) { initialHeight := uint64(0) require.Zero(manager.store.Height()) require.True(manager.batchInProcess.Load() == false) - require.Zero(manager.syncTarget) + require.Zero(manager.syncTarget.Load()) // Produce block and validate that we produced blocks err = manager.produceBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.store.Height(), initialHeight) - assert.Zero(t, manager.syncTarget) + assert.Zero(t, manager.syncTarget.Load()) // submit and validate sync target manager.handleSubmissionTrigger(ctx) - assert.EqualValues(t, 1, manager.syncTarget) + assert.EqualValues(t, 1, manager.syncTarget.Load()) } func TestBatchSubmissionFailedSubmission(t *testing.T) { @@ -88,23 +88,23 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { initialHeight := uint64(0) require.Zero(manager.store.Height()) require.True(manager.batchInProcess.Load() == false) - require.Zero(manager.syncTarget) + require.Zero(manager.syncTarget.Load()) // Produce block and validate that we produced blocks err = manager.produceBlock(ctx, true) require.NoError(err) assert.Greater(t, manager.store.Height(), initialHeight) - assert.Zero(t, manager.syncTarget) + assert.Zero(t, manager.syncTarget.Load()) // try to submit, we expect failure mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once() manager.handleSubmissionTrigger(ctx) - assert.EqualValues(t, 0, manager.syncTarget) + assert.EqualValues(t, 0, manager.syncTarget.Load()) // try to submit again, we expect success mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() manager.handleSubmissionTrigger(ctx) - assert.EqualValues(t, 1, manager.syncTarget) + assert.EqualValues(t, 1, manager.syncTarget.Load()) } func TestBatchSubmissionAfterTimeout(t *testing.T) { @@ -142,7 +142,7 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) { require.Equal(initialHeight, manager.store.Height()) require.True(manager.batchInProcess.Load() == false) - require.True(manager.syncTarget == 0) + require.Zero(manager.syncTarget.Load()) var wg sync.WaitGroup mCtx, cancel := context.WithTimeout(context.Background(), runTime) @@ -162,5 +162,5 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) { <-mCtx.Done() wg.Wait() // Wait for all goroutines to finish - require.True(manager.syncTarget > 0) + require.True(manager.syncTarget.Load() > 0) } diff --git a/da/local/local.go b/da/local/local.go index 625bbe7f8..f3e074bb2 100644 --- a/da/local/local.go +++ b/da/local/local.go @@ -18,7 +18,7 @@ import ( type DataAvailabilityLayerClient struct { logger types.Logger dalcKV store.KVStore - daHeight uint64 + daHeight atomic.Uint64 config config } @@ -37,7 +37,7 @@ var ( func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalcKV store.KVStore, logger types.Logger, options ...da.Option) error { m.logger = logger m.dalcKV = dalcKV - m.daHeight = 1 + m.daHeight.Store(1) if len(config) > 0 { var err error m.config.BlockTime, err = time.ParseDuration(string(config)) @@ -77,7 +77,8 @@ func (m *DataAvailabilityLayerClient) GetClientType() da.Client { // This should create a transaction which (potentially) // triggers a state transition in the DA layer. func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch { - daHeight := atomic.LoadUint64(&m.daHeight) + daHeight := m.daHeight.Load() + m.logger.Debug("Submitting batch to DA layer", "start height", batch.StartHeight, "end height", batch.EndHeight, "da height", daHeight) blob, err := batch.MarshalBinary() @@ -94,7 +95,7 @@ func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}} } - atomic.StoreUint64(&m.daHeight, daHeight+1) + m.daHeight.Store(daHeight + 1) // guaranteed no ABA problem as submit batch is only called when the object is locked return da.ResultSubmitBatch{ BaseResult: da.BaseResult{ @@ -116,7 +117,7 @@ func (m *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu // RetrieveBatches returns block at given height from data availability layer. func (m *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch { - if daMetaData.Height >= atomic.LoadUint64(&m.daHeight) { + if daMetaData.Height >= m.daHeight.Load() { return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: "batch not found", Error: da.ErrBlobNotFound}} } @@ -160,5 +161,5 @@ func getKey(daHeight uint64, height uint64) []byte { func (m *DataAvailabilityLayerClient) updateDAHeight() { blockStep := rand.Uint64()%10 + 1 //#nosec - atomic.AddUint64(&m.daHeight, blockStep) + m.daHeight.Add(blockStep) } diff --git a/settlement/grpc/grpc.go b/settlement/grpc/grpc.go index ad4db8e51..0e9184cbe 100644 --- a/settlement/grpc/grpc.go +++ b/settlement/grpc/grpc.go @@ -58,22 +58,21 @@ func (m *LayerClient) Init(config settlement.Config, pubsub *pubsub.Server, logg return nil } -// HubClient implements The HubClient interface +var _ settlement.HubClient = (*HubGrpcClient)(nil) + type HubGrpcClient struct { ctx context.Context ProposerPubKey string slStateIndex uint64 logger types.Logger pubsub *pubsub.Server - latestHeight uint64 + latestHeight atomic.Uint64 conn *grpc.ClientConn sl slmock.MockSLClient stopchan chan struct{} refreshTime int } -var _ settlement.HubClient = &HubGrpcClient{} - func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger types.Logger) (*HubGrpcClient, error) { ctx := context.Background() @@ -113,18 +112,19 @@ func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger types. } logger.Debug("Starting grpc SL ", "index", slStateIndex) - return &HubGrpcClient{ + ret := &HubGrpcClient{ ctx: ctx, ProposerPubKey: proposer, logger: logger, pubsub: pubsub, - latestHeight: latestHeight, slStateIndex: slStateIndex, conn: conn, sl: client, stopchan: stopchan, refreshTime: config.SLGrpc.RefreshTime, - }, nil + } + ret.latestHeight.Store(latestHeight) + return ret, nil } func initConfig(conf settlement.Config) (proposer string, err error) { @@ -273,7 +273,7 @@ func (c *HubGrpcClient) saveBatch(batch *settlement.Batch) { } c.logger.Debug("Setting grpc SL Index to ", "index", setIndexReply.GetIndex()) // Save latest height in memory and in store - atomic.StoreUint64(&c.latestHeight, batch.EndHeight) + c.latestHeight.Store(batch.EndHeight) } func (c *HubGrpcClient) convertBatchtoSettlementBatch(batch *types.Batch, daResult *da.ResultSubmitBatch) *settlement.Batch { diff --git a/store/store.go b/store/store.go index 920c5af92..dd2d5ad90 100644 --- a/store/store.go +++ b/store/store.go @@ -48,7 +48,7 @@ func (s *DefaultStore) NewBatch() Batch { // SetHeight sets the height saved in the Store if it is higher than the existing height func (s *DefaultStore) SetHeight(height uint64) { - storeHeight := atomic.LoadUint64(&s.height) + storeHeight := s.Height() if height > storeHeight { _ = atomic.CompareAndSwapUint64(&s.height, storeHeight, height) } @@ -61,7 +61,7 @@ func (s *DefaultStore) Height() uint64 { // SetBase sets the height saved in the Store of the earliest block func (s *DefaultStore) SetBase(height uint64) { - baseHeight := atomic.LoadUint64(&s.baseHeight) + baseHeight := s.Base() if height > baseHeight { _ = atomic.CompareAndSwapUint64(&s.baseHeight, baseHeight, height) }