fix(concurrency): use atomic specific types instead of atomic helpers
danwt authored Apr 15, 2024
1 parent 2043c7f commit 1628a5c
Showing 9 changed files with 45 additions and 45 deletions.
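
The pattern applied across all nine files is the same: package-level `sync/atomic` helper calls such as `atomic.LoadUint64(&x)` and `atomic.StoreUint64(&x, v)` are replaced with the typed atomics (`atomic.Uint64`, `atomic.Int64`) added in Go 1.19. The typed form makes atomic access part of the field's type, so a plain read like `m.syncTarget == 0` no longer compiles, and the typed values are guaranteed 64-bit alignment even on 32-bit platforms. A standalone before/after sketch (field names mirror the diff, but this is illustrative, not dymint code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Before: a bare field; safety depends on every caller remembering the helpers.
type managerOld struct {
	syncTarget uint64
}

// After: the field type itself enforces atomic access and is guaranteed
// 64-bit alignment even on 32-bit platforms.
type managerNew struct {
	syncTarget atomic.Uint64
}

func main() {
	old := &managerOld{}
	atomic.StoreUint64(&old.syncTarget, 7) // easy to forget: old.syncTarget = 7 also compiles, racily
	fmt.Println(atomic.LoadUint64(&old.syncTarget))

	cur := &managerNew{}
	cur.syncTarget.Store(7)            // the only way to write the field
	fmt.Println(cur.syncTarget.Load()) // cur.syncTarget == 7 would not compile
}
```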
17 changes: 9 additions & 8 deletions block/manager.go
@@ -53,13 +53,14 @@ type Manager struct {

// Synchronization
syncTargetDiode diodes.Diode
- syncTarget uint64
- isSyncedCond sync.Cond
+
+ syncTarget   atomic.Uint64
+ isSyncedCond sync.Cond

// Block production
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
- lastSubmissionTime int64
+ lastSubmissionTime atomic.Int64
batchInProcess atomic.Value
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex
@@ -181,27 +182,27 @@ func (m *Manager) syncBlockManager(ctx context.Context) error {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
- atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
+ m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1))
return nil
}
return err
}
- atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight)
+ m.syncTarget.Store(resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load())
return nil
}

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(endHeight uint64) {
types.RollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
- atomic.StoreUint64(&m.syncTarget, endHeight)
- atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
+ m.syncTarget.Store(endHeight)
+ m.lastSubmissionTime.Store(time.Now().UnixNano())
}

func getAddress(key crypto.PrivKey) ([]byte, error) {
10 changes: 5 additions & 5 deletions block/manager_test.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"errors"
"sync/atomic"
"testing"
"time"

@@ -118,7 +117,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
require.NotNil(t, manager)

t.Log("Taking the manager out of sync by submitting a batch")
- syncTarget := atomic.LoadUint64(&manager.syncTarget)
+
+ syncTarget := manager.syncTarget.Load()
numBatchesToAdd := 2
nextBatchStartHeight := syncTarget + 1
var batch *types.Batch
@@ -135,7 +135,7 @@
}

// Initially sync target is 0
- assert.True(t, manager.syncTarget == 0)
+ assert.Zero(t, manager.syncTarget.Load())
assert.True(t, manager.store.Height() == 0)

// enough time to sync and produce blocks
@@ -150,7 +150,7 @@
assert.NoError(t, err, "Manager start should not produce an error")
}()
<-ctx.Done()
- assert.True(t, manager.syncTarget == batch.EndHeight)
+ assert.Equal(t, batch.EndHeight, manager.syncTarget.Load())
// validate that we produced blocks
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}
@@ -440,7 +440,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

// Call createNextDABatch function
- startHeight := atomic.LoadUint64(&manager.syncTarget) + 1
+ startHeight := manager.syncTarget.Load() + 1
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.createNextDABatch(startHeight, endHeight)
assert.NoError(err)
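
The test changes above are forced by the type change: with `syncTarget` now an `atomic.Uint64`, comparisons like `manager.syncTarget == 0` no longer compile, and going through `Load()` also keeps `go test -race` quiet while the manager's goroutines are running. A minimal sketch of the pattern; `fakeManager` is a hypothetical stand-in, not dymint code:

```go
package block

import (
	"sync/atomic"
	"testing"
)

// fakeManager stands in for the real Manager; illustration only.
type fakeManager struct {
	syncTarget atomic.Uint64
}

func TestSyncTargetAccess(t *testing.T) {
	var m fakeManager
	// m.syncTarget == 0 would be a compile error; Load() is the supported read.
	if got := m.syncTarget.Load(); got != 0 {
		t.Fatalf("expected 0, got %d", got)
	}
	m.syncTarget.Store(7)
	if got := m.syncTarget.Load(); got != 7 {
		t.Fatalf("expected 7, got %d", got)
	}
}
```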
3 changes: 1 addition & 2 deletions block/pruning.go
@@ -2,11 +2,10 @@ package block

import (
"fmt"
"sync/atomic"
)

func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) {
- syncTarget := atomic.LoadUint64(&m.syncTarget)
+ syncTarget := m.syncTarget.Load()

if retainHeight > int64(syncTarget) {
return 0, fmt.Errorf("cannot prune uncommitted blocks")
2 changes: 1 addition & 1 deletion block/retriever.go
@@ -28,7 +28,7 @@ func (m *Manager) RetriveLoop(ctx context.Context) {
}
// Check if after we sync we are synced or a new syncTarget was already set.
// If we are synced then signal all goroutines waiting on isSyncedCond.
- if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) {
+ if m.store.Height() >= m.syncTarget.Load() {
m.logger.Info("Synced at height", "height", m.store.Height())
m.isSyncedCond.L.Lock()
m.isSyncedCond.Signal()
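
For orientation (not part of this diff): the `Signal` above is one half of a `sync.Cond` handshake. A self-contained sketch of what the waiting side of such a handshake typically looks like, with the condition re-checked in a loop as `sync.Cond` requires; all names here are illustrative stand-ins:

```go
package main

import (
	"sync"
	"sync/atomic"
)

// syncer mimics the manager fields touched in the hunk above; sketch only.
type syncer struct {
	mu         sync.Mutex
	cond       *sync.Cond
	height     atomic.Uint64
	syncTarget atomic.Uint64
}

// waitUntilSynced blocks until height catches up to the sync target.
func (s *syncer) waitUntilSynced() {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	// Wait must sit in a loop: wakeups can be spurious, and the target
	// may move again between the Signal and this goroutine resuming.
	for s.height.Load() < s.syncTarget.Load() {
		s.cond.Wait()
	}
}

func main() {
	s := &syncer{}
	s.cond = sync.NewCond(&s.mu)
	s.syncTarget.Store(0) // already synced at start, so this returns at once
	s.waitUntilSynced()
}
```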
7 changes: 3 additions & 4 deletions block/submit.go
@@ -3,7 +3,6 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/dymensionxyz/dymint/da"
@@ -29,7 +28,7 @@

func (m *Manager) handleSubmissionTrigger(ctx context.Context) {
// SyncTarget is the height of the last block in the last batch as seen by this node.
- syncTarget := atomic.LoadUint64(&m.syncTarget)
+ syncTarget := m.syncTarget.Load()
height := m.store.Height()
// no new blocks produced yet
if height <= syncTarget {
@@ -65,7 +64,7 @@ func (m *Manager) handleSubmissionTrigger(ctx context.Context) {

func (m *Manager) submitNextBatch() (uint64, error) {
// Get the batch start and end height
- startHeight := atomic.LoadUint64(&m.syncTarget) + 1
+ startHeight := m.syncTarget.Load() + 1
endHeight := uint64(m.store.Height())

// Create the batch
@@ -112,7 +111,7 @@ func (m *Manager) submitNextBatch() (uint64, error) {
}

func (m *Manager) validateBatch(batch *types.Batch) error {
- syncTarget := atomic.LoadUint64(&m.syncTarget)
+ syncTarget := m.syncTarget.Load()
if batch.StartHeight != syncTarget+1 {
return fmt.Errorf("batch start height != syncTarget + 1. StartHeight %d, m.syncTarget %d", batch.StartHeight, syncTarget)
}
18 changes: 9 additions & 9 deletions block/submit_test.go
@@ -41,17 +41,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) {
initialHeight := uint64(0)
require.Zero(manager.store.Height())
require.True(manager.batchInProcess.Load() == false)
- require.Zero(manager.syncTarget)
+ require.Zero(manager.syncTarget.Load())

// Produce block and validate that we produced blocks
err = manager.produceBlock(ctx, true)
require.NoError(err)
assert.Greater(t, manager.store.Height(), initialHeight)
- assert.Zero(t, manager.syncTarget)
+ assert.Zero(t, manager.syncTarget.Load())

// submit and validate sync target
manager.handleSubmissionTrigger(ctx)
- assert.EqualValues(t, 1, manager.syncTarget)
+ assert.EqualValues(t, 1, manager.syncTarget.Load())
}

func TestBatchSubmissionFailedSubmission(t *testing.T) {
@@ -88,23 +88,23 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
initialHeight := uint64(0)
require.Zero(manager.store.Height())
require.True(manager.batchInProcess.Load() == false)
- require.Zero(manager.syncTarget)
+ require.Zero(manager.syncTarget.Load())

// Produce block and validate that we produced blocks
err = manager.produceBlock(ctx, true)
require.NoError(err)
assert.Greater(t, manager.store.Height(), initialHeight)
- assert.Zero(t, manager.syncTarget)
+ assert.Zero(t, manager.syncTarget.Load())

// try to submit, we expect failure
mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once()
manager.handleSubmissionTrigger(ctx)
- assert.EqualValues(t, 0, manager.syncTarget)
+ assert.EqualValues(t, 0, manager.syncTarget.Load())

// try to submit again, we expect success
mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
manager.handleSubmissionTrigger(ctx)
- assert.EqualValues(t, 1, manager.syncTarget)
+ assert.EqualValues(t, 1, manager.syncTarget.Load())
}

func TestBatchSubmissionAfterTimeout(t *testing.T) {
@@ -142,7 +142,7 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
require.Equal(initialHeight, manager.store.Height())
require.True(manager.batchInProcess.Load() == false)

- require.True(manager.syncTarget == 0)
+ require.Zero(manager.syncTarget.Load())

var wg sync.WaitGroup
mCtx, cancel := context.WithTimeout(context.Background(), runTime)
@@ -162,5 +162,5 @@

<-mCtx.Done()
wg.Wait() // Wait for all goroutines to finish
- require.True(manager.syncTarget > 0)
+ require.True(manager.syncTarget.Load() > 0)
}
13 changes: 7 additions & 6 deletions da/local/local.go
@@ -18,7 +18,7 @@ import (
type DataAvailabilityLayerClient struct {
logger types.Logger
dalcKV store.KVStore
- daHeight uint64
+ daHeight atomic.Uint64
config config
}

@@ -37,7 +37,7 @@ var (
func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalcKV store.KVStore, logger types.Logger, options ...da.Option) error {
m.logger = logger
m.dalcKV = dalcKV
- m.daHeight = 1
+ m.daHeight.Store(1)
if len(config) > 0 {
var err error
m.config.BlockTime, err = time.ParseDuration(string(config))
@@ -77,7 +77,8 @@ func (m *DataAvailabilityLayerClient) GetClientType() da.Client {
// This should create a transaction which (potentially)
// triggers a state transition in the DA layer.
func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch {
- daHeight := atomic.LoadUint64(&m.daHeight)
+ daHeight := m.daHeight.Load()
+
m.logger.Debug("Submitting batch to DA layer", "start height", batch.StartHeight, "end height", batch.EndHeight, "da height", daHeight)

blob, err := batch.MarshalBinary()
@@ -94,7 +95,7 @@
return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}

- atomic.StoreUint64(&m.daHeight, daHeight+1)
+ m.daHeight.Store(daHeight + 1) // guaranteed no ABA problem as submit batch is only called when the object is locked

return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
@@ -116,7 +117,7 @@ func (m *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu

// RetrieveBatches returns block at given height from data availability layer.
func (m *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
- if daMetaData.Height >= atomic.LoadUint64(&m.daHeight) {
+ if daMetaData.Height >= m.daHeight.Load() {
return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: "batch not found", Error: da.ErrBlobNotFound}}
}

@@ -160,5 +161,5 @@ func getKey(daHeight uint64, height uint64) []byte {

func (m *DataAvailabilityLayerClient) updateDAHeight() {
blockStep := rand.Uint64()%10 + 1 //#nosec
- atomic.AddUint64(&m.daHeight, blockStep)
+ m.daHeight.Add(blockStep)
}
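
The in-code comment above leans on an external lock to make the separate `Load` and `Store` safe. Where no such lock exists, the typed atomic's `Add` performs the whole read-modify-write in one step, as `updateDAHeight` already does. A hedged, self-contained sketch of the difference (illustrative names, not dymint code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var daHeight atomic.Uint64
	daHeight.Store(1)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// daHeight.Store(daHeight.Load() + 1) could lose increments here;
			// Add performs the read-modify-write as one atomic operation.
			daHeight.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(daHeight.Load()) // always 101
}
```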
16 changes: 8 additions & 8 deletions settlement/grpc/grpc.go
@@ -58,22 +58,21 @@ func (m *LayerClient) Init(config settlement.Config, pubsub *pubsub.Server, logg
return nil
}

+ // HubClient implements The HubClient interface
+ var _ settlement.HubClient = (*HubGrpcClient)(nil)

type HubGrpcClient struct {
ctx context.Context
ProposerPubKey string
slStateIndex uint64
logger types.Logger
pubsub *pubsub.Server
- latestHeight uint64
+ latestHeight atomic.Uint64
conn *grpc.ClientConn
sl slmock.MockSLClient
stopchan chan struct{}
refreshTime int
}

- var _ settlement.HubClient = &HubGrpcClient{}

func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger types.Logger) (*HubGrpcClient, error) {
ctx := context.Background()

@@ -113,18 +112,19 @@ func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger types.
}
logger.Debug("Starting grpc SL ", "index", slStateIndex)

- return &HubGrpcClient{
+ ret := &HubGrpcClient{
ctx: ctx,
ProposerPubKey: proposer,
logger: logger,
pubsub: pubsub,
- latestHeight: latestHeight,
slStateIndex: slStateIndex,
conn: conn,
sl: client,
stopchan: stopchan,
refreshTime: config.SLGrpc.RefreshTime,
- }, nil
+ }
+ ret.latestHeight.Store(latestHeight)
+ return ret, nil
}

func initConfig(conf settlement.Config) (proposer string, err error) {
@@ -273,7 +273,7 @@ func (c *HubGrpcClient) saveBatch(batch *settlement.Batch) {
}
c.logger.Debug("Setting grpc SL Index to ", "index", setIndexReply.GetIndex())
// Save latest height in memory and in store
- atomic.StoreUint64(&c.latestHeight, batch.EndHeight)
+ c.latestHeight.Store(batch.EndHeight)
}

func (c *HubGrpcClient) convertBatchtoSettlementBatch(batch *types.Batch, daResult *da.ResultSubmitBatch) *settlement.Batch {
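
The constructor rewrite above is forced by the field's new type: an `atomic.Uint64` cannot be initialized from a plain `uint64` in a composite literal, and the typed atomics must not be copied after first use (`go vet`'s copylocks check flags this), so build-then-`Store` is the idiomatic initialization. A minimal sketch with illustrative names:

```go
package main

import "sync/atomic"

type client struct {
	latestHeight atomic.Uint64
}

func newClient(initialHeight uint64) *client {
	// latestHeight: initialHeight in the literal would not compile:
	// uint64 is not assignable to atomic.Uint64.
	c := &client{} // the zero value is a valid atomic.Uint64 holding 0
	c.latestHeight.Store(initialHeight)
	return c
}

func main() {
	c := newClient(42)
	_ = c.latestHeight.Load()
}
```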
4 changes: 2 additions & 2 deletions store/store.go
@@ -48,7 +48,7 @@ func (s *DefaultStore) NewBatch() Batch {

// SetHeight sets the height saved in the Store if it is higher than the existing height
func (s *DefaultStore) SetHeight(height uint64) {
- storeHeight := atomic.LoadUint64(&s.height)
+ storeHeight := s.Height()
if height > storeHeight {
_ = atomic.CompareAndSwapUint64(&s.height, storeHeight, height)
}
@@ -61,7 +61,7 @@ func (s *DefaultStore) Height() uint64 {

// SetBase sets the height saved in the Store of the earliest block
func (s *DefaultStore) SetBase(height uint64) {
- baseHeight := atomic.LoadUint64(&s.baseHeight)
+ baseHeight := s.Base()
if height > baseHeight {
_ = atomic.CompareAndSwapUint64(&s.baseHeight, baseHeight, height)
}
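
One thing this commit leaves as-is: `SetHeight` and `SetBase` still issue a single `CompareAndSwapUint64` and discard the result, so a swap that loses a race is silently dropped. The usual "monotonic max" idiom retries until the value is either raised or found to be high enough already. A sketch of that idiom, written with the typed atomic for consistency with the rest of the change (illustrative, not part of this diff):

```go
package main

import "sync/atomic"

type store struct {
	height atomic.Uint64
}

// SetHeight raises height to h (never lowers it), even under contention.
func (s *store) SetHeight(h uint64) {
	for {
		cur := s.height.Load()
		if h <= cur {
			return // stored height is already >= h
		}
		if s.height.CompareAndSwap(cur, h) {
			return // we won the race
		}
		// Another writer moved height between Load and CAS; retry.
	}
}

func main() {
	s := &store{}
	s.SetHeight(5)
	s.SetHeight(3) // no-op: never lowers
	_ = s.height.Load()
}
```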
