diff --git a/block/block.go b/block/block.go index abb1b242e..a21ad6d22 100644 --- a/block/block.go +++ b/block/block.go @@ -90,7 +90,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta if err != nil { return fmt.Errorf("update state: %w", err) } - // Prune old heights, if requested by ABCI app. if 0 < retainHeight { err = m.PruneBlocks(uint64(retainHeight)) @@ -132,7 +131,7 @@ func (m *Manager) attemptApplyCachedBlocks() error { return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err) } - err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock}) + err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source}) if err != nil { return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err) } diff --git a/block/block_cache.go b/block/block_cache.go index b8e1a296c..4e8a188a0 100644 --- a/block/block_cache.go +++ b/block/block_cache.go @@ -9,8 +9,8 @@ type Cache struct { cache map[uint64]types.CachedBlock } -func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) { - m.cache[h] = types.CachedBlock{Block: b, Commit: c} +func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) { + m.cache[h] = types.CachedBlock{Block: b, Commit: c, Source: source} types.BlockCacheSizeGauge.Set(float64(m.Size())) } diff --git a/block/manager.go b/block/manager.go index 38491e93a..b1c6239a9 100644 --- a/block/manager.go +++ b/block/manager.go @@ -8,7 +8,6 @@ import ( "sync" "sync/atomic" - "code.cloudfoundry.org/go-diodes" "github.com/dymensionxyz/gerr-cosmos/gerrc" "golang.org/x/sync/errgroup" @@ -66,15 +65,15 @@ type Manager struct { // Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks, // and incoming DA blocks, respectively. retrieverMu sync.Mutex - Retriever da.BatchRetriever - // get the next target height to sync local state to - targetSyncHeight diodes.Diode - // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA - TargetHeight atomic.Uint64 - + // Protect against syncing twice from DA in case new batch is posted but it did not finish to sync yet. + syncFromDaMu sync.Mutex + Retriever da.BatchRetriever // Cached blocks and commits for applying at future heights. The blocks may not be valid, because // we can only do full validation in sequential order. blockCache *Cache + + // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA + TargetHeight atomic.Uint64 } // NewManager creates new block Manager. @@ -102,18 +101,17 @@ func NewManager( } m := &Manager{ - Pubsub: pubsub, - p2pClient: p2pClient, - LocalKey: localKey, - Conf: conf, - Genesis: genesis, - Store: store, - Executor: exec, - DAClient: dalc, - SLClient: settlementClient, - Retriever: dalc.(da.BatchRetriever), - targetSyncHeight: diodes.NewOneToOne(1, nil), - logger: logger, + Pubsub: pubsub, + p2pClient: p2pClient, + LocalKey: localKey, + Conf: conf, + Genesis: genesis, + Store: store, + Executor: exec, + DAClient: dalc, + SLClient: settlementClient, + Retriever: dalc.(da.BatchRetriever), + logger: logger, blockCache: &Cache{ cache: make(map[uint64]types.CachedBlock), }, @@ -148,16 +146,6 @@ func (m *Manager) Start(ctx context.Context) error { } } - if !isSequencer { - // Fullnode loop can start before syncing from DA - go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger) - } - - err = m.syncBlockManager() - if err != nil { - return fmt.Errorf("sync block manager: %w", err) - } - eg, ctx := errgroup.WithContext(ctx) if isSequencer { @@ -165,6 +153,10 @@ func (m *Manager) Start(ctx context.Context) error { <-m.DAClient.Synced() nBytes := m.GetUnsubmittedBytes() bytesProducedC := make(chan int) + err = m.syncFromSettlement() + if err != nil { + return fmt.Errorf("sync block manager from settlement: %w", err) + } uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SubmitLoop(ctx, bytesProducedC) }) @@ -172,13 +164,21 @@ func (m *Manager) Start(ctx context.Context) error { bytesProducedC <- nBytes return m.ProduceBlockLoop(ctx, bytesProducedC) }) + } else { - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.RetrieveLoop(ctx) - }) - uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.SyncToTargetHeightLoop(ctx) - }) + // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. + go func() { + err := m.syncFromSettlement() + if err != nil { + m.logger.Error("sync block manager from settlement", "err", err) + } + // DA Sync. Subscribe to SL next batch events + go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger) + }() + + // P2P Sync. Subscribe to P2P received blocks events + go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger) + go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger) } go func() { @@ -207,8 +207,8 @@ func (m *Manager) NextHeightToSubmit() uint64 { return m.LastSubmittedHeight.Load() + 1 } -// syncBlockManager enforces the node to be synced on initial run. -func (m *Manager) syncBlockManager() error { +// syncFromSettlement enforces the node to be synced on initial run from SL and DA. +func (m *Manager) syncFromSettlement() error { res, err := m.SLClient.GetLatestBatch() if errors.Is(err, gerrc.ErrNotFound) { // The SL hasn't got any batches for this chain yet. @@ -216,12 +216,14 @@ func (m *Manager) syncBlockManager() error { m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) return nil } + if err != nil { // TODO: separate between fresh rollapp and non-registered rollapp return err } m.LastSubmittedHeight.Store(res.EndHeight) err = m.syncToTargetHeight(res.EndHeight) + m.UpdateTargetHeight(res.EndHeight) if err != nil { return err } diff --git a/block/manager_test.go b/block/manager_test.go index f38a07fe1..58bf17f98 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -55,10 +56,11 @@ func TestInitialState(t *testing.T) { // Init p2p client privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) p2pClient, err := p2p.NewClient(config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - }, privKey, "TestChain", pubsubServer, logger) + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + }, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger) assert.NoError(err) assert.NotNil(p2pClient) diff --git a/block/gossip.go b/block/p2p.go similarity index 58% rename from block/gossip.go rename to block/p2p.go index 11a91bcd2..30d8e76b7 100644 --- a/block/gossip.go +++ b/block/p2p.go @@ -4,19 +4,40 @@ import ( "context" "fmt" - "github.com/tendermint/tendermint/libs/pubsub" - "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/types" + "github.com/tendermint/tendermint/libs/pubsub" ) -// onNewGossipedBlock will take a block and apply it -func (m *Manager) onNewGossipedBlock(event pubsub.Message) { - eventData, _ := event.Data().(p2p.GossipedBlock) +// onReceivedBlock receives a block received event from P2P, saves the block to a cache and tries to apply the blocks from the cache. +func (m *Manager) onReceivedBlock(event pubsub.Message) { + eventData, ok := event.Data().(p2p.P2PBlockEvent) + if !ok { + m.logger.Error("onReceivedBlock", "err", "wrong event data received") + return + } + var source types.BlockSource + + if len(event.Events()[p2p.EventTypeKey]) != 1 { + m.logger.Error("onReceivedBlock", "err", "wrong number of event types received with the event", "received", len(event.Events()[p2p.EventTypeKey])) + return + } + + switch event.Events()[p2p.EventTypeKey][0] { + case p2p.EventNewBlockSyncBlock: + source = types.BlockSync + case p2p.EventNewGossipedBlock: + source = types.Gossiped + default: + m.logger.Error("onReceivedBlock", "err", "wrong event type received", "type", event.Events()[p2p.EventTypeKey][0]) + return + } + block := eventData.Block commit := eventData.Commit - m.retrieverMu.Lock() // needed to protect blockCache access height := block.Header.Height + m.retrieverMu.Lock() // needed to protect blockCache access + // It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks if m.blockCache.HasBlockInCache(height) { m.retrieverMu.Unlock() @@ -30,7 +51,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { nextHeight := m.State.NextHeight() if height >= nextHeight { - m.blockCache.AddBlockToCache(height, &block, &commit) + m.blockCache.AddBlockToCache(height, &block, &commit, source) } m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant @@ -40,8 +61,10 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { } } +// gossipBlock sends created blocks by the sequencer to full-nodes using P2P gossipSub func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error { - gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit} + m.logger.Info("Gossipping block", "height", block.Header.Height) + gossipedBlock := p2p.P2PBlockEvent{Block: block, Commit: commit} gossipedBlockBytes, err := gossipedBlock.MarshalBinary() if err != nil { return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable) @@ -51,5 +74,6 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ // could cause that to fail, so we assume recoverable. return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable) } + return nil } diff --git a/block/produce.go b/block/produce.go index 3266fbaf2..4690f8d47 100644 --- a/block/produce.go +++ b/block/produce.go @@ -71,6 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) } bytesProducedN := block.SizeBytes() + commit.SizeBytes() + m.logger.Info("New block.", "size", uint64(block.ToProto().Size())) select { case <-ctx.Done(): return nil @@ -181,7 +182,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er } } - if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil { + if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced}); err != nil { return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable) } diff --git a/block/pruning.go b/block/pruning.go index 68628053d..bc222783d 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -1,6 +1,7 @@ package block import ( + "context" "fmt" "github.com/dymensionxyz/gerr-cosmos/gerrc" @@ -14,6 +15,10 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error { gerrc.ErrInvalidArgument) } + err := m.p2pClient.RemoveBlocks(context.TODO(), m.State.BaseHeight, retainHeight) + if err != nil { + m.logger.Error("pruning block-sync store", "retain_height", retainHeight, "err", err) + } pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight) if err != nil { return fmt.Errorf("prune block store: %w", err) diff --git a/block/retriever.go b/block/retriever.go index 213d6e963..50ce23324 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -1,34 +1,29 @@ package block import ( - "context" "errors" "fmt" - "code.cloudfoundry.org/go-diodes" "github.com/dymensionxyz/gerr-cosmos/gerrc" "github.com/dymensionxyz/dymint/da" + "github.com/dymensionxyz/dymint/settlement" "github.com/dymensionxyz/dymint/types" + "github.com/tendermint/tendermint/libs/pubsub" ) -// RetrieveLoop listens for new target sync heights and then syncs the chain by -// fetching batches from the settlement layer and then fetching the actual blocks -// from the DA. -func (m *Manager) RetrieveLoop(ctx context.Context) (err error) { - m.logger.Info("Started retrieve loop.") - p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx)) - - for { - targetHeight := p.Next() // We only care about the latest one - if targetHeight == nil { - return - } - - if err = m.syncToTargetHeight(*(*uint64)(targetHeight)); err != nil { - err = fmt.Errorf("sync until target: %w", err) - return - } +// onNewStateUpdate will try to sync to new height, if not already synced +func (m *Manager) onNewStateUpdate(event pubsub.Message) { + eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted) + if !ok { + m.logger.Error("onReceivedBatch", "err", "wrong event data received") + return + } + h := eventData.EndHeight + m.UpdateTargetHeight(h) + err := m.syncToTargetHeight(h) + if err != nil { + m.logger.Error("sync until target", "err", err) } } @@ -36,6 +31,8 @@ func (m *Manager) RetrieveLoop(ctx context.Context) (err error) { // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. func (m *Manager) syncToTargetHeight(targetHeight uint64) error { + defer m.syncFromDaMu.Unlock() + m.syncFromDaMu.Lock() for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() { // if we have the block locally, we don't need to fetch it from the DA err := m.applyLocalBlock(currH) @@ -100,7 +97,7 @@ func (m *Manager) applyLocalBlock(height uint64) error { } m.retrieverMu.Lock() - err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDbBlock}) + err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDb}) if err != nil { return fmt.Errorf("apply block from local store: height: %d: %w", height, err) } @@ -113,7 +110,6 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { m.logger.Debug("trying to retrieve batch from DA", "daHeight", daMetaData.Height) batchResp := m.fetchBatch(daMetaData) if batchResp.Code != da.StatusSuccess { - m.logger.Error("fetching batch from DA", batchResp.Message) return batchResp.Error } @@ -132,7 +128,8 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err) continue } - err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DABlock, DAHeight: daMetaData.Height}) + + err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DA, DAHeight: daMetaData.Height}) if err != nil { return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err) } diff --git a/block/state.go b/block/state.go index fa2a7e0ad..ef8e2a8c9 100644 --- a/block/state.go +++ b/block/state.go @@ -86,7 +86,7 @@ func (m *Manager) UpdateStateFromApp() error { } vals, err := m.Store.LoadValidators(appHeight) if err != nil { - return errorsmod.Wrap(err, "load block responses") + return errorsmod.Wrap(err, "load validators") } // update the state with the hash, last store height and last validators. diff --git a/block/sync.go b/block/sync.go deleted file mode 100644 index 9e37d8277..000000000 --- a/block/sync.go +++ /dev/null @@ -1,53 +0,0 @@ -package block - -import ( - "context" - - "github.com/tendermint/tendermint/libs/pubsub" - - "github.com/dymensionxyz/dymint/types" - - "code.cloudfoundry.org/go-diodes" - - "github.com/dymensionxyz/dymint/settlement" -) - -// SyncToTargetHeightLoop gets real time updates about settlement batch submissions and sends the latest height downstream -// to be retrieved by another process which will pull the data. -func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) { - m.logger.Info("Started sync target loop") - var subscription *pubsub.Subscription - subscription, err = m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted) - if err != nil { - m.logger.Error("subscribe to state update events", "error", err) - return - } - - for { - select { - case <-ctx.Done(): - return - case event := <-subscription.Out(): - eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted) - h := eventData.EndHeight - - if h <= m.State.Height() { - m.logger.Debug( - "syncTargetLoop: received new settlement batch accepted with batch end height <= current store height, skipping.", - "target sync height (batch end height)", - h, - "current store height", - m.State.Height(), - ) - continue - } - types.RollappHubHeightGauge.Set(float64(h)) - m.UpdateTargetHeight(h) - m.targetSyncHeight.Set(diodes.GenericDataType(&h)) - m.logger.Info("Set new target sync height", "height", h) - case <-subscription.Cancelled(): - m.logger.Error("syncTargetLoop subscription canceled") - return - } - } -} diff --git a/config/config_test.go b/config/config_test.go index 8b34b2551..8940e1912 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -220,10 +220,11 @@ func fullNodeConfig() config.NodeConfig { Port: 9090, }, P2PConfig: config.P2PConfig{ - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", }, } } diff --git a/config/defaults.go b/config/defaults.go index c0c9ec16b..64dd6b734 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -37,11 +37,13 @@ func DefaultConfig(home, chainId string) *NodeConfig { PrometheusListenAddr: ":2112", }, P2PConfig: P2PConfig{ - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: DefaultListenAddress, - BootstrapNodes: "", - AdvertisingEnabled: true, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + ListenAddress: DefaultListenAddress, + BootstrapNodes: "", + AdvertisingEnabled: true, + BlockSyncEnabled: true, }, DBConfig: DBConfig{ SyncWrites: true, diff --git a/config/p2p.go b/config/p2p.go index 31ef7bec5..a569d023c 100644 --- a/config/p2p.go +++ b/config/p2p.go @@ -17,6 +17,10 @@ type P2PConfig struct { GossipSubCacheSize int `mapstructure:"p2p_gossip_cache_size"` // Time interval a node tries to bootstrap again, in case no nodes connected BootstrapRetryTime time.Duration `mapstructure:"p2p_bootstrap_retry_time"` + // Param used to enable block sync from p2p + BlockSyncEnabled bool `mapstructure:"p2p_blocksync_enabled"` + // Time interval used by a node to request missing blocks (gap between cached blocks and local height) on demand from other peers using block-sync + BlockSyncRequestIntervalTime time.Duration `mapstructure:"p2p_blocksync_block_request_interval"` // Param used to enable the advertisement of the node to be part of the P2P network in the DHT AdvertisingEnabled bool `mapstructure:"p2p_advertising_enabled"` } @@ -29,5 +33,9 @@ func (c P2PConfig) Validate() error { if c.BootstrapRetryTime <= 0 { return fmt.Errorf("bootstrap time must be positive") } + if c.BlockSyncRequestIntervalTime <= 0 { + return fmt.Errorf("blocksync retrieve time must be positive") + } + return nil } diff --git a/config/toml.go b/config/toml.go index 7c3236eab..f5b2eee66 100644 --- a/config/toml.go +++ b/config/toml.go @@ -110,6 +110,12 @@ p2p_bootstrap_retry_time = "{{ .P2PConfig.BootstrapRetryTime }}" # set to false to disable advertising the node to the P2P network p2p_advertising_enabled= "{{ .P2PConfig.AdvertisingEnabled }}" +# set to false to disable block syncing from p2p +p2p_blocksync_enabled= "{{ .P2PConfig.BlockSyncEnabled }}" + +# time interval used to periodically check for missing blocks and retrieve it from other peers on demand using P2P +p2p_blocksync_block_request_interval= "{{ .P2PConfig.BlockSyncRequestIntervalTime }}" + ### settlement config ### settlement_layer = "{{ .SettlementLayer }}" # mock, dymension diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index 48194e214..7429b05e3 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -267,11 +267,11 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet c.logger.Debug("Context cancelled.") return da.ResultRetrieveBatch{} default: - // Just for backward compatibility, in case no commitments are sent from the Hub, batch can be retrieved using previous implementation. var resultRetrieveBatch da.ResultRetrieveBatch err := retry.Do( func() error { var result da.ResultRetrieveBatch + // Just for backward compatibility, in case no commitments are sent from the Hub, batch can be retrieved using previous implementation. if daMetaData.Commitment == nil { result = c.retrieveBatchesNoCommitment(daMetaData.Height) } else { diff --git a/da/celestia/rpc.go b/da/celestia/rpc.go index 65304951a..5ad99feb4 100644 --- a/da/celestia/rpc.go +++ b/da/celestia/rpc.go @@ -30,7 +30,7 @@ func (c *OpenRPC) GetAll(ctx context.Context, height uint64, namespaces []share. return c.rpc.Blob.GetAll(ctx, height, namespaces) } -// Submit blobs. +// Submit blobs. func (c *OpenRPC) Submit(ctx context.Context, blobs []*blob.Blob, gasPrice openrpc.GasPrice) (uint64, error) { return c.rpc.Blob.Submit(ctx, blobs, gasPrice) } diff --git a/da/da.go b/da/da.go index 2f3d7e0ae..1d0a7de6e 100644 --- a/da/da.go +++ b/da/da.go @@ -38,9 +38,9 @@ type Client string // Data availability clients const ( Mock Client = "mock" - Grpc Client = "grpc" Celestia Client = "celestia" Avail Client = "avail" + Grpc Client = "grpc" ) // Option is a function that sets a parameter on the da layer. diff --git a/da/grpc/grpc.go b/da/grpc/grpc.go index d8aadb294..998e970ca 100644 --- a/da/grpc/grpc.go +++ b/da/grpc/grpc.go @@ -74,7 +74,7 @@ func (d *DataAvailabilityLayerClient) Start() error { // Stop closes connection to gRPC server. func (d *DataAvailabilityLayerClient) Stop() error { - d.logger.Info("stopoing GRPC DALC") + d.logger.Info("stopping GRPC DALC") return d.conn.Close() } diff --git a/go.mod b/go.mod index 0686207fd..0976f51e9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/dymensionxyz/dymint go 1.22.4 require ( - code.cloudfoundry.org/go-diodes v0.0.0-20220725190411-383eb6634c40 cosmossdk.io/errors v1.0.1 github.com/avast/retry-go/v4 v4.5.0 github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1 @@ -25,9 +24,11 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/ignite/cli v0.26.1 github.com/informalsystems/tm-load-test v1.3.0 + github.com/ipfs/boxo v0.18.0 github.com/libp2p/go-libp2p v0.33.1 github.com/libp2p/go-libp2p-kad-dht v0.25.2 - github.com/libp2p/go-libp2p-pubsub v0.9.3 + github.com/libp2p/go-libp2p-pubsub v0.10.1 + github.com/libp2p/go-libp2p-routing-helpers v0.7.3 github.com/multiformats/go-multiaddr v0.12.2 github.com/prometheus/client_golang v1.18.0 github.com/rs/cors v1.9.0 @@ -37,11 +38,16 @@ require ( github.com/tendermint/tendermint v0.34.29 go.uber.org/multierr v1.11.0 golang.org/x/net v0.24.0 - gonum.org/v1/gonum v0.13.0 + gonum.org/v1/gonum v0.14.0 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.33.0 ) +require ( + github.com/cskr/pubsub v1.0.2 // indirect + github.com/ipfs/go-block-format v0.2.0 // indirect +) + require ( filippo.io/edwards25519 v1.0.0-rc.1 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect @@ -72,13 +78,13 @@ require ( github.com/ethereum/go-ethereum v1.12.0 // indirect github.com/filecoin-project/go-jsonrpc v0.3.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/lib/pq v1.10.7 // indirect @@ -99,13 +105,13 @@ require ( github.com/tyler-smith/go-bip39 v1.1.0 // indirect github.com/vedhavyas/go-subkey v1.0.3 // indirect github.com/zondax/ledger-go v0.14.3 // indirect - go.opentelemetry.io/otel v1.19.0 // indirect - go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/fx v1.20.1 // indirect golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect - golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect @@ -130,7 +136,7 @@ require ( github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/dustin/go-humanize v1.0.1-0.20200219035652-afde56e7acac // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/flynn/noise v1.1.0 // indirect @@ -156,15 +162,15 @@ require ( github.com/gtank/ristretto255 v0.1.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect - github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/ipfs/go-cid v0.4.1 // indirect - github.com/ipfs/go-datastore v0.6.0 // indirect + github.com/ipfs/go-cid v0.4.1 + github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-log/v2 v2.5.1 // indirect - github.com/ipld/go-ipld-prime v0.20.0 // indirect + github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/goprocess v0.1.4 // indirect @@ -199,7 +205,7 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect - github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect @@ -245,6 +251,7 @@ require ( require ( cosmossdk.io/math v1.3.0 // indirect github.com/DataDog/zstd v1.5.2 // indirect + github.com/Jorropo/jsync v1.0.1 // indirect github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/btcsuite/btcd/btcutil v1.1.3 // indirect @@ -261,15 +268,24 @@ require ( github.com/getsentry/sentry-go v0.18.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect github.com/holiman/uint256 v1.2.2 // indirect - github.com/ipfs/boxo v0.10.0 // indirect + github.com/ipfs/bbloom v0.0.4 // indirect + github.com/ipfs/go-ds-leveldb v0.5.0 + github.com/ipfs/go-ipfs-delay v0.0.1 // indirect + github.com/ipfs/go-ipfs-pq v0.0.3 // indirect + github.com/ipfs/go-ipfs-util v0.0.3 // indirect + github.com/ipfs/go-ipld-format v0.6.0 + github.com/ipfs/go-ipld-legacy v0.2.1 // indirect + github.com/ipfs/go-metrics-interface v0.0.1 // indirect + github.com/ipfs/go-peertaskqueue v0.8.1 // indirect + github.com/ipld/go-codec-dagpb v1.6.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/libp2p/go-libp2p-routing-helpers v0.7.2 // indirect github.com/linxGnu/grocksdb v1.8.12 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/common v0.47.0 // indirect github.com/regen-network/cosmos-proto v0.3.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect go.uber.org/mock v0.4.0 // indirect golang.org/x/tools v0.18.0 // indirect pgregory.net/rapid v1.1.0 // indirect @@ -280,7 +296,6 @@ replace ( github.com/evmos/evmos/v12 => github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4 github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1 - github.com/libp2p/go-libp2p-pubsub => github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2 github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.28 ) diff --git a/go.sum b/go.sum index bf884d367..c4673db35 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,6 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= cloud.google.com/go/storage v1.30.1 h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/oNM= cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E= -code.cloudfoundry.org/go-diodes v0.0.0-20220725190411-383eb6634c40 h1:wzkYwwcf4uMGcDpn48WAbq8GtoqDny49tdQ4zJVAsmo= -code.cloudfoundry.org/go-diodes v0.0.0-20220725190411-383eb6634c40/go.mod h1:Nx9ASXN4nIlRDEXv+qXE3dpuhnTnO28Lxl/bMUd6BMc= cosmossdk.io/errors v1.0.1 h1:bzu+Kcr0kS/1DuPBtUFdWjzLqyUuCiyHjyJB6srBV/0= cosmossdk.io/errors v1.0.1/go.mod h1:MeelVSZThMi4bEakzhhhE/CKqVv3nOJDA25bIqRDu/U= cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE= @@ -74,6 +72,8 @@ github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU= +github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= @@ -249,6 +249,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM= github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/danwt/gerr v1.0.0 h1:v3Do0h1r+uctQQVYJfOTCo8uigp8oIaY4OL/wUU8LzI= @@ -300,8 +302,8 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7 h1:kgvzE5wLsLa7XKfV85VZl40QXaMCaeFtHpPwJ8fhotY= github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.1-0.20200219035652-afde56e7acac h1:opbrjaN/L8gg6Xh5D04Tem+8xVcz6ajZlGCs49mQgyg= -github.com/dustin/go-humanize v1.0.1-0.20200219035652-afde56e7acac/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/dymensionxyz/cosmosclient v0.4.2-beta h1:sokBefcN1tIOlUKmB8Q2E9XMJ93LueqtFThiM/kA4DI= @@ -312,8 +314,6 @@ github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 h1:vmAdUGUc4rTIiO3Phezr github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3/go.mod h1:LfPv2O1HXMgETpka81Pg3nXy+U/7urq8dn85ZnSXK5Y= github.com/dymensionxyz/gerr-cosmos v1.0.0 h1:oi91rgOkpJWr41oX9JOyjvvBnhGY54tj513x8VlDAEc= github.com/dymensionxyz/gerr-cosmos v1.0.0/go.mod h1:n+0olxPogzWqFKba45mCpvrHLGmeS8W9UZjggHnWk6c= -github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2 h1:5FMEOpX5OuoRfwwjjA+LxRJXoDT0fFvg8/rlat7z8bE= -github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw= github.com/dymensionxyz/rpc v1.3.1 h1:7EXWIobaBes5zldRvTIg7TmNsEKjicrWA/OjCc0NaGs= github.com/dymensionxyz/rpc v1.3.1/go.mod h1:f+WpX8ysy8wt95iGc6auYlHcnHj2bUkhiRVkkKNys8c= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= @@ -344,8 +344,8 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= -github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= @@ -375,8 +375,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -441,6 +441,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= @@ -497,6 +498,7 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -555,10 +557,10 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs= -github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= -github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= +github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 h1:aSVUgRRRtOrZOC1fYmY9gV0e9z/Iu+xNVSASWjsuyGU= @@ -583,23 +585,47 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/informalsystems/tm-load-test v1.3.0 h1:FGjKy7vBw6mXNakt+wmNWKggQZRsKkEYpaFk/zR64VA= github.com/informalsystems/tm-load-test v1.3.0/go.mod h1:OQ5AQ9TbT5hKWBNIwsMjn6Bf4O0U4b1kRc+0qZlQJKw= -github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY= -github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM= +github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= +github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= +github.com/ipfs/boxo v0.18.0 h1:MOL9/AgoV3e7jlVMInicaSdbgralfqSsbkc31dZ9tmw= +github.com/ipfs/boxo v0.18.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= +github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs= +github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= -github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= +github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= +github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= +github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= +github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= +github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= +github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= +github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= +github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE= +github.com/ipfs/go-ipfs-pq v0.0.3/go.mod h1:btNw5hsHBpRcSSgZtiNm/SLj5gYIZ18AKtv3kERkRb4= +github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= +github.com/ipfs/go-ipfs-util v0.0.3/go.mod h1:LHzG1a0Ig4G+iZ26UUOMjHd+lfM84LZCrn17xAKWBvs= +github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten1F5U= +github.com/ipfs/go-ipld-format v0.6.0/go.mod h1:g4QVMTn3marU3qXchwjpKPKgJv+zF+OlaKMyhJ4LHPg= +github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= +github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= -github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g= -github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M= +github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= +github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= +github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= +github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= +github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= +github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= +github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= +github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= @@ -638,6 +664,7 @@ github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoK github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -664,10 +691,12 @@ github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0Trt github.com/libp2p/go-libp2p-kad-dht v0.25.2/go.mod h1:6za56ncRHYXX4Nc2vn8z7CZK0P4QiMcrn77acKLM2Oo= github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0= github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= +github.com/libp2p/go-libp2p-pubsub v0.10.1 h1:/RqOZpEtAolsr8/9CC8KqROJSOZeu7lK7fPftn4MwNg= +github.com/libp2p/go-libp2p-pubsub v0.10.1/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= -github.com/libp2p/go-libp2p-routing-helpers v0.7.2 h1:xJMFyhQ3Iuqnk9Q2dYE1eUTzsah7NLw3Qs2zjUV78T0= -github.com/libp2p/go-libp2p-routing-helpers v0.7.2/go.mod h1:cN4mJAD/7zfPKXBcs9ze31JGYAZgzdABEm+q/hkswb8= +github.com/libp2p/go-libp2p-routing-helpers v0.7.3 h1:u1LGzAMVRK9Nqq5aYDVOiq/HaB93U9WWczBzGyAC5ZY= +github.com/libp2p/go-libp2p-routing-helpers v0.7.3/go.mod h1:cN4mJAD/7zfPKXBcs9ze31JGYAZgzdABEm+q/hkswb8= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= @@ -966,6 +995,7 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= @@ -1004,8 +1034,12 @@ github.com/vedhavyas/go-subkey v1.0.3 h1:iKR33BB/akKmcR2PMlXPBeeODjWLM90EL98OrOG github.com/vedhavyas/go-subkey v1.0.3/go.mod h1:CloUaFQSSTdWnINfBRFjVMkWXZANW+nd8+TI5jYcl6Y= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= +github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= +github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= @@ -1040,12 +1074,12 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= -go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= -go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= -go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= -go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= -go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -1361,10 +1395,10 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= -gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.14.0 h1:2NiG67LD1tEH0D7kM+ps2V+fXmsAnpUeec7n8tcr4S0= +gonum.org/v1/gonum v0.14.0/go.mod h1:AoWeoz0becf9QMWtE8iWXNXc27fK4fNeHNf/oMejGfU= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= diff --git a/node/node.go b/node/node.go index 040656850..7bd296f3a 100644 --- a/node/node.go +++ b/node/node.go @@ -4,18 +4,15 @@ import ( "context" "fmt" "net/http" + "path/filepath" "time" + "github.com/ipfs/go-datastore" + leveldb "github.com/ipfs/go-ds-leveldb" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/libp2p/go-libp2p/core/crypto" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/pubsub" - "github.com/tendermint/tendermint/libs/service" - "github.com/tendermint/tendermint/proxy" - tmtypes "github.com/tendermint/tendermint/types" - "github.com/dymensionxyz/dymint/block" "github.com/dymensionxyz/dymint/config" "github.com/dymensionxyz/dymint/da" @@ -31,6 +28,11 @@ import ( "github.com/dymensionxyz/dymint/settlement" slregistry "github.com/dymensionxyz/dymint/settlement/registry" "github.com/dymensionxyz/dymint/store" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/proxy" + tmtypes "github.com/tendermint/tendermint/types" ) // prefixes used in KV store to separate main node data from DALC data @@ -101,12 +103,21 @@ func NewNode( pubsubServer := pubsub.NewServer() var baseKV store.KV + var dstore datastore.Datastore + if conf.DBConfig.InMemory || (conf.RootDir == "" && conf.DBPath == "") { // this is used for testing logger.Info("WARNING: working in in-memory mode") baseKV = store.NewDefaultInMemoryKVStore() + dstore = datastore.NewMapDatastore() } else { // TODO(omritoptx): Move dymint to const baseKV = store.NewKVStore(conf.RootDir, conf.DBPath, "dymint", conf.DBConfig.SyncWrites) + path := filepath.Join(store.Rootify(conf.RootDir, conf.DBPath), "blocksync") + var err error + dstore, err = leveldb.NewDatastore(path, &leveldb.Options{}) + if err != nil { + return nil, fmt.Errorf("initialize datastore at %s: %w", path, err) + } } s := store.New(store.NewPrefixKV(baseKV, mainPrefix)) @@ -154,7 +165,7 @@ func NewNode( // Set p2p client and it's validators p2pValidator := p2p.NewValidator(logger.With("module", "p2p_validator"), settlementlc) - p2pClient, err := p2p.NewClient(conf.P2PConfig, p2pKey, genesis.ChainID, pubsubServer, logger.With("module", "p2p")) + p2pClient, err := p2p.NewClient(conf.P2PConfig, p2pKey, genesis.ChainID, s, pubsubServer, dstore, logger.With("module", "p2p")) if err != nil { return nil, err } diff --git a/node/node_test.go b/node/node_test.go index 3706ccb47..3a4953b3b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -65,10 +65,11 @@ func TestMempoolDirectly(t *testing.T) { RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - BootstrapNodes: "", + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BootstrapNodes: "", + BlockSyncRequestIntervalTime: 30 * time.Second, }, RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), diff --git a/p2p/block_sync.go b/p2p/block_sync.go new file mode 100644 index 000000000..e562fcfda --- /dev/null +++ b/p2p/block_sync.go @@ -0,0 +1,117 @@ +package p2p + +import ( + "context" + "fmt" + + "github.com/dymensionxyz/dymint/types" + "github.com/ipfs/go-datastore" + dsync "github.com/ipfs/go-datastore/sync" + + mh "github.com/multiformats/go-multihash" + + "github.com/ipfs/boxo/bitswap/client" + "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/bitswap/server" + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/go-cid" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/libp2p/go-libp2p/core/host" +) + +// Blocksync is a protocol used to retrieve blocks on demand from the P2P network. +// Nodes store received blocks from gossip in an IPFS blockstore and nodes are able to request them on demand using bitswap protocol. +// In order to discover the identifier (CID) of each block a DHT request needs to be made for the specific block height. +// Nodes need to advertise CIDs/height map to the DHT periodically. +// https://www.notion.so/dymension/ADR-x-Rollapp-block-sync-protocol-6ee48b232a6a45e09989d67f1a6c0297?pvs=4 +type BlockSync struct { + // service that reads/writes blocks either from local datastore or the P2P network + bsrv blockservice.BlockService + // local datastore for IPFS blocks + bstore blockstore.Blockstore + // protocol used to obtain blocks from the P2P network + net network.BitSwapNetwork + // used to find all data chunks that are part of the same block + dsrv BlockSyncDagService + // used to define the content identifiers of each data chunk + cidBuilder cid.Builder + logger types.Logger +} + +type BlockSyncMessageHandler func(block *P2PBlockEvent) + +// SetupBlockSync initializes all services required to provide and retrieve block data in the P2P network. +func SetupBlockSync(ctx context.Context, h host.Host, store datastore.Datastore, logger types.Logger) *BlockSync { + // construct a datastore + ds := dsync.MutexWrap(store) + + // set a blockstore (to store IPFS data chunks) with the previous datastore + bs := blockstore.NewBlockstore(ds) + + // initialize bitswap network used to retrieve data chunks from other peers in the P2P network + bsnet := network.NewFromIpfsHost(h, &routinghelpers.Null{}, network.Prefix("/dymension/block-sync/")) + + // Bitswap server that provides data to the network. + bsserver := server.New( + ctx, + bsnet, + bs, + server.ProvideEnabled(false), // we don't provide blocks over DHT + server.SetSendDontHaves(false), + ) + + // Bitswap client that retrieves data from the network. + bsclient := client.New( + ctx, + bsnet, + bs, + client.SetSimulateDontHavesOnTimeout(false), + client.WithBlockReceivedNotifier(bsserver), + client.WithoutDuplicatedBlockStats(), + ) + + // start the network + bsnet.Start(bsserver, bsclient) + + bsrv := blockservice.New(bs, bsclient) + + blockSync := &BlockSync{ + bsrv: bsrv, + net: bsnet, + bstore: bs, + dsrv: NewDAGService(bsrv), + cidBuilder: &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + }, + logger: logger, + } + + return blockSync +} + +// SaveBlock stores the blocks produced in the DAG services to be retrievable from the P2P network. +func (blocksync *BlockSync) SaveBlock(ctx context.Context, block []byte) (cid.Cid, error) { + return blocksync.dsrv.SaveBlock(ctx, block) +} + +// LoadBlock retrieves the blocks (from the local blockstore or the network) using the DAGService to discover all data chunks that are part of the same block. +func (blocksync *BlockSync) LoadBlock(ctx context.Context, cid cid.Cid) (P2PBlockEvent, error) { + blockBytes, err := blocksync.dsrv.LoadBlock(ctx, cid) + if err != nil { + return P2PBlockEvent{}, err + } + var block P2PBlockEvent + if err := block.UnmarshalBinary(blockBytes); err != nil { + return P2PBlockEvent{}, fmt.Errorf("deserialize blocksync block %w", err) + } + return block, nil +} + +// RemoveBlock removes the block from the DAGservice. +func (blocksync *BlockSync) DeleteBlock(ctx context.Context, cid cid.Cid) error { + return blocksync.dsrv.Remove(ctx, cid) +} diff --git a/p2p/block_sync_dag.go b/p2p/block_sync_dag.go new file mode 100644 index 000000000..18d78dca3 --- /dev/null +++ b/p2p/block_sync_dag.go @@ -0,0 +1,137 @@ +package p2p + +import ( + "bytes" + "context" + "errors" + "io" + + chunker "github.com/ipfs/boxo/chunker" + mh "github.com/multiformats/go-multihash" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/ipld/merkledag" + dag "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +type BlockSyncDagService struct { + ipld.DAGService + cidBuilder cid.Builder +} + +// NewDAGService inits the DAGservice used to retrieve/send blocks data in the P2P. +// Block data is organized in a merkle DAG using IPLD (https://ipld.io/docs/) +func NewDAGService(bsrv blockservice.BlockService) BlockSyncDagService { + bsDagService := &BlockSyncDagService{ + cidBuilder: &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + }, + } + bsDagService.DAGService = merkledag.NewDAGService(bsrv) + + return *bsDagService +} + +// SaveBlock splits the block in chunks of 256KB and it creates a new merkle DAG with them. it returns the content identifier (cid) of the root node of the DAG. +// Using the root CID the whole block can be retrieved using the DAG service +func (bsDagService *BlockSyncDagService) SaveBlock(ctx context.Context, block []byte) (cid.Cid, error) { + blockReader := bytes.NewReader(block) + + splitter := chunker.NewSizeSplitter(blockReader, chunker.DefaultBlockSize) + nodes := []*dag.ProtoNode{} + + // the loop creates nodes for each block chunk and sets each cid + for { + nextData, err := splitter.NextBytes() + if err == io.EOF { + break + } + if err != nil { + return cid.Undef, err + } + protoNode := dag.NodeWithData(nextData) + err = protoNode.SetCidBuilder(bsDagService.cidBuilder) + if err != nil { + return cid.Undef, err + } + nodes = append(nodes, protoNode) + + } + + // an empty root node is created + root := dag.NodeWithData(nil) + err := root.SetCidBuilder(bsDagService.cidBuilder) + if err != nil { + return cid.Undef, err + } + + // and linked to all chunks that are added to the DAGservice + for _, n := range nodes { + + err := root.AddNodeLink(n.Cid().String(), n) + if err != nil { + return cid.Undef, err + } + err = bsDagService.Add(ctx, n) + if err != nil { + return cid.Undef, err + } + } + err = bsDagService.Add(ctx, root) + if err != nil { + return cid.Undef, err + } + + return root.Cid(), nil +} + +// LoadBlock returns the block data obtained from the DAGService, using the root cid, either from the network or the local blockstore +func (bsDagService *BlockSyncDagService) LoadBlock(ctx context.Context, cid cid.Cid) ([]byte, error) { + // first it gets the root node + nd, err := bsDagService.Get(ctx, cid) + if err != nil { + return nil, err + } + + // then it gets all the data from the root node + read, err := dagReader(nd, bsDagService) + if err != nil { + return nil, err + } + + // the data is read to bytes array + data, err := io.ReadAll(read) + if err != nil { + return nil, err + } + return data, nil +} + +// dagReader is used to read the DAG (all the block chunks) from the root (IPLD) node +func dagReader(root ipld.Node, ds ipld.DAGService) (io.Reader, error) { + ctx := context.Background() + buf := new(bytes.Buffer) + + // the loop retrieves all the nodes (block chunks) either from the local store or the network, in case it is not there. + for _, l := range root.Links() { + n, err := ds.Get(ctx, l.Cid) + if err != nil { + return nil, err + } + rawdata, ok := n.(*dag.ProtoNode) + if !ok { + return nil, errors.New("read block DAG") + } + + _, err = buf.Write(rawdata.Data()) + if err != nil { + return nil, err + } + } + return buf, nil +} diff --git a/p2p/block_sync_test.go b/p2p/block_sync_test.go new file mode 100644 index 000000000..3baf01cef --- /dev/null +++ b/p2p/block_sync_test.go @@ -0,0 +1,63 @@ +package p2p_test + +import ( + "context" + "testing" + + "github.com/dymensionxyz/dymint/p2p" + "github.com/dymensionxyz/dymint/testutil" + "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" +) + +func TestBlockSync(t *testing.T) { + + logger := log.TestingLogger() + ctx := context.Background() + + manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, nil, nil) + require.NoError(t, err) + require.NotNil(t, manager) + + // required for tx validator + assertRecv := func(tx *p2p.GossipMessage) bool { + return true + } + + // Create a block for height 1 + blocks, err := testutil.GenerateBlocksWithTxs(1, 1, manager.LocalKey, 1) + require.NoError(t, err) + + // Create commit + commits, err := testutil.GenerateCommits(blocks, manager.LocalKey) + require.NoError(t, err) + + gossipedBlock := p2p.P2PBlockEvent{Block: *blocks[0], Commit: *commits[0]} + gossipedBlockbytes, err := gossipedBlock.MarshalBinary() + require.NoError(t, err) + + // validators required + validators := []p2p.GossipValidator{assertRecv, assertRecv, assertRecv, assertRecv, assertRecv} + + clients := testutil.StartTestNetwork(ctx, t, 1, map[int]testutil.HostDescr{ + 0: {Conns: []int{}, ChainID: "1"}, + }, validators, logger) + + blocksync := p2p.SetupBlockSync(ctx, clients[0].Host, datastore.NewMapDatastore(), logger) + require.NoError(t, err) + + //add block to blocksync protocol client 0 + cid, err := blocksync.SaveBlock(ctx, gossipedBlockbytes) + require.NoError(t, err) + + //get block + block, err := blocksync.LoadBlock(ctx, cid) + require.NoError(t, err) + require.Equal(t, gossipedBlock, block) + + //remove block + err = blocksync.DeleteBlock(ctx, cid) + require.NoError(t, err) + +} diff --git a/p2p/blocks_received.go b/p2p/blocks_received.go new file mode 100644 index 000000000..ceaf0bf67 --- /dev/null +++ b/p2p/blocks_received.go @@ -0,0 +1,43 @@ +package p2p + +import "sync" + +// BlocksReceived tracks blocks received from P2P to know what are the missing blocks that need to be requested on demand +type BlocksReceived struct { + blocksReceived map[uint64]struct{} + latestSeenHeight uint64 + // mutex to protect blocksReceived map access + blockReceivedMu sync.Mutex +} + +// addBlockReceived adds the block height to a map +func (br *BlocksReceived) AddBlockReceived(height uint64) { + br.latestSeenHeight = max(height, br.latestSeenHeight) + br.blockReceivedMu.Lock() + defer br.blockReceivedMu.Unlock() + br.blocksReceived[height] = struct{}{} +} + +// isBlockReceived checks if a block height is already received +func (br *BlocksReceived) IsBlockReceived(height uint64) bool { + br.blockReceivedMu.Lock() + defer br.blockReceivedMu.Unlock() + _, ok := br.blocksReceived[height] + return ok +} + +// removeBlocksReceivedUpToHeight clears previous received block heights +func (br *BlocksReceived) RemoveBlocksReceivedUpToHeight(appliedHeight uint64) { + br.blockReceivedMu.Lock() + defer br.blockReceivedMu.Unlock() + for h := range br.blocksReceived { + if h < appliedHeight { + delete(br.blocksReceived, h) + } + } +} + +// GetLatestSeenHeight returns the latest height stored +func (br *BlocksReceived) GetLatestSeenHeight() uint64 { + return br.latestSeenHeight +} diff --git a/p2p/client.go b/p2p/client.go index f5d26f774..ed08f8565 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -4,9 +4,14 @@ import ( "context" "encoding/hex" "fmt" + "strconv" "strings" "time" + "github.com/dymensionxyz/dymint/store" + "github.com/dymensionxyz/gerr-cosmos/gerrc" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -40,6 +45,9 @@ const ( // blockTopicSuffix is added after namespace to create pubsub topic for block gossiping. blockTopicSuffix = "-block" + + // blockSyncProtocolSuffix is added after namespace to create block-sync protocol prefix. + blockSyncProtocolPrefix = "block-sync" ) // Client is a P2P client, implemented with libp2p. @@ -69,15 +77,25 @@ type Client struct { localPubsubServer *tmpubsub.Server logger types.Logger + + // block-sync instance used to save and retrieve blocks from the P2P network on demand + blocksync *BlockSync + + // store used to store retrievable blocks using block-sync + blockSyncStore datastore.Datastore + + store store.Store + + blocksReceived *BlocksReceived } // NewClient creates new Client object. // // Basic checks on parameters are done, and default parameters are provided for unset-configuration // TODO(tzdybal): consider passing entire config, not just P2P config, to reduce number of arguments -func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, localPubsubServer *tmpubsub.Server, logger types.Logger) (*Client, error) { +func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, store store.Store, localPubsubServer *tmpubsub.Server, blockSyncStore datastore.Datastore, logger types.Logger) (*Client, error) { if privKey == nil { - return nil, errNoPrivKey + return nil, fmt.Errorf("private key: %w", gerrc.ErrNotFound) } if conf.ListenAddress == "" { conf.ListenAddress = config.DefaultListenAddress @@ -89,6 +107,11 @@ func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, lo chainID: chainID, logger: logger, localPubsubServer: localPubsubServer, + blockSyncStore: blockSyncStore, + store: store, + blocksReceived: &BlocksReceived{ + blocksReceived: make(map[uint64]struct{}), + }, }, nil } @@ -133,6 +156,17 @@ func (c *Client) StartWithHost(ctx context.Context, h host.Host) error { return err } + if !c.conf.BlockSyncEnabled { + c.logger.Info("Block sync protocol disabled") + return nil + } + + c.logger.Debug("Setting up block sync protocol") + err = c.startBlockSync(ctx) + if err != nil { + return err + } + return nil } @@ -165,6 +199,65 @@ func (c *Client) GossipBlock(ctx context.Context, blockBytes []byte) error { return c.blockGossiper.Publish(ctx, blockBytes) } +// SaveBlock stores the block in the block-sync datastore, stores locally the returned identifier and advertises the identifier to the DHT, so other nodes can know the identifier for the block height. +func (c *Client) SaveBlock(ctx context.Context, height uint64, blockBytes []byte) error { + if !c.conf.BlockSyncEnabled { + return nil + } + cid, err := c.blocksync.SaveBlock(ctx, blockBytes) + if err != nil { + return fmt.Errorf("block-sync add block: %w", err) + } + _, err = c.store.SaveBlockCid(height, cid, nil) + if err != nil { + return fmt.Errorf("block-sync store block id: %w", err) + } + advErr := c.AdvertiseBlockIdToDHT(ctx, height, cid) + if advErr != nil { + return fmt.Errorf("block-sync advertise block %w", advErr) + } + return nil +} + +// RemoveBlocks is used to prune blocks from the block sync datastore. +func (c *Client) RemoveBlocks(ctx context.Context, from, to uint64) error { + if from <= 0 { + return fmt.Errorf("from height must be greater than 0: %w", gerrc.ErrInvalidArgument) + } + + if to <= from { + return fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerrc.ErrInvalidArgument) + } + + for h := from; h < to; h++ { + + cid, err := c.store.LoadBlockCid(h) + if err != nil { + return fmt.Errorf("load block id from store %d: %w", h, err) + } + err = c.blocksync.DeleteBlock(ctx, cid) + if err != nil { + return fmt.Errorf("remove block height %d: %w", h, err) + } + } + return nil +} + +// AdvertiseBlockIdToDHT is used to advertise the identifier (cid) for a specific block height to the DHT, using a PutValue operation +func (c *Client) AdvertiseBlockIdToDHT(ctx context.Context, height uint64, cid cid.Cid) error { + err := c.DHT.PutValue(ctx, getBlockSyncKeyByHeight(height), []byte(cid.String())) + return err +} + +// GetBlockIdFromDHT is used to retrieve the identifier (cid) for a specific block height from the DHT, using a GetValue operation +func (c *Client) GetBlockIdFromDHT(ctx context.Context, height uint64) (cid.Cid, error) { + cidBytes, err := c.DHT.GetValue(ctx, getBlockSyncKeyByHeight(height)) + if err != nil { + return cid.Undef, err + } + return cid.MustParse(string(cidBytes)), nil +} + // SetBlockValidator sets the callback function, that will be invoked after block is received from P2P network. func (c *Client) SetBlockValidator(validator GossipValidator) { c.blockValidator = validator @@ -243,7 +336,9 @@ func (c *Client) setupDHT(ctx context.Context) error { } var err error - c.DHT, err = dht.New(ctx, c.Host, dht.Mode(dht.ModeServer), dht.BootstrapPeers(bootstrapNodes...)) + + val := dht.NamespacedValidator(blockSyncProtocolPrefix, blockIdValidator{}) + c.DHT, err = dht.New(ctx, c.Host, dht.Mode(dht.ModeServer), dht.ProtocolPrefix(blockSyncProtocolPrefix), val, dht.BootstrapPeers(bootstrapNodes...)) if err != nil { return fmt.Errorf("create DHT: %w", err) } @@ -292,6 +387,14 @@ func (c *Client) setupPeerDiscovery(ctx context.Context) error { return nil } +func (c *Client) startBlockSync(ctx context.Context) error { + blocksync := SetupBlockSync(ctx, c.Host, c.blockSyncStore, c.logger) + c.blocksync = blocksync + go c.retrieveBlockSyncLoop(ctx, c.blockSyncReceived) + go c.advertiseBlockSyncCids(ctx) + return nil +} + func (c *Client) advertise(ctx context.Context) error { discutil.Advertise(ctx, c.disc, c.getNamespace(), cdiscovery.TTL(reAdvertisePeriod)) return nil @@ -337,7 +440,7 @@ func (c *Client) setupGossiping(ctx context.Context) error { } go c.txGossiper.ProcessMessages(ctx) - c.blockGossiper, err = NewGossiper(c.Host, ps, c.getBlockTopic(), c.gossipedBlockReceived, c.logger, + c.blockGossiper, err = NewGossiper(c.Host, ps, c.getBlockTopic(), c.blockGossipReceived, c.logger, WithValidator(c.blockValidator)) if err != nil { return err @@ -377,10 +480,12 @@ func (c *Client) getNamespace() string { return c.chainID } +// topic used to transmit transactions in gossipsub func (c *Client) getTxTopic() string { return c.getNamespace() + txTopicSuffix } +// topic used to transmit blocks in gossipsub func (c *Client) getBlockTopic() string { return c.getNamespace() + blockTopicSuffix } @@ -393,17 +498,38 @@ func (c *Client) NewTxValidator() GossipValidator { } } -func (c *Client) gossipedBlockReceived(msg *GossipMessage) { - var gossipedBlock GossipedBlock - if err := gossipedBlock.UnmarshalBinary(msg.Data); err != nil { +// blockSyncReceived is called on reception of new block via block-sync protocol +func (c *Client) blockSyncReceived(block *P2PBlockEvent) { + err := c.localPubsubServer.PublishWithEvents(context.Background(), *block, map[string][]string{EventTypeKey: {EventNewBlockSyncBlock}}) + if err != nil { + c.logger.Error("Publishing event.", "err", err) + } + // Received block is cached and no longer needed to request using block-sync + c.blocksReceived.AddBlockReceived(block.Block.Header.Height) +} + +// blockSyncReceived is called on reception of new block via gossip protocol +func (c *Client) blockGossipReceived(ctx context.Context, block []byte) { + var gossipedBlock P2PBlockEvent + if err := gossipedBlock.UnmarshalBinary(block); err != nil { c.logger.Error("Deserialize gossiped block", "error", err) } err := c.localPubsubServer.PublishWithEvents(context.Background(), gossipedBlock, map[string][]string{EventTypeKey: {EventNewGossipedBlock}}) if err != nil { c.logger.Error("Publishing event.", "err", err) } + if c.conf.BlockSyncEnabled { + err = c.SaveBlock(ctx, gossipedBlock.Block.Header.Height, block) + if err != nil { + c.logger.Error("Adding block to blocksync store.", "err", err, "height", gossipedBlock.Block.Header.Height) + } + // Received block is cached and no longer needed to request using block-sync + c.blocksReceived.AddBlockReceived(gossipedBlock.Block.Header.Height) + } } +// bootstrapLoop is used to periodically check if the node is connected to other nodes in the P2P network, re-bootstrapping the DHT in case it is necessary, +// or to try to connect to the persistent peers func (c *Client) bootstrapLoop(ctx context.Context) { ticker := time.NewTicker(c.conf.BootstrapRetryTime) defer ticker.Stop() @@ -430,6 +556,75 @@ func (c *Client) bootstrapLoop(ctx context.Context) { } } +// retrieveBlockSyncLoop checks if there is any block not received, previous to the latest block height received, to request it on demand +func (c *Client) retrieveBlockSyncLoop(ctx context.Context, msgHandler BlockSyncMessageHandler) { + ticker := time.NewTicker(c.conf.BlockSyncRequestIntervalTime) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + state, err := c.store.LoadState() + if err != nil { + continue + } + + // this loop iterates and retrieves all the blocks between the last block applied and the greatest height received, + // skipping any block cached, since are already received. + for h := state.NextHeight(); h <= c.blocksReceived.latestSeenHeight; h++ { + ok := c.blocksReceived.IsBlockReceived(h) + if ok { + continue + } + c.logger.Debug("Getting block.", "height", h) + id, err := c.GetBlockIdFromDHT(ctx, h) + if err != nil || id == cid.Undef { + c.logger.Error("unable to find cid", "height", h) + continue + } + _, err = c.store.SaveBlockCid(h, id, nil) + if err != nil { + c.logger.Error("storing block cid", "height", h, "cid", id) + continue + } + block, err := c.blocksync.LoadBlock(ctx, id) + if err != nil { + c.logger.Error("Blocksync GetBlock", "err", err) + continue + } + + c.logger.Debug("Blocksync block received ", "cid", id) + msgHandler(&block) + } + c.blocksReceived.RemoveBlocksReceivedUpToHeight(state.NextHeight()) + } + } +} + +// advertiseBlockSyncCids is used to advertise all the block identifiers (cids) stored in the local store to the DHT on startup +func (c *Client) advertiseBlockSyncCids(ctx context.Context) { + state, err := c.store.LoadState() + if err != nil { + return + } + for h := state.BaseHeight; h <= state.Height(); h++ { + + id, err := c.store.LoadBlockCid(h) + if err != nil || id == cid.Undef { + continue + } + + err = c.AdvertiseBlockIdToDHT(ctx, h, id) + if err != nil { + continue + } + + } +} + +// findConnection returns true in case the node is already connected to the peer specified. func (c *Client) findConnection(peer peer.AddrInfo) bool { for _, con := range c.Host.Network().Conns() { if peer.ID == con.RemotePeer() { @@ -438,3 +633,17 @@ func (c *Client) findConnection(peer peer.AddrInfo) bool { } return false } + +func getBlockSyncKeyByHeight(height uint64) string { + return "/" + blockSyncProtocolPrefix + "/" + strconv.FormatUint(height, 10) +} + +// validates that the content identifiers advertised in the DHT are valid. +type blockIdValidator struct{} + +func (blockIdValidator) Validate(_ string, id []byte) error { + _, err := cid.Parse(string(id)) + return err +} + +func (blockIdValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } diff --git a/p2p/client_test.go b/p2p/client_test.go index ddf1c0944..e2d291a22 100644 --- a/p2p/client_test.go +++ b/p2p/client_test.go @@ -3,16 +3,21 @@ package p2p_test import ( "context" "crypto/rand" + "strconv" "sync" "testing" "time" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/dymensionxyz/dymint/store" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub" @@ -26,11 +31,13 @@ func TestClientStartup(t *testing.T) { pubsubServer := pubsub.NewServer() err := pubsubServer.Start() require.NoError(t, err) + store := store.New(store.NewDefaultInMemoryKVStore()) client, err := p2p.NewClient(config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - }, privKey, "TestChain", pubsubServer, log.TestingLogger()) + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + }, privKey, "TestChain", store, pubsubServer, datastore.NewMapDatastore(), log.TestingLogger()) assert := assert.New(t) assert.NoError(err) assert.NotNil(client) @@ -134,6 +141,91 @@ func TestGossiping(t *testing.T) { wg.Wait() } +// Test that advertises and retrieves a CID for a block height in the DHT +func TestAdvertiseBlock(t *testing.T) { + logger := log.TestingLogger() + + ctx := context.Background() + + // required for tx validator + assertRecv := func(tx *p2p.GossipMessage) bool { + return true + } + + // Create a cid manually by specifying the 'prefix' parameters + pref := &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + } + + // And then feed it some data + expectedCid, err := pref.Sum([]byte("test")) + require.NoError(t, err) + + // validators required + validators := []p2p.GossipValidator{assertRecv, assertRecv, assertRecv, assertRecv, assertRecv} + + // network connections topology: 3<->1<->0<->2<->4 + clients := testutil.StartTestNetwork(ctx, t, 3, map[int]testutil.HostDescr{ + 0: {Conns: []int{}, ChainID: "1"}, + 1: {Conns: []int{0}, ChainID: "1"}, + 2: {Conns: []int{1}, ChainID: "1"}, + }, validators, logger) + + // wait for clients to finish refreshing routing tables + clients.WaitForDHT() + + // this sleep is required for pubsub to "propagate" subscription information + // TODO(tzdybal): is there a better way to wait for readiness? + time.Sleep(1 * time.Second) + + // advertise cid for height 1 + err = clients[2].AdvertiseBlockIdToDHT(ctx, 1, expectedCid) + require.NoError(t, err) + + // get cid for height 1 + receivedCid, err := clients[0].GetBlockIdFromDHT(ctx, 1) + require.NoError(t, err) + require.Equal(t, expectedCid, receivedCid) + +} + +// Test that advertises an invalid CID in the DHT +func TestAdvertiseWrongCid(t *testing.T) { + logger := log.TestingLogger() + + ctx := context.Background() + + // required for tx validator + assertRecv := func(tx *p2p.GossipMessage) bool { + return true + } + + validators := []p2p.GossipValidator{assertRecv, assertRecv, assertRecv, assertRecv, assertRecv} + + // network connections topology: 3<->1<->0<->2<->4 + clients := testutil.StartTestNetwork(ctx, t, 3, map[int]testutil.HostDescr{ + 0: {Conns: []int{}, ChainID: "1"}, + 1: {Conns: []int{0}, ChainID: "1"}, + 2: {Conns: []int{1}, ChainID: "1"}, + }, validators, logger) + + // wait for clients to finish refreshing routing tables + clients.WaitForDHT() + + // this sleep is required for pubsub to "propagate" subscription information + // TODO(tzdybal): is there a better way to wait for readiness? + time.Sleep(1 * time.Second) + + // advertise cid for height 1 + receivedError := clients[2].DHT.PutValue(ctx, "/block-sync/"+strconv.FormatUint(1, 10), []byte("test")) + + require.Error(t, cid.ErrInvalidCid{}, receivedError) + +} + func TestSeedStringParsing(t *testing.T) { t.Parallel() @@ -178,10 +270,11 @@ func TestSeedStringParsing(t *testing.T) { assert := assert.New(t) require := require.New(t) logger := &testutil.MockLogger{} + store := store.New(store.NewDefaultInMemoryKVStore()) client, err := p2p.NewClient(config.P2PConfig{ GossipSubCacheSize: 50, BootstrapRetryTime: 30 * time.Second, - }, privKey, "TestNetwork", pubsubServer, logger) + }, privKey, "TestNetwork", store, pubsubServer, datastore.NewMapDatastore(), logger) require.NoError(err) require.NotNil(client) actual := client.GetSeedAddrInfo(c.input) diff --git a/p2p/errors.go b/p2p/errors.go deleted file mode 100644 index 071dc4b79..000000000 --- a/p2p/errors.go +++ /dev/null @@ -1,5 +0,0 @@ -package p2p - -import "errors" - -var errNoPrivKey = errors.New("private key not provided") diff --git a/p2p/events.go b/p2p/events.go index 033ccdae5..45a0064a5 100644 --- a/p2p/events.go +++ b/p2p/events.go @@ -14,12 +14,16 @@ const ( ) const ( - EventNewGossipedBlock = "NewGossipedBlock" + EventNewGossipedBlock = "NewGossipedBlock" + EventNewBlockSyncBlock = "NewBlockSyncBlock" ) /* -------------------------------------------------------------------------- */ /* Queries */ /* -------------------------------------------------------------------------- */ -// EventQueryNewNewGossipedBlock is the query used for getting EventNewGossipedBlock -var EventQueryNewNewGossipedBlock = uevent.QueryFor(EventTypeKey, EventNewGossipedBlock) +// EventQueryNewGossipedBlock is the query used for getting EventNewGossipedBlock +var EventQueryNewGossipedBlock = uevent.QueryFor(EventTypeKey, EventNewGossipedBlock) + +// EventQueryNewBlockSyncBlock is the query used for getting EventNewBlockSyncBlock +var EventQueryNewBlockSyncBlock = uevent.QueryFor(EventTypeKey, EventNewBlockSyncBlock) diff --git a/p2p/gossip.go b/p2p/gossip.go index 21ea9ad6c..6d4236e4c 100644 --- a/p2p/gossip.go +++ b/p2p/gossip.go @@ -25,7 +25,7 @@ type GossipMessage struct { // GossiperOption sets optional parameters of Gossiper. type GossiperOption func(*Gossiper) error -type GossipMessageHandler func(msg *GossipMessage) +type GossipMessageHandler func(ctx context.Context, gossipedBlock []byte) // WithValidator options registers topic validator for Gossiper. func WithValidator(validator GossipValidator) GossiperOption { @@ -103,10 +103,7 @@ func (g *Gossiper) ProcessMessages(ctx context.Context) { return } if g.msgHandler != nil { - g.msgHandler(&GossipMessage{ - Data: msg.Data, - From: msg.GetFrom(), - }) + g.msgHandler(ctx, msg.Data) } } } diff --git a/p2p/gossiped_block.go b/p2p/p2p_block.go similarity index 75% rename from p2p/gossiped_block.go rename to p2p/p2p_block.go index 7398299d0..ac814d51c 100644 --- a/p2p/gossiped_block.go +++ b/p2p/p2p_block.go @@ -10,8 +10,8 @@ import ( /* Event Data */ /* -------------------------------------------------------------------------- */ -// GossipedBlock defines the struct of the event data for the GossipedBlock -type GossipedBlock struct { +// P2PBlockEvent defines the struct of the event data for the Block sent via P2P +type P2PBlockEvent struct { // Block is the block that was gossiped Block types.Block // Commit is the commit that was gossiped @@ -19,12 +19,12 @@ type GossipedBlock struct { } // MarshalBinary encodes GossipedBlock into binary form and returns it. -func (e *GossipedBlock) MarshalBinary() ([]byte, error) { +func (e *P2PBlockEvent) MarshalBinary() ([]byte, error) { return e.ToProto().Marshal() } // UnmarshalBinary decodes binary form of GossipedBlock into object. -func (e *GossipedBlock) UnmarshalBinary(data []byte) error { +func (e *P2PBlockEvent) UnmarshalBinary(data []byte) error { var pbGossipedBlock pb.GossipedBlock err := pbGossipedBlock.Unmarshal(data) if err != nil { @@ -35,15 +35,15 @@ func (e *GossipedBlock) UnmarshalBinary(data []byte) error { } // ToProto converts Data into protobuf representation and returns it. -func (e *GossipedBlock) ToProto() *pb.GossipedBlock { +func (e *P2PBlockEvent) ToProto() *pb.GossipedBlock { return &pb.GossipedBlock{ Block: e.Block.ToProto(), Commit: e.Commit.ToProto(), } } -// FromProto fills GossipedBlock with data from its protobuf representation. -func (e *GossipedBlock) FromProto(other *pb.GossipedBlock) error { +// FromProto fills P2PBlock with data from its protobuf representation. +func (e *P2PBlockEvent) FromProto(other *pb.GossipedBlock) error { if err := e.Block.FromProto(other.Block); err != nil { return err } @@ -53,8 +53,8 @@ func (e *GossipedBlock) FromProto(other *pb.GossipedBlock) error { return nil } -// Validate run basic validation on the gossiped block -func (e *GossipedBlock) Validate(proposer *types.Sequencer) error { +// Validate run basic validation on the p2p block +func (e *P2PBlockEvent) Validate(proposer *types.Sequencer) error { if err := e.Block.ValidateBasic(); err != nil { return err } diff --git a/p2p/validator.go b/p2p/validator.go index ee25939fd..0b1362b6a 100644 --- a/p2p/validator.go +++ b/p2p/validator.go @@ -71,7 +71,7 @@ func (v *Validator) TxValidator(mp mempool.Mempool, mpoolIDS *nodemempool.Mempoo // BlockValidator runs basic checks on the gossiped block func (v *Validator) BlockValidator() GossipValidator { return func(blockMsg *GossipMessage) bool { - var gossipedBlock GossipedBlock + var gossipedBlock P2PBlockEvent if err := gossipedBlock.UnmarshalBinary(blockMsg.Data); err != nil { v.logger.Error("Deserialize gossiped block.", "error", err) return false diff --git a/p2p/validator_test.go b/p2p/validator_test.go index 082c316e7..d025c1314 100644 --- a/p2p/validator_test.go +++ b/p2p/validator_test.go @@ -168,7 +168,7 @@ func TestValidator_BlockValidator(t *testing.T) { } // Create gossiped block - gossipedBlock := p2p.GossipedBlock{Block: *block, Commit: *commit} + gossipedBlock := p2p.P2PBlockEvent{Block: *block, Commit: *commit} gossipedBlockBytes, err := gossipedBlock.MarshalBinary() require.NoError(t, err) blockMsg := &p2p.GossipMessage{ diff --git a/rpc/client/client_test.go b/rpc/client/client_test.go index 46db61a05..741521bb3 100644 --- a/rpc/client/client_test.go +++ b/rpc/client/client_test.go @@ -103,10 +103,11 @@ func TestGenesisChunked(t *testing.T) { RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, }, RPC: config.RPCConfig{}, BlockManagerConfig: config.BlockManagerConfig{ @@ -704,10 +705,11 @@ func TestValidatorSetHandling(t *testing.T) { DALayer: "mock", SettlementLayer: "mock", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, }, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 10 * time.Millisecond, @@ -861,10 +863,11 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, }, RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), @@ -972,10 +975,11 @@ func TestMempool2Nodes(t *testing.T) { RollappID: rollappID, }, P2PConfig: config.P2PConfig{ - ListenAddress: "/ip4/127.0.0.1/tcp/9001", - BootstrapNodes: "", - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: "/ip4/127.0.0.1/tcp/9001", + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, }, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 100 * time.Millisecond, @@ -1002,10 +1006,11 @@ func TestMempool2Nodes(t *testing.T) { MaxBatchSkew: 10, }, P2PConfig: config.P2PConfig{ - ListenAddress: "/ip4/127.0.0.1/tcp/9002", - BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(), - BootstrapRetryTime: 30 * time.Second, - GossipSubCacheSize: 50, + ListenAddress: "/ip4/127.0.0.1/tcp/9002", + BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(), + BootstrapRetryTime: 30 * time.Second, + GossipSubCacheSize: 50, + BlockSyncRequestIntervalTime: 30 * time.Second, }, MempoolConfig: *tmcfg.DefaultMempoolConfig(), }, key2, signingKey2, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: rollappID}, log.TestingLogger(), mempool.NopMetrics()) diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index e3c52dfd6..995a8b879 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -313,9 +313,10 @@ func getRPC(t *testing.T) (*tmmocks.MockApplication, *client.Client) { RollappID: rollappID, }, P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, }, } node, err := node.NewNode( diff --git a/store/badger.go b/store/badger.go index 58019f784..dd72dd79f 100644 --- a/store/badger.go +++ b/store/badger.go @@ -31,7 +31,7 @@ func NewDefaultInMemoryKVStore() KV { } func NewKVStore(rootDir, dbPath, dbName string, syncWrites bool) KV { - path := filepath.Join(rootify(rootDir, dbPath), dbName) + path := filepath.Join(Rootify(rootDir, dbPath), dbName) db, err := badger.Open(badger.DefaultOptions(path).WithSyncWrites(syncWrites)) if err != nil { panic(err) @@ -46,8 +46,8 @@ func NewDefaultKVStore(rootDir, dbPath, dbName string) KV { return NewKVStore(rootDir, dbPath, dbName, true) } -// rootify works just like in cosmos-sdk -func rootify(rootDir, dbPath string) string { +// Rootify is helper function to make config creation independent of root dir +func Rootify(rootDir, dbPath string) string { if filepath.IsAbs(dbPath) { return dbPath } diff --git a/store/pruning.go b/store/pruning.go index 3c0a358cc..6b3950c43 100644 --- a/store/pruning.go +++ b/store/pruning.go @@ -48,6 +48,9 @@ func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) { if err := batch.Delete(getValidatorsKey(h)); err != nil { return 0, err } + if err := batch.Delete(getCidKey(h)); err != nil { + return 0, err + } pruned++ diff --git a/store/pruning_test.go b/store/pruning_test.go index 43f9c0e7a..3a75cb508 100644 --- a/store/pruning_test.go +++ b/store/pruning_test.go @@ -6,6 +6,8 @@ import ( "github.com/dymensionxyz/dymint/store" "github.com/dymensionxyz/dymint/testutil" "github.com/dymensionxyz/dymint/types" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/assert" ) @@ -63,10 +65,25 @@ func TestStorePruning(t *testing.T) { _, err := bstore.SaveBlock(block, &types.Commit{}, nil) assert.NoError(err) savedHeights[block.Header.Height] = true + blockBytes, err := block.MarshalBinary() + assert.NoError(err) + // Create a cid manually by specifying the 'prefix' parameters + pref := &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + } + cid, err := pref.Sum(blockBytes) + assert.NoError(err) + _, err = bstore.SaveBlockCid(block.Header.Height, cid, nil) + assert.NoError(err) // TODO: add block responses and commits } + // And then feed it some data + //expectedCid, err := pref.Sum(block) // Validate all blocks are saved for k := range savedHeights { _, err := bstore.LoadBlock(k) @@ -92,9 +109,17 @@ func TestStorePruning(t *testing.T) { _, err = bstore.LoadCommit(k) assert.Error(err, "Commit at height %d should be pruned", k) + + _, err = bstore.LoadBlockCid(k) + assert.Error(err, "Cid at height %d should be pruned", k) + } else { _, err := bstore.LoadBlock(k) assert.NoError(err) + + _, err = bstore.LoadBlockCid(k) + assert.NoError(err) + } } }) diff --git a/store/store.go b/store/store.go index e3c9e69dd..3e09671e5 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/ipfs/go-cid" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" tmtypes "github.com/tendermint/tendermint/types" @@ -21,6 +22,7 @@ var ( statePrefix = [1]byte{4} responsesPrefix = [1]byte{5} validatorsPrefix = [1]byte{6} + cidPrefix = [1]byte{7} ) // DefaultStore is a default store implementation. @@ -251,6 +253,26 @@ func (s *DefaultStore) loadHashFromIndex(height uint64) ([32]byte, error) { return hash, nil } +func (s *DefaultStore) SaveBlockCid(height uint64, cid cid.Cid, batch KVBatch) (KVBatch, error) { + if batch == nil { + return nil, s.db.Set(getCidKey(height), []byte(cid.String())) + } + err := batch.Set(getCidKey(height), []byte(cid.String())) + return batch, err +} + +func (s *DefaultStore) LoadBlockCid(height uint64) (cid.Cid, error) { + cidBytes, err := s.db.Get(getCidKey(height)) + if err != nil { + return cid.Undef, fmt.Errorf("load cid for height %v: %w", height, err) + } + parsedCid, err := cid.Parse(string(cidBytes)) + if err != nil { + return cid.Undef, fmt.Errorf("parse cid: %w", err) + } + return parsedCid, nil +} + func getBlockKey(hash [32]byte) []byte { return append(blockPrefix[:], hash[:]...) } @@ -280,3 +302,9 @@ func getValidatorsKey(height uint64) []byte { binary.BigEndian.PutUint64(buf, height) return append(validatorsPrefix[:], buf[:]...) } + +func getCidKey(height uint64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, height) + return append(cidPrefix[:], buf[:]...) +} diff --git a/store/storeIface.go b/store/storeIface.go index 169b74444..503151d1a 100644 --- a/store/storeIface.go +++ b/store/storeIface.go @@ -2,6 +2,7 @@ package store import ( "github.com/dymensionxyz/dymint/types" + "github.com/ipfs/go-cid" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmtypes "github.com/tendermint/tendermint/types" ) @@ -74,4 +75,8 @@ type Store interface { PruneBlocks(from, to uint64) (uint64, error) Close() error + + SaveBlockCid(height uint64, cid cid.Cid, batch KVBatch) (KVBatch, error) + + LoadBlockCid(height uint64) (cid.Cid, error) } diff --git a/store/store_test.go b/store/store_test.go index 1c6080fb6..26db3190a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -5,6 +5,9 @@ import ( "testing" "github.com/dymensionxyz/gerr-cosmos/gerrc" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + abcitypes "github.com/tendermint/tendermint/abci/types" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" @@ -198,3 +201,52 @@ func TestBatch(t *testing.T) { assert.NotNil(resp) assert.Equal(expected, resp) } + +// test for saving and loading cids for specific block heights in the store with and w/out batches +func TestBlockId(t *testing.T) { + require := require.New(t) + + kv := store.NewDefaultInMemoryKVStore() + s := store.New(kv) + + // Create a cid manually by specifying the 'prefix' parameters + pref := &cid.Prefix{ + Codec: cid.DagProtobuf, + MhLength: -1, + MhType: mh.SHA2_256, + Version: 1, + } + + // And then feed it some data + expectedCid, err := pref.Sum([]byte("test")) + require.NoError(err) + + // store cid for height 1 + _, err = s.SaveBlockCid(1, expectedCid, nil) + require.NoError(err) + + // retrieve cid for height 1 + resultCid, err := s.LoadBlockCid(1) + require.NoError(err) + + require.Equal(expectedCid, resultCid) + + // repeat test using batch + batch := s.NewBatch() + + // store cid for height 2 + batch, err = s.SaveBlockCid(2, expectedCid, batch) + require.NoError(err) + + // retrieve cid for height 2 + _, err = s.LoadBlockCid(2) + require.Error(err, gerrc.ErrNotFound) + + // commit + batch.Commit() + + //retrieve cid for height 2 + resultCid, err = s.LoadBlockCid(2) + require.NoError(err) + require.Equal(expectedCid, resultCid) +} diff --git a/testutil/block.go b/testutil/block.go index 54bbddb02..aa2941aa2 100644 --- a/testutil/block.go +++ b/testutil/block.go @@ -9,6 +9,7 @@ import ( "github.com/dymensionxyz/dymint/block" "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/settlement" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/tendermint/tendermint/libs/log" @@ -92,9 +93,10 @@ func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypt // Init p2p client and validator p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) p2pClient, err := p2p.NewClient(config.P2PConfig{ - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - }, p2pKey, "TestChain", pubsubServer, logger) + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + }, p2pKey, "TestChain", managerStore, pubsubServer, datastore.NewMapDatastore(), logger) if err != nil { return nil, err } diff --git a/testutil/p2p.go b/testutil/p2p.go index 32ee0e6a1..8dd88fbe8 100644 --- a/testutil/p2p.go +++ b/testutil/p2p.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -19,6 +20,7 @@ import ( "github.com/dymensionxyz/dymint/config" "github.com/dymensionxyz/dymint/p2p" + "github.com/dymensionxyz/dymint/store" "github.com/dymensionxyz/dymint/types" ) @@ -106,16 +108,20 @@ func StartTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]Hos require.NoError(err) clients := make([]*p2p.Client, n) + store := store.New(store.NewDefaultInMemoryKVStore()) for i := 0; i < n; i++ { client, err := p2p.NewClient(config.P2PConfig{ - BootstrapNodes: seeds[i], - GossipSubCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: config.DefaultListenAddress, + BootstrapNodes: seeds[i], + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BlockSyncRequestIntervalTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BlockSyncEnabled: true, }, mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()), conf[i].ChainID, - pubsubServer, + store, + pubsubServer, datastore.NewMapDatastore(), logger) require.NoError(err) require.NotNil(client) diff --git a/testutil/types.go b/testutil/types.go index 622ec4667..09cee1bb6 100644 --- a/testutil/types.go +++ b/testutil/types.go @@ -56,7 +56,6 @@ func generateBlock(height uint64) *types.Block { Block: BlockVersion, App: AppVersion, }, - NamespaceID: [8]byte{0, 1, 2, 3, 4, 5, 6, 7}, Height: height, Time: 4567, LastHeaderHash: h[0], diff --git a/types/block_source.go b/types/block_source.go index 343346d60..26c24cdea 100644 --- a/types/block_source.go +++ b/types/block_source.go @@ -9,17 +9,18 @@ type BlockSource uint64 const ( _ BlockSource = iota - ProducedBlock - GossipedBlock - DABlock - LocalDbBlock + Produced + Gossiped + BlockSync + DA + LocalDb ) func (s BlockSource) String() string { return AllSources[s] } -var AllSources = []string{"none", "produced", "gossip", "da", "local_db"} +var AllSources = []string{"none", "produced", "gossip", "blocksync", "da", "local_db"} type BlockMetaData struct { Source BlockSource @@ -29,6 +30,7 @@ type BlockMetaData struct { type CachedBlock struct { Block *Block Commit *Commit + Source BlockSource } func GetAddress(key crypto.PrivKey) ([]byte, error) {