Skip to content

Commit

Permalink
core/filtermaps: nice log info during indexing/unindexing
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Oct 4, 2024
1 parent 455dd20 commit 2d8fc05
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 19 deletions.
39 changes: 25 additions & 14 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,22 @@ type blockchain interface {
// without the tree hashing and consensus changes:
// https://eips.ethereum.org/EIPS/eip-7745
type FilterMaps struct {
lock sync.RWMutex
db ethdb.KeyValueStore
closeCh chan struct{}
closeWg sync.WaitGroup
history, unindexLimit uint64
noHistory bool

Params
filterMapsRange
chain blockchain
matcherSyncCh chan *FilterMapsMatcherBackend
matchers map[*FilterMapsMatcherBackend]struct{}

// db and range are only modified by indexer under write lock; indexer can
// read them without a lock while matchers can access them under read lock
lock sync.RWMutex
db ethdb.KeyValueStore
filterMapsRange

matchers map[*FilterMapsMatcherBackend]struct{}

// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
// while updating the structure. Note that the set of cached maps depends
Expand All @@ -71,6 +75,11 @@ type FilterMaps struct {
lvPointerCache *lru.Cache[uint64, uint64]
revertPoints map[uint64]*revertPoint

startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool
startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time
lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time
ptrHeadUpdate, ptrTailExtend, ptrTailUnindex uint64

waitIdleCh chan chan bool
}

Expand Down Expand Up @@ -120,13 +129,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
}
params.deriveFields()
fm := &FilterMaps{
db: db,
chain: chain,
closeCh: make(chan struct{}),
waitIdleCh: make(chan chan bool),
history: history,
noHistory: noHistory,
Params: params,
db: db,
chain: chain,
closeCh: make(chan struct{}),
waitIdleCh: make(chan chan bool),
history: history,
noHistory: noHistory,
unindexLimit: unindexLimit,
Params: params,
filterMapsRange: filterMapsRange{
initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer,
Expand All @@ -151,13 +161,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
return fm
}

// Start starts the indexer.
func (f *FilterMaps) Start() {
f.closeWg.Add(2)
go f.removeBloomBits()
go f.updateLoop()
}

// Close ensures that the indexer is fully stopped before returning.
// Stop ensures that the indexer is fully stopped before returning.
func (f *FilterMaps) Stop() {
close(f.closeCh)
f.closeWg.Wait()
Expand All @@ -172,10 +183,10 @@ func (f *FilterMaps) reset() bool {
f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge()
f.lvPointerCache.Purge()
f.lock.Unlock()
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
f.lock.Unlock()
return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
}

Expand Down
85 changes: 80 additions & 5 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
)

const (
startLvMap = 1 << 31 // map index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory
startLvMap = 1 << 31 // map index assigned to init block
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory
logFrequency = time.Second * 8 // log info frequency during long indexing/unindexing process
)

// updateLoop initializes and updates the log index structure according to the
Expand All @@ -44,7 +45,10 @@ func (f *FilterMaps) updateLoop() {
f.reset()
return
}

f.lock.Lock()
f.updateMapCache()
f.lock.Unlock()
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
f.revertPoints[rp.blockNumber] = rp
} else {
Expand Down Expand Up @@ -198,6 +202,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
log.Info("Initialized log index", "head", head.Number.Uint64())
return true
}

Expand All @@ -209,6 +214,32 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
// indexer should exit and remaining parts of the old database will be removed
// at next startup.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
defer func() {
fmr := f.getRange()
if newHead.Hash() == fmr.headBlockHash {
if f.loggedHeadUpdate {
log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
f.loggedHeadUpdate, f.startHeadUpdate = false, false
}
} else {
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
"remaining", newHead.Number.Uint64()-fmr.headBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
f.loggedHeadUpdate = true
f.lastLogHeadUpdate = time.Now()
}
}

}()

if !f.startHeadUpdate {
f.lastLogHeadUpdate = time.Now()
f.startedHeadUpdate = f.lastLogHeadUpdate
f.startHeadUpdate = true
f.ptrHeadUpdate = f.getRange().headBlockNumber
}
// iterate back from new head until the log index head or a revert point and
// collect headers of blocks to be added
var (
Expand Down Expand Up @@ -305,14 +336,41 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
// tryExtendTail attempts to extend the log index backwards until the desired
// indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
defer func() {
fmr := f.getRange()
if fmr.tailBlockNumber <= tailTarget {
if f.loggedTailExtend {
log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
f.loggedTailExtend = false
}
}
}()

fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash

if !f.loggedTailExtend {
f.lastLogTailExtend = time.Now()
f.startedTailExtend = f.lastLogTailExtend
f.ptrTailExtend = fmr.tailBlockNumber
}

update := f.newUpdateBatch()
lastTailEpoch := update.tailEpoch()
for number > tailTarget && !stopFn() {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)

if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend {
log.Info("Reverse log indexing in progress", "history", update.headBlockNumber+1-update.tailBlockNumber,
"processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget,
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
f.loggedTailExtend = true
f.lastLogTailExtend = time.Now()
}

update = f.newUpdateBatch()
lastTailEpoch = tailEpoch
}
Expand All @@ -339,10 +397,27 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
// tryUnindexTail attempts to prune the log index tail until the desired indexed
// history length is achieved. Returns true if finished.
func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool {
if !f.loggedTailUnindex {
f.lastLogTailUnindex = time.Now()
f.startedTailUnindex = f.lastLogTailUnindex
f.ptrTailUnindex = f.getRange().tailBlockNumber
}
for {
if f.unindexTailEpoch(tailTarget) {
fmr := f.getRange()
log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
f.loggedTailUnindex = false
return true
}
if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex {
fmr := f.getRange()
log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber,
"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex)))
f.loggedTailUnindex = true
f.lastLogTailUnindex = time.Now()
}
if stopFn() {
return false
}
Expand Down Expand Up @@ -402,6 +477,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
// by updating the tail pointers, except for targetLvPointer which is not changed
// yet as it marks the tail of the log index data stored in the database and
// therefore should be updated when map data is actually removed.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) {
// obtain target log value pointer
if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
Expand Down Expand Up @@ -542,7 +618,6 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer)
}

// updatedRangeLength returns the lenght of the updated filter map range.
Expand Down
1 change: 1 addition & 0 deletions core/filtermaps/matcher_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
// valid range with the current indexed range. This function should be called
// whenever a part of the log index has been removed, before adding new blocks
// to it.
// Note that this function assumes that the read lock is being held.
func (f *FilterMaps) updateMatchersValidRange() {
for fm := range f.matchers {
if !f.initialized {
Expand Down

0 comments on commit 2d8fc05

Please sign in to comment.