Skip to content

Commit

Permalink
core/filtermaps: added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Oct 3, 2024
1 parent 5c17d79 commit 28cdf15
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 168 deletions.
1 change: 0 additions & 1 deletion core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ type FilterMaps struct {
revertPoints map[uint64]*revertPoint

waitIdleCh chan chan bool
testHook func(int)
}

// filterMap is a full or partial in-memory representation of a filter map where
Expand Down
63 changes: 12 additions & 51 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,10 @@ const (
cachedRevertPoints = 64 // revert points for most recent blocks in memory
)

const (
testHookInit = iota
testHookUpdateHeadEpoch
testHookUpdateHead
testHookExtendTailEpoch
testHookExtendTail
testHookPruneTail
testHookPruneTailMaps
testHookRevert
testHookWait
testHookStop
)

// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
defer func() {
f.closeWg.Done()
if f.testHook != nil {
f.testHook(testHookStop)
}
}()
defer f.closeWg.Done()

if f.noHistory {
f.reset()
Expand Down Expand Up @@ -95,10 +77,6 @@ func (f *FilterMaps) updateLoop() {
if stop {
return
}
delay := time.Second * 20
if f.testHook != nil {
delay = 0
}
loop:
for {
select {
Expand All @@ -115,12 +93,9 @@ func (f *FilterMaps) updateLoop() {
continue loop
}
ch <- false
case <-time.After(delay):
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
if f.testHook != nil {
f.testHook(testHookWait)
}
}
break
}
Expand Down Expand Up @@ -184,6 +159,10 @@ func (f *FilterMaps) updateLoop() {
// WaitIdle blocks until the indexer is in an idle state while synced up to the
// latest chain head.
func (f *FilterMaps) WaitIdle() {
if f.noHistory {
f.closeWg.Wait()
return
}
for {
ch := make(chan bool)
f.waitIdleCh <- ch
Expand Down Expand Up @@ -219,9 +198,6 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookInit)
}
return true
}

Expand Down Expand Up @@ -295,16 +271,10 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
if update.updatedRangeLength() >= f.mapsPerEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookUpdateHeadEpoch)
}
update = f.newUpdateBatch()
}
}
f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookUpdateHead)
}
return true
}

Expand Down Expand Up @@ -342,9 +312,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
// limit the amount of data updated in a single batch
f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookExtendTailEpoch)
}
update = f.newUpdateBatch()
lastTailEpoch = tailEpoch
}
Expand All @@ -365,9 +332,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
}
f.applyUpdateBatch(update)
if f.testHook != nil {
f.testHook(testHookExtendTail)
}
return number <= tailTarget
}

Expand Down Expand Up @@ -406,9 +370,6 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
fmr.tailBlockLvPointer = targetLvPointer
f.setRange(f.db, fmr)
if f.testHook != nil {
f.testHook(testHookPruneTail)
}
}

// tryPruneTailMaps removes unused filter maps and corresponding log index
Expand Down Expand Up @@ -461,6 +422,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
batch := f.db.NewBatch()
for *removeLvPtr < nextBlockNumber {
f.deleteBlockLvPointer(batch, *removeLvPtr)
if (*removeLvPtr)%revertPointFrequency == 0 {
rawdb.DeleteRevertPoint(batch, *removeLvPtr)
}
(*removeLvPtr)++
}
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
Expand All @@ -481,9 +445,6 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
if f.testHook != nil {
f.testHook(testHookPruneTailMaps)
}
}

// updateBatch is a memory overlay collecting changes to the index log structure
Expand Down Expand Up @@ -873,6 +834,9 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
}
for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ {
f.deleteBlockLvPointer(batch, blockNumber)
if blockNumber%revertPointFrequency == 0 {
rawdb.DeleteRevertPoint(batch, blockNumber)
}
}
newRange := f.filterMapsRange
newRange.headLvPointer = lvPointer
Expand All @@ -882,8 +846,5 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
if f.testHook != nil {
f.testHook(testHookRevert)
}
return nil
}
Loading

0 comments on commit 28cdf15

Please sign in to comment.