diff --git a/block/block.go b/block/block.go index a21ad6d22..40d52158b 100644 --- a/block/block.go +++ b/block/block.go @@ -121,12 +121,12 @@ func (m *Manager) attemptApplyCachedBlocks() error { for { expectedHeight := m.State.NextHeight() - cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight) + cachedBlock, blockExists := m.blockCache.Get(expectedHeight) if !blockExists { break } if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil { - m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height) + m.blockCache.Delete(cachedBlock.Block.Header.Height) // TODO: can we take an action here such as dropping the peer / reducing their reputation? return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err) } @@ -137,7 +137,7 @@ func (m *Manager) attemptApplyCachedBlocks() error { } m.logger.Info("Block applied", "height", expectedHeight) - m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height) + m.blockCache.Delete(cachedBlock.Block.Header.Height) } return nil diff --git a/block/block_cache.go b/block/block_cache.go index 4e8a188a0..b224f69fc 100644 --- a/block/block_cache.go +++ b/block/block_cache.go @@ -9,23 +9,23 @@ type Cache struct { cache map[uint64]types.CachedBlock } -func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) { +func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) { m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source} types.BlockCacheSizeGauge.Set(float64(m.Size())) } -func (m *Cache) DeleteBlockFromCache(h uint64) { +func (m *Cache) Delete(h uint64) { delete(m.cache, h) types.BlockCacheSizeGauge.Set(float64(m.Size())) } -func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) { +func (m *Cache) Get(h uint64) (types.CachedBlock, bool) { ret, found := m.cache[h] return ret, found } -func (m *Cache) HasBlockInCache(h uint64) bool { - _, found := m.GetBlockFromCache(h) +func (m *Cache) Has(h uint64) bool { + _, found := m.Get(h) return found } diff --git a/block/p2p.go b/block/p2p.go index 30d8e76b7..7de670b11 100644 --- a/block/p2p.go +++ b/block/p2p.go @@ -39,7 +39,7 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) { m.retrieverMu.Lock() // needed to protect blockCache access // It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks - if m.blockCache.HasBlockInCache(height) { + if m.blockCache.Has(height) { m.retrieverMu.Unlock() return } @@ -51,7 +51,7 @@ func (m *Manager) onReceivedBlock(event pubsub.Message) { nextHeight := m.State.NextHeight() if height >= nextHeight { - m.blockCache.AddBlockToCache(height, &block, &commit, source) + m.blockCache.Add(height, &block, &commit, source) } m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant diff --git a/block/retriever.go b/block/retriever.go index 50ce23324..e07bcfa61 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -136,7 +136,7 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { lastAppliedHeight = float64(block.Header.Height) - m.blockCache.DeleteBlockFromCache(block.Header.Height) + m.blockCache.Delete(block.Header.Height) } } types.LastReceivedDAHeightGauge.Set(lastAppliedHeight) diff --git a/block/submit.go b/block/submit.go index 4b714593f..138550797 100644 --- a/block/submit.go +++ b/block/submit.go @@ -176,14 +176,14 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight Commits: make([]*types.Commit, 0, batchSize), } - for height := startHeight; height <= endHeightInclusive; height++ { - block, err := m.Store.LoadBlock(height) + for h := startHeight; h <= endHeightInclusive; h++ { + block, err := m.Store.LoadBlock(h) if err != nil { - return nil, fmt.Errorf("load block: height: %d: %w", height, err) + return nil, fmt.Errorf("load block: h: %d: %w", h, err) } - commit, err := m.Store.LoadCommit(height) + commit, err := m.Store.LoadCommit(h) if err != nil { - return nil, fmt.Errorf("load commit: height: %d: %w", height, err) + return nil, fmt.Errorf("load commit: h: %d: %w", h, err) } batch.Blocks = append(batch.Blocks, block) @@ -196,8 +196,8 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight batch.Blocks = batch.Blocks[:len(batch.Blocks)-1] batch.Commits = batch.Commits[:len(batch.Commits)-1] - if height == startHeight { - return nil, fmt.Errorf("block size exceeds max batch size: height %d: size: %d: %w", height, totalSize, gerrc.ErrOutOfRange) + if h == startHeight { + return nil, fmt.Errorf("block size exceeds max batch size: h %d: size: %d: %w", h, totalSize, gerrc.ErrOutOfRange) } break } diff --git a/utils/event/funcs.go b/utils/event/funcs.go index ee925e07a..8b76b7ce0 100644 --- a/utils/event/funcs.go +++ b/utils/event/funcs.go @@ -24,9 +24,13 @@ func MustSubscribe( logger types.Logger, ) { subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery) - if err != nil && !errors.Is(err, context.Canceled) { - logger.Error("subscribe to events") - panic(err) + if err != nil { + err = fmt.Errorf("subscribe unbuffered: %w", err) + if !errors.Is(err, context.Canceled) { + logger.Error("Must subscribe.", "err", err) + panic(err) + } + return } for {