From 016ddfdfce4023c0f7a7539809fd475ab96d9dbc Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Thu, 19 May 2022 14:07:48 +0300 Subject: [PATCH] Use a channel for utxo change events (#2052) * Use a channel from within consensus in order to raise change events in order -- note that this is only a draft commit for discussion * Fix compilation * Check for nil * Allow nil virtualChangeChan * Remove redundant comments * Call notifyVirtualChange instead of notifyUTXOsChanged * Remove redundant comment * Add a separate function for initVirtualChangeHandler * Remove redundant type * Check for nil in the right place * Fix integration test * Add data to virtual changeset and cleanup block added event logic * Renames * Comment Co-authored-by: Ori Newman --- app/component_manager.go | 7 +- app/protocol/flowcontext/blocks.go | 31 ++------ app/protocol/flowcontext/flow_context.go | 11 +-- app/protocol/flowcontext/orphans.go | 31 +++----- .../flows/v5/blockrelay/handle_relay_invs.go | 19 +++-- app/protocol/flows/v5/blockrelay/ibd.go | 14 +--- app/protocol/manager.go | 5 -- app/rpc/manager.go | 79 +++++++++++-------- app/rpc/rpccontext/notificationmanager.go | 13 +++ app/rpc/rpchandlers/get_blocks_test.go | 4 + domain/consensus/consensus.go | 48 ++++++++++- domain/consensus/factory.go | 10 ++- domain/consensus/factory_test.go | 3 +- .../model/externalapi/virtualchangeset.go | 2 + domain/domain.go | 18 +++-- domain/utxoindex/utxoindex.go | 3 + 16 files changed, 176 insertions(+), 122 deletions(-) diff --git a/app/component_manager.go b/app/component_manager.go index 88eff5031b..e1b171cb78 100644 --- a/app/component_manager.go +++ b/app/component_manager.go @@ -2,6 +2,7 @@ package app import ( "fmt" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "sync/atomic" "github.com/kaspanet/kaspad/domain/miningmanager/mempool" @@ -67,6 +68,7 @@ func (a *ComponentManager) Stop() { } a.protocolManager.Close() + close(a.protocolManager.Context().Domain().VirtualChangeChannel()) return } @@ -118,7 +120,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database, if err != nil { return nil, err } - rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, interrupt) + rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt) return &ComponentManager{ cfg: cfg, @@ -139,6 +141,7 @@ func setupRPC( connectionManager *connmanager.ConnectionManager, addressManager *addressmanager.AddressManager, utxoIndex *utxoindex.UTXOIndex, + virtualChangeChan chan *externalapi.VirtualChangeSet, shutDownChan chan<- struct{}, ) *rpc.Manager { @@ -150,9 +153,9 @@ func setupRPC( connectionManager, addressManager, utxoIndex, + virtualChangeChan, shutDownChan, ) - protocolManager.SetOnVirtualChange(rpcManager.NotifyVirtualChange) protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate) protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG) protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride) diff --git a/app/protocol/flowcontext/blocks.go b/app/protocol/flowcontext/blocks.go index 994bf259e4..dcc5a37f64 100644 --- a/app/protocol/flowcontext/blocks.go +++ b/app/protocol/flowcontext/blocks.go @@ -16,29 +16,24 @@ import ( // OnNewBlock updates the mempool after a new block arrival, and // relays newly unorphaned transactions and possibly rebroadcast // manually added transactions when not in IBD. -func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock, - virtualChangeSet *externalapi.VirtualChangeSet) error { +func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error { hash := consensushashing.BlockHash(block) log.Tracef("OnNewBlock start for block %s", hash) defer log.Tracef("OnNewBlock end for block %s", hash) - unorphaningResults, err := f.UnorphanBlocks(block) + unorphanedBlocks, err := f.UnorphanBlocks(block) if err != nil { return err } - log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphaningResults)) + log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphanedBlocks)) newBlocks := []*externalapi.DomainBlock{block} - newVirtualChangeSets := []*externalapi.VirtualChangeSet{virtualChangeSet} - for _, unorphaningResult := range unorphaningResults { - newBlocks = append(newBlocks, unorphaningResult.block) - newVirtualChangeSets = append(newVirtualChangeSets, unorphaningResult.virtualChangeSet) - } + newBlocks = append(newBlocks, unorphanedBlocks...) allAcceptedTransactions := make([]*externalapi.DomainTransaction, 0) - for i, newBlock := range newBlocks { + for _, newBlock := range newBlocks { log.Debugf("OnNewBlock: passing block %s transactions to mining manager", hash) acceptedTransactions, err := f.Domain().MiningManager().HandleNewBlockTransactions(newBlock.Transactions) if err != nil { @@ -48,8 +43,7 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock, if f.onBlockAddedToDAGHandler != nil { log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash) - virtualChangeSet = newVirtualChangeSets[i] - err := f.onBlockAddedToDAGHandler(newBlock, virtualChangeSet) + err := f.onBlockAddedToDAGHandler(newBlock) if err != nil { return err } @@ -59,15 +53,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock, return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions) } -// OnVirtualChange calls the handler function whenever the virtual block changes. -func (f *FlowContext) OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error { - if f.onVirtualChangeHandler != nil && virtualChangeSet != nil { - return f.onVirtualChangeHandler(virtualChangeSet) - } - - return nil -} - // OnNewBlockTemplate calls the handler function whenever a new block template is available for miners. func (f *FlowContext) OnNewBlockTemplate() error { // Clear current template cache. Note we call this even if the handler is nil, in order to keep the @@ -130,7 +115,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error { return protocolerrors.Errorf(false, "cannot add header only block") } - virtualChangeSet, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true) + _, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true) if err != nil { if errors.As(err, &ruleerrors.RuleError{}) { log.Warnf("Validation failed for block %s: %s", consensushashing.BlockHash(block), err) @@ -141,7 +126,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error { if err != nil { return err } - err = f.OnNewBlock(block, virtualChangeSet) + err = f.OnNewBlock(block) if err != nil { return err } diff --git a/app/protocol/flowcontext/flow_context.go b/app/protocol/flowcontext/flow_context.go index a9103b4204..f32476d52e 100644 --- a/app/protocol/flowcontext/flow_context.go +++ b/app/protocol/flowcontext/flow_context.go @@ -20,10 +20,7 @@ import ( // OnBlockAddedToDAGHandler is a handler function that's triggered // when a block is added to the DAG -type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error - -// OnVirtualChangeHandler is a handler function that's triggered when the virtual changes -type OnVirtualChangeHandler func(virtualChangeSet *externalapi.VirtualChangeSet) error +type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error // OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available type OnNewBlockTemplateHandler func() error @@ -47,7 +44,6 @@ type FlowContext struct { timeStarted int64 - onVirtualChangeHandler OnVirtualChangeHandler onBlockAddedToDAGHandler OnBlockAddedToDAGHandler onNewBlockTemplateHandler OnNewBlockTemplateHandler onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler @@ -111,11 +107,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) { return f.Domain().Consensus().IsNearlySynced() } -// SetOnVirtualChangeHandler sets the onVirtualChangeHandler handler -func (f *FlowContext) SetOnVirtualChangeHandler(onVirtualChangeHandler OnVirtualChangeHandler) { - f.onVirtualChangeHandler = onVirtualChangeHandler -} - // SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) { f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler diff --git a/app/protocol/flowcontext/orphans.go b/app/protocol/flowcontext/orphans.go index 62536c3980..2087d68178 100644 --- a/app/protocol/flowcontext/orphans.go +++ b/app/protocol/flowcontext/orphans.go @@ -15,12 +15,6 @@ import ( // on: 2^orphanResolutionRange * PHANTOM K. const maxOrphans = 600 -// UnorphaningResult is the result of unorphaning a block -type UnorphaningResult struct { - block *externalapi.DomainBlock - virtualChangeSet *externalapi.VirtualChangeSet -} - // AddOrphan adds the block to the orphan set func (f *FlowContext) AddOrphan(orphanBlock *externalapi.DomainBlock) { f.orphansMutex.Lock() @@ -57,7 +51,7 @@ func (f *FlowContext) IsOrphan(blockHash *externalapi.DomainHash) bool { } // UnorphanBlocks removes the block from the orphan set, and remove all of the blocks that are not orphans anymore. -func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*UnorphaningResult, error) { +func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*externalapi.DomainBlock, error) { f.orphansMutex.Lock() defer f.orphansMutex.Unlock() @@ -66,7 +60,7 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno rootBlockHash := consensushashing.BlockHash(rootBlock) processQueue := f.addChildOrphansToProcessQueue(rootBlockHash, []externalapi.DomainHash{}) - var unorphaningResults []*UnorphaningResult + var unorphanedBlocks []*externalapi.DomainBlock for len(processQueue) > 0 { var orphanHash externalapi.DomainHash orphanHash, processQueue = processQueue[0], processQueue[1:] @@ -90,21 +84,18 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno } } if canBeUnorphaned { - virtualChangeSet, unorphaningSucceeded, err := f.unorphanBlock(orphanHash) + unorphaningSucceeded, err := f.unorphanBlock(orphanHash) if err != nil { return nil, err } if unorphaningSucceeded { - unorphaningResults = append(unorphaningResults, &UnorphaningResult{ - block: orphanBlock, - virtualChangeSet: virtualChangeSet, - }) + unorphanedBlocks = append(unorphanedBlocks, orphanBlock) processQueue = f.addChildOrphansToProcessQueue(&orphanHash, processQueue) } } } - return unorphaningResults, nil + return unorphanedBlocks, nil } // addChildOrphansToProcessQueue finds all child orphans of `blockHash` @@ -143,24 +134,24 @@ func (f *FlowContext) findChildOrphansOfBlock(blockHash *externalapi.DomainHash) return childOrphans } -func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (*externalapi.VirtualChangeSet, bool, error) { +func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (bool, error) { orphanBlock, ok := f.orphans[orphanHash] if !ok { - return nil, false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash) + return false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash) } delete(f.orphans, orphanHash) - virtualChangeSet, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true) + _, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true) if err != nil { if errors.As(err, &ruleerrors.RuleError{}) { log.Warnf("Validation failed for orphan block %s: %s", orphanHash, err) - return nil, false, nil + return false, nil } - return nil, false, err + return false, err } log.Infof("Unorphaned block %s", orphanHash) - return virtualChangeSet, true, nil + return true, nil } // GetOrphanRoots returns the roots of the missing ancestors DAG of the given orphan diff --git a/app/protocol/flows/v5/blockrelay/handle_relay_invs.go b/app/protocol/flows/v5/blockrelay/handle_relay_invs.go index 4ddb84e3d0..5e47e77701 100644 --- a/app/protocol/flows/v5/blockrelay/handle_relay_invs.go +++ b/app/protocol/flows/v5/blockrelay/handle_relay_invs.go @@ -26,8 +26,7 @@ var orphanResolutionRange uint32 = 5 type RelayInvsContext interface { Domain() domain.Domain Config() *config.Config - OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error - OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error + OnNewBlock(block *externalapi.DomainBlock) error OnNewBlockTemplate() error OnPruningPointUTXOSetOverride() error SharedRequestedBlocks() *flowcontext.SharedRequestedBlocks @@ -174,7 +173,7 @@ func (flow *handleRelayInvsFlow) start() error { if err != nil { return err } - missingParents, virtualChangeSet, err := flow.processBlock(block) + missingParents, err := flow.processBlock(block) if err != nil { if errors.Is(err, ruleerrors.ErrPrunedBlock) { log.Infof("Ignoring pruned block %s", inv.Hash) @@ -233,7 +232,7 @@ func (flow *handleRelayInvsFlow) start() error { } log.Infof("Accepted block %s via relay", inv.Hash) - err = flow.OnNewBlock(block, virtualChangeSet) + err = flow.OnNewBlock(block) if err != nil { return err } @@ -320,25 +319,25 @@ func (flow *handleRelayInvsFlow) readMsgBlock() (msgBlock *appmessage.MsgBlock, } } -func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, *externalapi.VirtualChangeSet, error) { +func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, error) { blockHash := consensushashing.BlockHash(block) - virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true) + _, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true) if err != nil { if !errors.As(err, &ruleerrors.RuleError{}) { - return nil, nil, errors.Wrapf(err, "failed to process block %s", blockHash) + return nil, errors.Wrapf(err, "failed to process block %s", blockHash) } missingParentsError := &ruleerrors.ErrMissingParents{} if errors.As(err, missingParentsError) { - return missingParentsError.MissingParentHashes, nil, nil + return missingParentsError.MissingParentHashes, nil } // A duplicate block should not appear to the user as a warning and is already reported in the calling function if !errors.Is(err, ruleerrors.ErrDuplicateBlock) { log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err) } - return nil, nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash) + return nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash) } - return nil, virtualChangeSet, nil + return nil, nil } func (flow *handleRelayInvsFlow) relayBlock(block *externalapi.DomainBlock) error { diff --git a/app/protocol/flows/v5/blockrelay/ibd.go b/app/protocol/flows/v5/blockrelay/ibd.go index 126503f37e..2f989bc9f6 100644 --- a/app/protocol/flows/v5/blockrelay/ibd.go +++ b/app/protocol/flows/v5/blockrelay/ibd.go @@ -20,8 +20,7 @@ import ( type IBDContext interface { Domain() domain.Domain Config() *config.Config - OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error - OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error + OnNewBlock(block *externalapi.DomainBlock) error OnNewBlockTemplate() error OnPruningPointUTXOSetOverride() error IsIBDRunning() bool @@ -655,7 +654,7 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa return err } - virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, false) + _, err = flow.Domain().Consensus().ValidateAndInsertBlock(block, false) if err != nil { if errors.Is(err, ruleerrors.ErrDuplicateBlock) { log.Debugf("Skipping IBD Block %s as it has already been added to the DAG", blockHash) @@ -663,7 +662,7 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa } return protocolerrors.ConvertToBanningProtocolErrorIfRuleError(err, "invalid block %s", blockHash) } - err = flow.OnNewBlock(block, virtualChangeSet) + err = flow.OnNewBlock(block) if err != nil { return err } @@ -706,12 +705,7 @@ func (flow *handleIBDFlow) resolveVirtual(estimatedVirtualDAAScoreTarget uint64) } log.Infof("Resolving virtual. Estimated progress: %d%%", percents) } - virtualChangeSet, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual() - if err != nil { - return err - } - - err = flow.OnVirtualChange(virtualChangeSet) + _, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual() if err != nil { return err } diff --git a/app/protocol/manager.go b/app/protocol/manager.go index 2cfe553eed..c6c3046aa9 100644 --- a/app/protocol/manager.go +++ b/app/protocol/manager.go @@ -90,11 +90,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c return <-errChan } -// SetOnVirtualChange sets the onVirtualChangeHandler handler -func (m *Manager) SetOnVirtualChange(onVirtualChangeHandler flowcontext.OnVirtualChangeHandler) { - m.context.SetOnVirtualChangeHandler(onVirtualChangeHandler) -} - // SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) { m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler) diff --git a/app/rpc/manager.go b/app/rpc/manager.go index 7592d8a19e..ad1c9245b5 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -28,6 +28,7 @@ func NewManager( connectionManager *connmanager.ConnectionManager, addressManager *addressmanager.AddressManager, utxoIndex *utxoindex.UTXOIndex, + virtualChangeChan chan *externalapi.VirtualChangeSet, shutDownChan chan<- struct{}) *Manager { manager := Manager{ @@ -44,14 +45,37 @@ func NewManager( } netAdapter.SetRPCRouterInitializer(manager.routerInitializer) + manager.initVirtualChangeHandler(virtualChangeChan) + return &manager } +func (m *Manager) initVirtualChangeHandler(virtualChangeChan chan *externalapi.VirtualChangeSet) { + spawn("virtualChangeHandler", func() { + for { + virtualChangeSet, ok := <-virtualChangeChan + if !ok { + return + } + err := m.notifyVirtualChange(virtualChangeSet) + if err != nil { + panic(err) + } + } + }) +} + // NotifyBlockAddedToDAG notifies the manager that a block has been added to the DAG -func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error { +func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyBlockAddedToDAG") defer onEnd() + // Before converting the block and populating it, we check if any listeners are interested. + // This is done since most nodes do not use this event. + if !m.context.NotificationManager.HasBlockAddedListeners() { + return nil + } + rpcBlock := appmessage.DomainBlockToRPCBlock(block) err := m.context.PopulateBlockWithVerboseData(rpcBlock, block.Header, block, true) if err != nil { @@ -63,38 +87,43 @@ func (m *Manager) NotifyBlockAddedToDAG(block *externalapi.DomainBlock, virtualC return err } - // When block was added during IBD - it doesn't incur any Virtual change, - // thus no notification is needed. - if len(virtualChangeSet.VirtualSelectedParentChainChanges.Added) == 0 && - len(virtualChangeSet.VirtualSelectedParentChainChanges.Removed) == 0 { - - return nil - } - return m.NotifyVirtualChange(virtualChangeSet) + return nil } -// NotifyVirtualChange notifies the manager that the virtual block has been changed. -func (m *Manager) NotifyVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error { +// notifyVirtualChange notifies the manager that the virtual block has been changed. +func (m *Manager) notifyVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualChange") defer onEnd() - if m.context.Config.UTXOIndex { + /* + NOTE: nothing under this function is allowed to acquire the consensus lock, since + the function is triggered by a channel call under consensus lock which might block + */ + + if m.context.Config.UTXOIndex && virtualChangeSet.VirtualUTXODiff != nil { err := m.notifyUTXOsChanged(virtualChangeSet) if err != nil { return err } } - err := m.notifyVirtualSelectedParentBlueScoreChanged() + err := m.notifyVirtualSelectedParentBlueScoreChanged(virtualChangeSet.VirtualSelectedParentBlueScore) if err != nil { return err } - err = m.notifyVirtualDaaScoreChanged() + err = m.notifyVirtualDaaScoreChanged(virtualChangeSet.VirtualDAAScore) if err != nil { return err } + if virtualChangeSet.VirtualSelectedParentChainChanges == nil || + (len(virtualChangeSet.VirtualSelectedParentChainChanges.Added) == 0 && + len(virtualChangeSet.VirtualSelectedParentChainChanges.Removed) == 0) { + + return nil + } + err = m.notifyVirtualSelectedParentChainChanged(virtualChangeSet) if err != nil { return err @@ -152,6 +181,7 @@ func (m *Manager) notifyUTXOsChanged(virtualChangeSet *externalapi.VirtualChange if err != nil { return err } + return m.context.NotificationManager.NotifyUTXOsChanged(utxoIndexChanges) } @@ -167,33 +197,18 @@ func (m *Manager) notifyPruningPointUTXOSetOverride() error { return m.context.NotificationManager.NotifyPruningPointUTXOSetOverride() } -func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged() error { +func (m *Manager) notifyVirtualSelectedParentBlueScoreChanged(virtualSelectedParentBlueScore uint64) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentBlueScoreChanged") defer onEnd() - virtualSelectedParent, err := m.context.Domain.Consensus().GetVirtualSelectedParent() - if err != nil { - return err - } - - blockInfo, err := m.context.Domain.Consensus().GetBlockInfo(virtualSelectedParent) - if err != nil { - return err - } - - notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(blockInfo.BlueScore) + notification := appmessage.NewVirtualSelectedParentBlueScoreChangedNotificationMessage(virtualSelectedParentBlueScore) return m.context.NotificationManager.NotifyVirtualSelectedParentBlueScoreChanged(notification) } -func (m *Manager) notifyVirtualDaaScoreChanged() error { +func (m *Manager) notifyVirtualDaaScoreChanged(virtualDAAScore uint64) error { onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualDaaScoreChanged") defer onEnd() - virtualDAAScore, err := m.context.Domain.Consensus().GetVirtualDAAScore() - if err != nil { - return err - } - notification := appmessage.NewVirtualDaaScoreChangedNotificationMessage(virtualDAAScore) return m.context.NotificationManager.NotifyVirtualDaaScoreChanged(notification) } diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index 33ab6bc150..d28870f927 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -82,6 +82,19 @@ func (nm *NotificationManager) Listener(router *routerpkg.Router) (*Notification return listener, nil } +// HasBlockAddedListeners indicates if the notification manager has any listeners for `BlockAdded` events +func (nm *NotificationManager) HasBlockAddedListeners() bool { + nm.RLock() + defer nm.RUnlock() + + for _, listener := range nm.listeners { + if listener.propagateBlockAddedNotifications { + return true + } + } + return false +} + // NotifyBlockAdded notifies the notification manager that a block has been added to the DAG func (nm *NotificationManager) NotifyBlockAdded(notification *appmessage.BlockAddedNotificationMessage) error { nm.RLock() diff --git a/app/rpc/rpchandlers/get_blocks_test.go b/app/rpc/rpchandlers/get_blocks_test.go index 73a2bdca95..6f4bf58092 100644 --- a/app/rpc/rpchandlers/get_blocks_test.go +++ b/app/rpc/rpchandlers/get_blocks_test.go @@ -23,6 +23,10 @@ type fakeDomain struct { testapi.TestConsensus } +func (d fakeDomain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { + panic("implement me") +} + func (d fakeDomain) DeleteStagingConsensus() error { panic("implement me") } diff --git a/domain/consensus/consensus.go b/domain/consensus/consensus.go index 46aeda7a22..76737d88b2 100644 --- a/domain/consensus/consensus.go +++ b/domain/consensus/consensus.go @@ -58,6 +58,8 @@ type consensus struct { headersSelectedChainStore model.HeadersSelectedChainStore daaBlocksStore model.DAABlocksStore blocksWithTrustedDataDAAWindowStore model.BlocksWithTrustedDataDAAWindowStore + + virtualChangeChan chan *externalapi.VirtualChangeSet } func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) (*externalapi.VirtualChangeSet, error) { @@ -190,7 +192,46 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, shoul s.lock.Lock() defer s.lock.Unlock() - return s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO) + virtualChangeSet, err := s.blockProcessor.ValidateAndInsertBlock(block, shouldValidateAgainstUTXO) + if err != nil { + return nil, err + } + + err = s.onVirtualChange(virtualChangeSet, shouldValidateAgainstUTXO) + if err != nil { + return nil, err + } + + return virtualChangeSet, nil +} + +func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet, wasVirtualUpdated bool) error { + if !wasVirtualUpdated || s.virtualChangeChan == nil { + return nil + } + + stagingArea := model.NewStagingArea() + virtualGHOSTDAGData, err := s.ghostdagDataStores[0].Get(s.databaseContext, stagingArea, model.VirtualBlockHash, false) + if err != nil { + return err + } + + virtualSelectedParentGHOSTDAGData, err := s.ghostdagDataStores[0].Get(s.databaseContext, stagingArea, virtualGHOSTDAGData.SelectedParent(), false) + if err != nil { + return err + } + + virtualDAAScore, err := s.daaBlocksStore.DAAScore(s.databaseContext, stagingArea, model.VirtualBlockHash) + if err != nil { + return err + } + + // Populate the change set with additional data before sending + virtualChangeSet.VirtualSelectedParentBlueScore = virtualSelectedParentGHOSTDAGData.BlueScore() + virtualChangeSet.VirtualDAAScore = virtualDAAScore + + s.virtualChangeChan <- virtualChangeSet + return nil } // ValidateTransactionAndPopulateWithConsensusData validates the given transaction @@ -792,6 +833,11 @@ func (s *consensus) ResolveVirtual() (*externalapi.VirtualChangeSet, bool, error return nil, false, err } + err = s.onVirtualChange(virtualChangeSet, true) + if err != nil { + return nil, false, err + } + return virtualChangeSet, isCompletelyResolved, nil } diff --git a/domain/consensus/factory.go b/domain/consensus/factory.go index 5935672180..a91f539adc 100644 --- a/domain/consensus/factory.go +++ b/domain/consensus/factory.go @@ -76,7 +76,8 @@ type Config struct { // Factory instantiates new Consensuses type Factory interface { - NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix) ( + NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, + virtualChangeChan chan *externalapi.VirtualChangeSet) ( externalapi.Consensus, bool, error) NewTestConsensus(config *Config, testName string) ( tc testapi.TestConsensus, teardown func(keepDataDir bool), err error) @@ -108,7 +109,8 @@ func NewFactory() Factory { } // NewConsensus instantiates a new Consensus -func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix) ( +func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Database, dbPrefix *prefix.Prefix, + virtualChangeChan chan *externalapi.VirtualChangeSet) ( consensusInstance externalapi.Consensus, shouldMigrate bool, err error) { dbManager := consensusdatabase.New(db) @@ -510,6 +512,8 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas headersSelectedChainStore: headersSelectedChainStore, daaBlocksStore: daaBlocksStore, blocksWithTrustedDataDAAWindowStore: daaWindowStore, + + virtualChangeChan: virtualChangeChan, } if isOldReachabilityInitialized { @@ -572,7 +576,7 @@ func (f *factory) NewTestConsensus(config *Config, testName string) ( } testConsensusDBPrefix := &prefix.Prefix{} - consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix) + consensusAsInterface, shouldMigrate, err := f.NewConsensus(config, db, testConsensusDBPrefix, nil) if err != nil { return nil, nil, err } diff --git a/domain/consensus/factory_test.go b/domain/consensus/factory_test.go index 8e199c25d6..cbf661cbab 100644 --- a/domain/consensus/factory_test.go +++ b/domain/consensus/factory_test.go @@ -1,6 +1,7 @@ package consensus import ( + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/prefixmanager/prefix" "io/ioutil" "testing" @@ -24,7 +25,7 @@ func TestNewConsensus(t *testing.T) { t.Fatalf("error in NewLevelDB: %s", err) } - _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}) + _, shouldMigrate, err := f.NewConsensus(config, db, &prefix.Prefix{}, make(chan *externalapi.VirtualChangeSet)) if err != nil { t.Fatalf("error in NewConsensus: %+v", err) } diff --git a/domain/consensus/model/externalapi/virtualchangeset.go b/domain/consensus/model/externalapi/virtualchangeset.go index 66d08ef314..356e938f40 100644 --- a/domain/consensus/model/externalapi/virtualchangeset.go +++ b/domain/consensus/model/externalapi/virtualchangeset.go @@ -5,6 +5,8 @@ type VirtualChangeSet struct { VirtualSelectedParentChainChanges *SelectedChainPath VirtualUTXODiff UTXODiff VirtualParents []*DomainHash + VirtualSelectedParentBlueScore uint64 + VirtualDAAScore uint64 } // SelectedChainPath is a path the of the selected chains between two blocks. diff --git a/domain/domain.go b/domain/domain.go index 161bf1c366..6a5532d73a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -24,6 +24,7 @@ type Domain interface { InitStagingConsensusWithoutGenesis() error CommitStagingConsensus() error DeleteStagingConsensus() error + VirtualChangeChannel() chan *externalapi.VirtualChangeSet } type domain struct { @@ -33,6 +34,11 @@ type domain struct { stagingConsensusLock sync.RWMutex consensusConfig *consensus.Config db infrastructuredatabase.Database + virtualChangeChan chan *externalapi.VirtualChangeSet +} + +func (d *domain) VirtualChangeChannel() chan *externalapi.VirtualChangeSet { + return d.virtualChangeChan } func (d *domain) Consensus() externalapi.Consensus { @@ -86,7 +92,7 @@ func (d *domain) initStagingConsensus(cfg *consensus.Config) error { consensusFactory := consensus.NewFactory() - consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix) + consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(cfg, d.db, inactivePrefix, d.virtualChangeChan) if err != nil { return err } @@ -190,16 +196,18 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in } } + virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 1000) consensusFactory := consensus.NewFactory() - consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix) + consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan) if err != nil { return nil, err } domainInstance := &domain{ - consensus: &consensusInstance, - consensusConfig: consensusConfig, - db: db, + consensus: &consensusInstance, + consensusConfig: consensusConfig, + db: db, + virtualChangeChan: virtualChangeChan, } if shouldMigrate { diff --git a/domain/utxoindex/utxoindex.go b/domain/utxoindex/utxoindex.go index 13e4d3b3d1..8646a4f5b7 100644 --- a/domain/utxoindex/utxoindex.go +++ b/domain/utxoindex/utxoindex.go @@ -43,6 +43,9 @@ func New(domain domain.Domain, database database.Database) (*UTXOIndex, error) { // Reset deletes the whole UTXO index and resyncs it from consensus. func (ui *UTXOIndex) Reset() error { + ui.mutex.Lock() + defer ui.mutex.Unlock() + err := ui.store.deleteAll() if err != nil { return err