[multiverse RPC]: add better universe root node cache #1169

Draft: wants to merge 9 commits into base: main
179 changes: 158 additions & 21 deletions tapdb/multiverse.go
@@ -107,6 +107,8 @@ type BatchedMultiverse interface {
type MultiverseStore struct {
db BatchedMultiverse

syncerCache *syncerRootNodeCache

rootNodeCache *rootNodeCache

proofCache *universeProofCache
@@ -125,6 +127,7 @@ type MultiverseStore struct {
func NewMultiverseStore(db BatchedMultiverse) *MultiverseStore {
return &MultiverseStore{
db: db,
syncerCache: newSyncerRootNodeCache(),
rootNodeCache: newRootNodeCache(),
proofCache: newUniverseProofCache(),
leafKeysCache: newUniverseLeafPageCache(),
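The constructor above wires in the new syncerRootNodeCache, but the type's definition isn't part of this excerpt. As orientation only, here is a minimal, self-contained sketch of what such a cache could look like, assuming it keeps a complete snapshot of all known roots in ascending order plus a key-to-index map. Every name below (package sketch, syncerRootCache, rootID, root, index) is a stand-in invented for illustration, not the PR's actual code, and the signatures and locking details are simplified guesses.

package sketch

import "sync"

// rootID is a stand-in for the string key derived from universe.Identifier
// (the real code appears to key entries by id.Key() or similar).
type rootID string

// root is a heavily simplified stand-in for universe.Root.
type root struct {
	ID   rootID
	Name string
}

// syncerRootCache sketches the idea behind syncerRootNodeCache: a
// mutex-guarded, complete snapshot of all known universe roots, kept in a
// stable ascending order so fixed-size pages can be served by slicing.
type syncerRootCache struct {
	sync.RWMutex

	// roots holds every known root, ordered ascending (by key here for
	// simplicity; the real cache presumably mirrors the DB sort order).
	roots []root

	// index maps a root's key to its position in roots, giving O(1)
	// lookups for single-root queries.
	index map[rootID]int
}

func newSyncerRootCache() *syncerRootCache {
	return &syncerRootCache{index: make(map[rootID]int)}
}

// empty reports whether the cache has been populated yet.
func (c *syncerRootCache) empty() bool {
	c.RLock()
	defer c.RUnlock()

	return len(c.roots) == 0
}

// fetchRoot returns a copy of the cached root for the given key, or nil on a
// cache miss. (The real cache also has to cooperate with an externally held
// write lock; that detail is omitted here.)
func (c *syncerRootCache) fetchRoot(id rootID) *root {
	c.RLock()
	defer c.RUnlock()

	i, ok := c.index[id]
	if !ok {
		return nil
	}

	r := c.roots[i]
	return &r
}

// fetchRoots returns one ascending page of roots. haveWriteLock mirrors the
// second argument seen in the diff: when the caller already holds the write
// lock (as the store does while re-filling the cache), the method must not
// try to take the read lock itself.
func (c *syncerRootCache) fetchRoots(offset, limit int,
	haveWriteLock bool) []root {

	if !haveWriteLock {
		c.RLock()
		defer c.RUnlock()
	}

	if offset >= len(c.roots) || limit <= 0 {
		return nil
	}

	end := offset + limit
	if end > len(c.roots) {
		end = len(c.roots)
	}

	page := make([]root, end-offset)
	copy(page, c.roots[offset:end])

	return page
}

Under these assumptions, an ascending RootNodes page is just a slice of the snapshot, which is what the comment further down means by the cache being able to be "directly sliced into".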
@@ -200,20 +203,49 @@ func (b *MultiverseStore) MultiverseRootNode(ctx context.Context,
func (b *MultiverseStore) UniverseRootNode(ctx context.Context,
id universe.Identifier) (universe.Root, error) {

// For an individual universe root node, we always fetch it from the
// syncer cache, as that should have all root nodes that are currently
// known. We never update the syncer cache on a cache miss of a single
// root node, as that shouldn't happen (unless the cache is empty).
rootNode := b.syncerCache.fetchRoot(id)
if rootNode != nil {
return *rootNode, nil
}

// If the cache is still empty, we'll populate it now.
if b.syncerCache.empty() {
// We attempt to acquire the write lock to fill the cache. If
// another goroutine is already filling the cache, we'll block
// here until it's done.
b.syncerCache.Lock()
defer b.syncerCache.Unlock()

// Because another goroutine might have filled the cache while
// we were waiting for the lock, we'll check again if the item
// is now in the cache.
rootNode = b.syncerCache.fetchRoot(id)
if rootNode != nil {
return *rootNode, nil
}

// Populate the cache with all the root nodes.
err := b.fillSyncerCache(ctx)
if err != nil {
return universe.Root{}, fmt.Errorf("error filling "+
"syncer cache: %w", err)
}

// We now try again to fetch the root node from the cache.
rootNode = b.syncerCache.fetchRoot(id)
if rootNode != nil {
return *rootNode, nil
}

// Still no luck with the cache (this should really never
// happen), so we'll go to the secondary cache or the disk to
// fetch it.
log.Warnf("Fetching root node from disk for id %v, cache miss "+
"even after filling the cache", id)
}

var universeRoot UniverseRoot
Expand Down Expand Up @@ -250,8 +282,6 @@ func (b *MultiverseStore) UniverseRootNode(ctx context.Context,
AssetName: universeRoot.AssetName,
}

return dbRoot, nil
}

@@ -302,7 +332,62 @@ func (b *MultiverseStore) UniverseLeafKeys(ctx context.Context,
func (b *MultiverseStore) RootNodes(ctx context.Context,
q universe.RootNodesQuery) ([]universe.Root, error) {

// Is this a query for the syncer cache (ascending and
// WithAmountsById=false)? This cache is complete (all root nodes) and
// can be directly sliced into, but it doesn't have any amounts by ID
// and doesn't support descending order.
if isQueryForSyncerCache(q) {
// First, check to see if we have the root nodes cached in the
// syncer cache.
rootNodes := b.syncerCache.fetchRoots(q, false)
if len(rootNodes) > 0 {
return rootNodes, nil
}

// If the cache is still empty, we'll populate it now.
if b.syncerCache.empty() {
// We attempt to acquire the write lock to fill the
// cache. If another goroutine is already filling the
// cache, we'll block here until it's done.
b.syncerCache.Lock()
defer b.syncerCache.Unlock()

// Because another goroutine might have filled the cache
// while we were waiting for the lock, we'll check again
// if the item is now in the cache.
rootNodes = b.syncerCache.fetchRoots(q, true)
if len(rootNodes) > 0 {
return rootNodes, nil
}

// Populate the cache with all the root nodes.
err := b.fillSyncerCache(ctx)
if err != nil {
return nil, fmt.Errorf("error filling syncer "+
"cache: %w", err)
}

// We now try again to fetch the root nodes page from
// the cache.
rootNodes = b.syncerCache.fetchRoots(q, true)
if len(rootNodes) > 0 {
return rootNodes, nil
}

// Still no luck with the cache (this should really
// never happen), so we'll go to the secondary cache or
// disk to fetch it.
log.Warnf("Fetching root nodes page from disk for "+
"query %v, cache miss even after filling the "+
"cache", q)
}
}

// Attempt to read directly from the root node cache next. This
// secondary cache only contains the last few pages of root nodes that
// were queried with parameters the syncer doesn't use. This might serve
// some UI requests where the first few pages are queried multiple
// times, so a smaller, LRU-based cache makes sense.
rootNodes := b.rootNodeCache.fetchRoots(q, false)
if len(rootNodes) > 0 {
log.Tracef("read %d root nodes from cache", len(rootNodes))
@@ -443,6 +528,44 @@ func (b *MultiverseStore) queryRootNodes(ctx context.Context,
return uniRoots, nil
}

// fillSyncerCache populates the syncer cache with all the root nodes that are
// currently known. This is used to quickly serve the syncer with the root nodes
// it needs to sync the multiverse.
//
// NOTE: This method must be called while holding the syncer cache lock.
func (b *MultiverseStore) fillSyncerCache(ctx context.Context) error {
now := time.Now()
log.Infof("Populating syncer root cache...")

params := sqlc.UniverseRootsParams{
SortDirection: sqlInt16(universe.SortAscending),
NumOffset: 0,
NumLimit: universe.MaxPageSize,
}

allRoots := make([]universe.Root, 0, numCachedProofs)
for {
newRoots, err := b.queryRootNodes(ctx, params, false)
if err != nil {
return err
}

allRoots = append(allRoots, newRoots...)
params.NumOffset += universe.MaxPageSize

if len(newRoots) < universe.MaxPageSize {
break
}
}

log.Debugf("Populating %v root nodes into syncer cache, took=%v",
len(allRoots), time.Since(now))

b.syncerCache.replaceCache(allRoots)

return nil
}
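fillSyncerCache ends by calling replaceCache, and the upsert and delete paths further down call addOrReplace and remove; none of those methods appear in this excerpt either. Continuing the stand-in sketch from above (types repeated so this snippet stands alone), they could maintain the ordered snapshot roughly as follows. This is an illustration of the idea only, not the PR's actual implementation.

package sketch

import (
	"sort"
	"sync"
)

// rootID and root are the same stand-ins used in the earlier sketch.
type rootID string

type root struct {
	ID   rootID
	Name string
}

type syncerRootCache struct {
	sync.RWMutex

	roots []root
	index map[rootID]int
}

// reindex rebuilds the key-to-position map after the slice changed.
func (c *syncerRootCache) reindex() {
	c.index = make(map[rootID]int, len(c.roots))
	for i, r := range c.roots {
		c.index[r.ID] = i
	}
}

// replaceCache swaps in a complete snapshot of all known roots. Mirroring the
// NOTE on fillSyncerCache, the caller is assumed to already hold the write
// lock, so no locking happens here.
func (c *syncerRootCache) replaceCache(all []root) {
	c.roots = append([]root(nil), all...)
	sort.Slice(c.roots, func(i, j int) bool {
		return c.roots[i].ID < c.roots[j].ID
	})
	c.reindex()
}

// addOrReplace updates a single root in place if it's already known, or
// inserts it at the position that keeps the slice ordered. Called after a
// successful proof upsert, so it takes the write lock itself.
func (c *syncerRootCache) addOrReplace(r root) {
	c.Lock()
	defer c.Unlock()

	if i, ok := c.index[r.ID]; ok {
		c.roots[i] = r
		return
	}

	i := sort.Search(len(c.roots), func(i int) bool {
		return c.roots[i].ID >= r.ID
	})
	c.roots = append(c.roots, root{})
	copy(c.roots[i+1:], c.roots[i:])
	c.roots[i] = r
	c.reindex()
}

// remove drops a root from the snapshot, e.g. after a universe is deleted.
func (c *syncerRootCache) remove(id rootID) {
	c.Lock()
	defer c.Unlock()

	i, ok := c.index[id]
	if !ok {
		return
	}

	c.roots = append(c.roots[:i], c.roots[i+1:]...)
	c.reindex()
}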

// FetchProofLeaf returns a proof leaf for the target key. If the key doesn't
// have a script key specified, then all the proof leafs for the outpoint will
// be returned. If neither are specified, then all inserted proof
@@ -625,6 +748,11 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context,
b.rootNodeCache.wipeCache()
b.proofCache.delProofsForAsset(id)
b.leafKeysCache.wipeCache(id.String())
b.syncerCache.addOrReplace(universe.Root{
ID: id,
AssetName: leaf.Asset.Tag,
Node: issuanceProof.MultiverseRoot,
})

// Notify subscribers about the new proof leaf, now that we're sure we
// have written it to the database. But we only care about transfer
@@ -643,30 +771,31 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
items []*universe.Item) error {

insertProof := func(item *universe.Item,
dbTx BaseMultiverseStore) (*universe.Proof, error) {

// Upsert proof leaf into the asset (group) specific universe
// tree.
return universeUpsertProofLeaf(
ctx, dbTx, item.ID, item.Key, item.Leaf,
item.MetaReveal,
)
}

var (
writeTx BaseMultiverseOptions
uniProofs []*universe.Proof
)
dbErr := b.db.ExecTx(
ctx, &writeTx, func(store BaseMultiverseStore) error {
uniProofs = make([]*universe.Proof, len(items))
for idx := range items {
item := items[idx]
uniProof, err := insertProof(item, store)
if err != nil {
return err
}

uniProofs[idx] = uniProof
}

return nil
Expand All @@ -690,6 +819,13 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
items[idx].Leaf.RawProof,
)
}

// Update the syncer cache with the new root node.
b.syncerCache.addOrReplace(universe.Root{
ID: items[idx].ID,
AssetName: items[idx].Leaf.Asset.Tag,
Node: uniProofs[idx].UniverseRoot,
})
}

// Invalidate the root node cache for all the assets we just inserted.
@@ -740,6 +876,7 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context,

b.proofCache.Delete(id.String())
b.leafKeysCache.wipeCache(id.String())
b.syncerCache.remove(id.Key())

return id.String(), dbErr
}