From 056283b14097fcc45cb4442ab8bfd2f96d0d983a Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Tue, 4 Apr 2023 12:30:28 +0100 Subject: [PATCH] Upgrade to http delegated routing Upgrade from reframe to http delegated rotuing as reframe isn't supported by kubo anymore. Datastore is backwards compatible. --- README.md | 9 +- cmd/provider/daemon.go | 48 +-- cmd/provider/internal/config/config.go | 32 +- cmd/provider/internal/config/init.go | 14 +- cmd/provider/internal/config/reframe.go | 52 +-- {reframe => delegatedrouting}/chunker.go | 2 +- {reframe => delegatedrouting}/cid_queue.go | 2 +- {reframe => delegatedrouting}/ds_wrapper.go | 2 +- {reframe => delegatedrouting}/listener.go | 235 +++++------ .../listener_api_test.go | 28 +- .../listener_concurrency_test.go | 14 +- .../listener_test.go | 371 +++++++++--------- {reframe => delegatedrouting}/options.go | 2 +- {reframe => delegatedrouting}/options_test.go | 6 +- .../stats_reporter.go | 28 +- go.mod | 12 +- go.sum | 14 +- .../server}/options.go | 4 +- .../server}/server.go | 20 +- 19 files changed, 437 insertions(+), 458 deletions(-) rename {reframe => delegatedrouting}/chunker.go (98%) rename {reframe => delegatedrouting}/cid_queue.go (98%) rename {reframe => delegatedrouting}/ds_wrapper.go (99%) rename {reframe => delegatedrouting}/listener.go (64%) rename {reframe => delegatedrouting}/listener_api_test.go (71%) rename {reframe => delegatedrouting}/listener_concurrency_test.go (83%) rename {reframe => delegatedrouting}/listener_test.go (66%) rename {reframe => delegatedrouting}/options.go (97%) rename {reframe => delegatedrouting}/options_test.go (56%) rename {reframe => delegatedrouting}/stats_reporter.go (74%) rename server/{reframe/http => delegatedrouting/server}/options.go (90%) rename server/{reframe/http => delegatedrouting/server}/server.go (61%) diff --git a/README.md b/README.md index 9bb36288..aada2064 100644 --- a/README.md +++ b/README.md @@ -105,16 +105,15 @@ provider import car -l http://localhost:3102 -i Both CARv1 and CARv2 formats are supported. Index is regenerated on the fly if one is not present. -#### Exposing reframe server from provider (experimental) +#### Exposing delegated routing server from provider (experimental) -Provider can export a reframe server. [Reframe](https://github.com/ipfs/specs/blob/main/reframe/REFRAME_PROTOCOL.md) is a protocol -that allows IPFS nodes to advertise their contents to indexers alongside DHT. Reframe server is off by default. -To enable it, add the following configuration block to the provider config file. +Provider can export a Delegated Routing server. Delegated Routing allows IPFS nodes to advertise their contents to indexers alongside DHT. +Delegated Routing server is off by default. To enable it, add the following configuration block to the provider config file. ``` { ... - Reframe { + DelegatedRouting { ListenMultiaddr: "/ip4/0.0.0.0/tcp/50617 (example)" } ... diff --git a/cmd/provider/daemon.go b/cmd/provider/daemon.go index 8755eb56..e8841189 100644 --- a/cmd/provider/daemon.go +++ b/cmd/provider/daemon.go @@ -22,7 +22,7 @@ import ( "github.com/ipni/index-provider/engine" "github.com/ipni/index-provider/engine/policy" adminserver "github.com/ipni/index-provider/server/admin/http" - reframeserver "github.com/ipni/index-provider/server/reframe/http" + droutingserver "github.com/ipni/index-provider/server/delegatedrouting/server" "github.com/ipni/index-provider/supplier" "github.com/libp2p/go-libp2p" "github.com/mitchellh/go-homedir" @@ -187,7 +187,7 @@ func daemonCommand(cctx *cli.Context) error { adminErrChan <- adminSvr.Start() }() - reframeErrChan := make(chan error, 1) + droutingErrChan := make(chan error, 1) // If there are bootstrap peers and bootstrapping is enabled, then try to // connect to the minimum set of peers. if len(cfg.Bootstrap.Peers) != 0 && cfg.Bootstrap.MinimumPeers != 0 { @@ -206,36 +206,36 @@ func daemonCommand(cctx *cli.Context) error { defer bootstrapper.Close() } - // setting up reframe server - var reframeSrv *reframeserver.Server - if len(cfg.Reframe.ListenMultiaddr) != 0 { - reframeAddr, err := cfg.Reframe.ListenNetAddr() + // setting up delegated routing server + var droutingSrv *droutingserver.Server + if len(cfg.DelegatedRouting.ListenMultiaddr) != 0 { + droutingAddr, err := cfg.DelegatedRouting.ListenNetAddr() if err != nil { return err } - reframeSrv, err = reframeserver.New( - time.Duration(cfg.Reframe.CidTtl), - cfg.Reframe.ChunkSize, - cfg.Reframe.SnapshotSize, - cfg.Reframe.DsPageSize, - cfg.Reframe.ProviderID, - cfg.Reframe.Addrs, + droutingSrv, err = droutingserver.New( + time.Duration(cfg.DelegatedRouting.CidTtl), + cfg.DelegatedRouting.ChunkSize, + cfg.DelegatedRouting.SnapshotSize, + cfg.DelegatedRouting.DsPageSize, + cfg.DelegatedRouting.ProviderID, + cfg.DelegatedRouting.Addrs, eng, ds, - reframeserver.WithListenAddr(reframeAddr), - reframeserver.WithReadTimeout(time.Duration(cfg.Reframe.ReadTimeout)), - reframeserver.WithWriteTimeout(time.Duration(cfg.Reframe.WriteTimeout)), + droutingserver.WithListenAddr(droutingAddr), + droutingserver.WithReadTimeout(time.Duration(cfg.DelegatedRouting.ReadTimeout)), + droutingserver.WithWriteTimeout(time.Duration(cfg.DelegatedRouting.WriteTimeout)), ) if err != nil { return err } - log.Infow("reframe server initialized", "address", cfg.Reframe.ListenMultiaddr) + log.Infow("delegated routing server initialized", "address", cfg.DelegatedRouting.ListenMultiaddr) - fmt.Fprintf(cctx.App.ErrWriter, "Starting reframe server on %s ...", cfg.Reframe.ListenMultiaddr) + fmt.Fprintf(cctx.App.ErrWriter, "Starting delegated routing server on %s ...", cfg.DelegatedRouting.ListenMultiaddr) go func() { - reframeErrChan <- reframeSrv.Start() + droutingErrChan <- droutingSrv.Start() }() } @@ -246,8 +246,8 @@ func daemonCommand(cctx *cli.Context) error { case err = <-adminErrChan: log.Errorw("Failed to start admin server", "err", err) finalErr = ErrDaemonStart - case err = <-reframeErrChan: - log.Errorw("Failed to start reframe server", "err", err) + case err = <-droutingErrChan: + log.Errorw("Failed to start delegated routing server", "err", err) finalErr = ErrDaemonStart } @@ -281,9 +281,9 @@ func daemonCommand(cctx *cli.Context) error { log.Errorw("Error shutting down admin server: %s", err) finalErr = ErrDaemonStop } - if reframeSrv != nil { - if err = reframeSrv.Shutdown(shutdownCtx); err != nil { - log.Errorw("Error shutting down reframe server.", "err", err) + if droutingSrv != nil { + if err = droutingSrv.Shutdown(shutdownCtx); err != nil { + log.Errorw("Error shutting down delegated routing server.", "err", err) finalErr = ErrDaemonStop } } diff --git a/cmd/provider/internal/config/config.go b/cmd/provider/internal/config/config.go index 3deffc06..9833fb96 100644 --- a/cmd/provider/internal/config/config.go +++ b/cmd/provider/internal/config/config.go @@ -11,14 +11,14 @@ import ( // Config is used to load config files. type Config struct { - Identity Identity - Datastore Datastore - Ingest Ingest - ProviderServer ProviderServer - AdminServer AdminServer - Bootstrap Bootstrap - DirectAnnounce DirectAnnounce - Reframe Reframe + Identity Identity + Datastore Datastore + Ingest Ingest + ProviderServer ProviderServer + AdminServer AdminServer + Bootstrap Bootstrap + DirectAnnounce DirectAnnounce + DelegatedRouting DelegatedRouting } const ( @@ -96,13 +96,13 @@ func Load(filePath string) (*Config, error) { // Populate with initial values in case they are not present in config. cfg := Config{ - Bootstrap: NewBootstrap(), - Datastore: NewDatastore(), - Ingest: NewIngest(), - AdminServer: NewAdminServer(), - ProviderServer: NewProviderServer(), - DirectAnnounce: NewDirectAnnounce(), - Reframe: NewReframe(), + Bootstrap: NewBootstrap(), + Datastore: NewDatastore(), + Ingest: NewIngest(), + AdminServer: NewAdminServer(), + ProviderServer: NewProviderServer(), + DirectAnnounce: NewDirectAnnounce(), + DelegatedRouting: NewDelegatedRouting(), } if err = json.NewDecoder(f).Decode(&cfg); err != nil { @@ -158,5 +158,5 @@ func (c *Config) PopulateDefaults() { c.Datastore.PopulateDefaults() c.Ingest.PopulateDefaults() c.ProviderServer.PopulateDefaults() - c.Reframe.PopulateDefaults() + c.DelegatedRouting.PopulateDefaults() } diff --git a/cmd/provider/internal/config/init.go b/cmd/provider/internal/config/init.go index d6d7b581..83b880c2 100644 --- a/cmd/provider/internal/config/init.go +++ b/cmd/provider/internal/config/init.go @@ -20,13 +20,13 @@ func Init(out io.Writer) (*Config, error) { func InitWithIdentity(identity Identity) (*Config, error) { return &Config{ - Identity: identity, - Bootstrap: NewBootstrap(), - Datastore: NewDatastore(), - Ingest: NewIngest(), - ProviderServer: NewProviderServer(), - AdminServer: NewAdminServer(), - Reframe: NewReframe(), + Identity: identity, + Bootstrap: NewBootstrap(), + Datastore: NewDatastore(), + Ingest: NewIngest(), + ProviderServer: NewProviderServer(), + AdminServer: NewAdminServer(), + DelegatedRouting: NewDelegatedRouting(), }, nil } diff --git a/cmd/provider/internal/config/reframe.go b/cmd/provider/internal/config/reframe.go index 1a100bed..4830ea6c 100644 --- a/cmd/provider/internal/config/reframe.go +++ b/cmd/provider/internal/config/reframe.go @@ -8,17 +8,17 @@ import ( ) const ( - defaultReframeReadTimeout = Duration(10 * time.Minute) - defaultReframeWriteTimeout = Duration(10 * time.Minute) - defaultReframeCidTtl = Duration(24 * time.Hour) - defaultReframeChunkSize = 1_000 - defaultReframeSnapshotSize = 10_000 - defaultPageSize = 5_000 + defaultDelegatedRoutingReadTimeout = Duration(10 * time.Minute) + defaultDelegatedRoutingWriteTimeout = Duration(10 * time.Minute) + defaultDelegatedRoutingCidTtl = Duration(24 * time.Hour) + defaultDelegatedRoutingChunkSize = 1_000 + defaultDelegatedRoutingSnapshotSize = 10_000 + defaultPageSize = 5_000 ) -// Reframe tracks the configuration of reframe serber. If specified, index provider will expose a reframe server that will +// DelegatedRouting tracks the configuration of delegated routing server. If specified, index provider will expose a delegated routing server that will // allow an IPFS node to advertise their CIDs through the delegated routing protocol. -type Reframe struct { +type DelegatedRouting struct { ListenMultiaddr string ReadTimeout Duration WriteTimeout Duration @@ -30,52 +30,52 @@ type Reframe struct { // SnapshotSize is the maximum number of records in the Provide payload after which it is considered a snapshot. // Snapshots don't have individual timestamps recorded into the datastore. Instead, timestamps are recorded as a binary blob after processing is done. SnapshotSize int - // ProviderID is a Peer ID of the IPFS node that the reframe server is expecting advertisements from + // ProviderID is a Peer ID of the IPFS node that the delegated routing server is expecting advertisements from ProviderID string - // DsPageSize is a size of the database page that is going to be used on reframe server initialisation. + // DsPageSize is a size of the database page that is going to be used on delegated routing server initialisation. DsPageSize int - // Addrs is a list of multiaddresses of the IPFS node that the reframe server is expecting advertisements from + // Addrs is a list of multiaddresses of the IPFS node that the delegated routing server is expecting advertisements from Addrs []string } -// NewReframe instantiates a new Reframe config with default values. -func NewReframe() Reframe { - return Reframe{ +// NewDelegatedRouting instantiates a new delegated routing config with default values. +func NewDelegatedRouting() DelegatedRouting { + return DelegatedRouting{ // we would like this functionality to be off by default ProviderID: "", ListenMultiaddr: "", - ReadTimeout: defaultReframeReadTimeout, - WriteTimeout: defaultReframeWriteTimeout, - CidTtl: defaultReframeCidTtl, - ChunkSize: defaultReframeChunkSize, - SnapshotSize: defaultReframeSnapshotSize, + ReadTimeout: defaultDelegatedRoutingReadTimeout, + WriteTimeout: defaultDelegatedRoutingWriteTimeout, + CidTtl: defaultDelegatedRoutingCidTtl, + ChunkSize: defaultDelegatedRoutingChunkSize, + SnapshotSize: defaultDelegatedRoutingSnapshotSize, DsPageSize: defaultPageSize, } } // PopulateDefaults replaces zero-values in the config with default values. -func (c *Reframe) PopulateDefaults() { +func (c *DelegatedRouting) PopulateDefaults() { if c.ReadTimeout == 0 { - c.ReadTimeout = defaultReframeReadTimeout + c.ReadTimeout = defaultDelegatedRoutingReadTimeout } if c.WriteTimeout == 0 { - c.WriteTimeout = defaultReframeWriteTimeout + c.WriteTimeout = defaultDelegatedRoutingWriteTimeout } if c.CidTtl == 0 { - c.CidTtl = defaultReframeCidTtl + c.CidTtl = defaultDelegatedRoutingCidTtl } if c.ChunkSize == 0 { - c.ChunkSize = defaultReframeChunkSize + c.ChunkSize = defaultDelegatedRoutingChunkSize } if c.SnapshotSize == 0 { - c.SnapshotSize = defaultReframeSnapshotSize + c.SnapshotSize = defaultDelegatedRoutingSnapshotSize } if c.DsPageSize == 0 { c.DsPageSize = defaultPageSize } } -func (as *Reframe) ListenNetAddr() (string, error) { +func (as *DelegatedRouting) ListenNetAddr() (string, error) { maddr, err := multiaddr.NewMultiaddr(as.ListenMultiaddr) if err != nil { return "", err diff --git a/reframe/chunker.go b/delegatedrouting/chunker.go similarity index 98% rename from reframe/chunker.go rename to delegatedrouting/chunker.go index bff5de53..3b213d6d 100644 --- a/reframe/chunker.go +++ b/delegatedrouting/chunker.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting import ( "context" diff --git a/reframe/cid_queue.go b/delegatedrouting/cid_queue.go similarity index 98% rename from reframe/cid_queue.go rename to delegatedrouting/cid_queue.go index 3b727d3c..4a503646 100644 --- a/reframe/cid_queue.go +++ b/delegatedrouting/cid_queue.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting import ( "container/list" diff --git a/reframe/ds_wrapper.go b/delegatedrouting/ds_wrapper.go similarity index 99% rename from reframe/ds_wrapper.go rename to delegatedrouting/ds_wrapper.go index d3ead573..81e17cfb 100644 --- a/reframe/ds_wrapper.go +++ b/delegatedrouting/ds_wrapper.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting import ( "bytes" diff --git a/reframe/listener.go b/delegatedrouting/listener.go similarity index 64% rename from reframe/listener.go rename to delegatedrouting/listener.go index a078f7ff..851428d4 100644 --- a/reframe/listener.go +++ b/delegatedrouting/listener.go @@ -1,8 +1,9 @@ -package reframe +package delegatedrouting import ( "context" "encoding/base64" + "errors" "fmt" "sort" "sync" @@ -11,37 +12,39 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-libipfs/routing/http/server" + "github.com/ipfs/go-libipfs/routing/http/types" "github.com/ipni/go-libipni/metadata" provider "github.com/ipni/index-provider" "github.com/libp2p/go-libp2p/core/peer" - "github.com/ipfs/go-delegated-routing/client" logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) -var log = logging.Logger("reframe/listener") +var log = logging.Logger("delegatedrouting/listener") var bitswapMetadata = metadata.Default.New(metadata.Bitswap{}) const ( - reframeDSName = "reframe" + // keeping this as "reframe" for backwards compatibility + delegatedRoutingDSName = "reframe" statsPrintFrequency = time.Minute retryWithBackoffInterval = 5 * time.Second retryWithBackoffMaxAttempts = 3 ) -type ReframeListener struct { +type Listener struct { dsWrapper *dsWrapper engine provider.Interface cidTtl time.Duration chunkSize int snapshotSize int - // ReframeListener maintains in memory indexes for fast key value lookups + // Listener maintains in memory indexes for fast key value lookups // as well as a rolling double-linked list of CIDs ordered by their timestamp. // Once a CID gets advertised, the respective linked list node gets moved to the - // beginning of the list. To identify CIDs to expire, ReframeListener would walk the list tail to head. + // beginning of the list. To identify CIDs to expire, Listener would walk the list tail to head. // TODO: offload cid chunks to disk to save RAM chunker *chunker cidQueue *cidQueue @@ -51,11 +54,11 @@ type ReframeListener struct { lock sync.Mutex } -type ReframeMultihashLister struct { +type MultihashLister struct { CidFetcher func(contextID []byte) (map[cid.Cid]struct{}, error) } -func (lister *ReframeMultihashLister) MultihashLister(ctx context.Context, p peer.ID, contextID []byte) (provider.MultihashIterator, error) { +func (lister *MultihashLister) MultihashLister(ctx context.Context, p peer.ID, contextID []byte) (provider.MultihashIterator, error) { contextIdStr := contextIDToStr(contextID) cids, err := lister.CidFetcher(contextID) @@ -77,7 +80,7 @@ func (lister *ReframeMultihashLister) MultihashLister(ctx context.Context, p pee return provider.SliceMultihashIterator(mhs), nil } -// NewReframeListenerWithNonceGen creates a reframe listener and initialises its state from the provided datastore. +// New creates a delegated routing listener and initialises its state from the provided datastore. func New(ctx context.Context, engine provider.Interface, cidTtl time.Duration, chunkSize int, @@ -87,16 +90,16 @@ func New(ctx context.Context, engine provider.Interface, ds datastore.Datastore, nonceGen func() []byte, opts ...Option, -) (*ReframeListener, error) { +) (*Listener, error) { options := ApplyOptions(opts...) - listener := &ReframeListener{ + listener := &Listener{ engine: engine, cidTtl: cidTtl, chunkSize: chunkSize, snapshotSize: snapshotSize, - dsWrapper: newDSWrapper(namespace.Wrap(ds, datastore.NewKey(reframeDSName)), options.SnapshotMaxChunkSize, options.PageSize), + dsWrapper: newDSWrapper(namespace.Wrap(ds, datastore.NewKey(delegatedRoutingDSName)), options.SnapshotMaxChunkSize, options.PageSize), lastSeenProviderInfo: &peer.AddrInfo{}, configuredProviderInfo: nil, chunker: newChunker(func() int { return chunkSize }, nonceGen), @@ -109,7 +112,7 @@ func New(ctx context.Context, engine provider.Interface, func() int { return len(listener.chunker.currentChunk.Cids) }, ) - lister := &ReframeMultihashLister{ + lister := &MultihashLister{ CidFetcher: func(contextID []byte) (map[cid.Cid]struct{}, error) { ctxIdStr := contextIDToStr(contextID) chunk := listener.chunker.getChunkByContextID(ctxIdStr) @@ -185,148 +188,120 @@ func New(ctx context.Context, engine provider.Interface, return listener, nil } -func (listener *ReframeListener) Shutdown() { +func (listener *Listener) Shutdown() { listener.stats.shutdown() } -func (listener *ReframeListener) GetIPNS(ctx context.Context, id []byte) (<-chan client.GetIPNSAsyncResult, error) { - log.Warn("Received unsupported getIPNS request") - ch := make(chan client.GetIPNSAsyncResult, 1) - go func() { - // Not implemented - ch <- client.GetIPNSAsyncResult{Record: nil} - close(ch) - }() - return ch, nil +func (listener *Listener) FindProviders(ctx context.Context, key cid.Cid) ([]types.ProviderResponse, error) { + log.Warn("Received unsupported FindProviders request") + return nil, errors.New("unsupported find providers request") } -func (listener *ReframeListener) PutIPNS(ctx context.Context, id []byte, record []byte) (<-chan client.PutIPNSAsyncResult, error) { - log.Warn("Received unsupported putIPNS request") - ch := make(chan client.PutIPNSAsyncResult, 1) - go func() { - // Not implemented - ch <- client.PutIPNSAsyncResult{} - close(ch) - }() - return ch, nil +func (listener *Listener) Provide(ctx context.Context, req *server.WriteProvideRequest) (types.ProviderResponse, error) { + log.Warn("Received unsupported Provide request") + return nil, errors.New("unsupported provide request") } -func (listener *ReframeListener) FindProviders(ctx context.Context, key cid.Cid) (<-chan client.FindProvidersAsyncResult, error) { - log.Warn("Received unsupported findProviders request") - ch := make(chan client.FindProvidersAsyncResult, 1) - go func() { - // Not implemented - ch <- client.FindProvidersAsyncResult{AddrInfo: nil} - close(ch) +func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + cids := req.Keys + pid := req.ID + paddrs := req.Addrs + startTime := time.Now() + printFrequency := 10_000 + listener.lock.Lock() + defer func() { + listener.stats.incDelegatedRoutingCallsProcessed() + log.Infow("Finished processing Provide request.", "time", time.Since(startTime), "len", len(cids)) + listener.lock.Unlock() }() - return ch, nil -} -func (listener *ReframeListener) Provide(ctx context.Context, pr *client.ProvideRequest) (<-chan client.ProvideAsyncResult, error) { - ch := make(chan client.ProvideAsyncResult, 1) - log.Infof("Received Provide request with %d cids.", len(pr.Key)) - listener.stats.incReframeCallsReceived() - - go func() { - startTime := time.Now() - printFrequency := 10_000 - listener.lock.Lock() - defer func() { - listener.stats.incReframeCallsProcessed() - log.Infow("Finished processing Provide request.", "time", time.Since(startTime), "len", len(pr.Key)) - listener.lock.Unlock() - close(ch) - }() - // shadowing the calling function's context so that cancellation of it doesn't affect processing - ctx := context.Background() - // Using mutex to prevent concurrent Provide requests - - if listener.configuredProviderInfo != nil && listener.configuredProviderInfo.ID != pr.Provider.Peer.ID { - log.Warnw("Skipping Provide request as its provider is different from the configured one.", "configured", listener.configuredProviderInfo.ID, "received", pr.Provider.Peer.ID) - ch <- client.ProvideAsyncResult{Err: fmt.Errorf("provider %s isn't allowed", pr.Provider.Peer.ID)} - return - } + log.Infof("Received Provide request with %d cids.", len(cids)) + listener.stats.incDelegatedRoutingCallsReceived() - if len(listener.lastSeenProviderInfo.ID) > 0 && listener.lastSeenProviderInfo.ID != pr.Provider.Peer.ID { - log.Warnw("Skipping Provide request as its provider is different from the last seen one.", "lastSeen", listener.lastSeenProviderInfo.ID, "received", pr.Provider.Peer.ID) - ch <- client.ProvideAsyncResult{Err: fmt.Errorf("provider %s isn't allowed", pr.Provider.Peer.ID)} - return - } + // shadowing the calling function's context so that cancellation of it doesn't affect processing + ctx = context.Background() + // Using mutex to prevent concurrent Provide requests - listener.lastSeenProviderInfo.ID = pr.Provider.Peer.ID - listener.lastSeenProviderInfo.Addrs = pr.Provider.Peer.Addrs + if listener.configuredProviderInfo != nil && listener.configuredProviderInfo.ID != pid { + log.Warnw("Skipping Provide request as its provider is different from the configured one.", "configured", listener.configuredProviderInfo.ID, "received", pid) + return 0, fmt.Errorf("provider %s isn't allowed", pid) + } - timestamp := time.Now() - for i, c := range pr.Key { + if len(listener.lastSeenProviderInfo.ID) > 0 && listener.lastSeenProviderInfo.ID != pid { + log.Warnw("Skipping Provide request as its provider is different from the last seen one.", "lastSeen", listener.lastSeenProviderInfo.ID, "received", pid) + return 0, fmt.Errorf("provider %s isn't allowed", pid) + } - // persisting timestamp only if this is not a snapshot - if len(pr.Key) < listener.snapshotSize { - err := listener.dsWrapper.recordCidTimestamp(ctx, c, timestamp) - if err != nil { - log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err) - continue - } + listener.lastSeenProviderInfo.ID = pid + listener.lastSeenProviderInfo.Addrs = paddrs + + timestamp := time.Now() + for i, c := range cids { + + // persisting timestamp only if this is not a snapshot + if len(cids) < listener.snapshotSize { + err := listener.dsWrapper.recordCidTimestamp(ctx, c, timestamp) + if err != nil { + log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err) + continue } + } - listElem := listener.cidQueue.getNodeByCid(c) - if listElem == nil { - listener.cidQueue.recordCidNode(&cidNode{ - C: c, - Timestamp: timestamp, - }) + listElem := listener.cidQueue.getNodeByCid(c) + if listElem == nil { + listener.cidQueue.recordCidNode(&cidNode{ + C: c, + Timestamp: timestamp, + }) + err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error { + return listener.notifyPutAndPersist(ctx, cc) + }) + if err != nil { + log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", c, "err", err) + listener.cidQueue.removeCidNode(c) + continue + } + } else { + node := listElem.Value.(*cidNode) + node.Timestamp = timestamp + listener.cidQueue.recordCidNode(node) + // if no existing chunk has been found for the cid - adding it to the current one + // This can happen in the following cases: + // * when currentChunk disappears between restarts as it doesn't get persisted until it's advertised + // * when the same cid comes multiple times within the lifespan of the same chunk + // * after a error to generate a replacement chunk + if node.chunk == nil { err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error { return listener.notifyPutAndPersist(ctx, cc) }) if err != nil { log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", c, "err", err) - listener.cidQueue.removeCidNode(c) continue } - } else { - node := listElem.Value.(*cidNode) - node.Timestamp = timestamp - listener.cidQueue.recordCidNode(node) - // if no existing chunk has been found for the cid - adding it to the current one - // This can happen in the following cases: - // * when currentChunk disappears between restarts as it doesn't get persisted until it's advertised - // * when the same cid comes multiple times within the lifespan of the same chunk - // * after a error to generate a replacement chunk - if node.chunk == nil { - err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error { - return listener.notifyPutAndPersist(ctx, cc) - }) - if err != nil { - log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", c, "err", err) - continue - } - } - listener.stats.incExistingCidsProcessed() - } - - listener.stats.incCidsProcessed() - // Doing some logging for larger requests - if i != 0 && i%printFrequency == 0 { - log.Infof("Processed %d out of %d CIDs. startTime=%v", i, len(pr.Key), startTime) } - } - removedSomething, err := listener.removeExpiredCids(ctx) - if err != nil { - log.Warnw("Error removing expired cids.", "err", err) + listener.stats.incExistingCidsProcessed() } - // if that was a snapshot or some cids have expired - persisting timestamps as binary blob - if removedSomething || len(pr.Key) >= listener.snapshotSize { - listener.dsWrapper.recordTimestampsSnapshot(ctx, listener.cidQueue.getTimestampsSnapshot()) + listener.stats.incCidsProcessed() + // Doing some logging for larger requests + if i != 0 && i%printFrequency == 0 { + log.Infof("Processed %d out of %d CIDs. startTime=%v", i, len(cids), startTime) } + } + removedSomething, err := listener.removeExpiredCids(ctx) + if err != nil { + log.Warnw("Error removing expired cids.", "err", err) + } - response := client.ProvideAsyncResult{AdvisoryTTL: time.Duration(listener.cidTtl), Err: nil} - ch <- response - }() - return ch, nil + // if that was a snapshot or some cids have expired - persisting timestamps as binary blob + if removedSomething || len(cids) >= listener.snapshotSize { + listener.dsWrapper.recordTimestampsSnapshot(ctx, listener.cidQueue.getTimestampsSnapshot()) + } + return time.Duration(listener.cidTtl), nil } // Revise logic here -func (listener *ReframeListener) removeExpiredCids(ctx context.Context) (bool, error) { +func (listener *Listener) removeExpiredCids(ctx context.Context) (bool, error) { lastElem := listener.cidQueue.nodesLl.Back() currentTime := time.Now() chunksToRemove := make(map[string]*cidsChunk) @@ -423,7 +398,7 @@ func (listener *ReframeListener) removeExpiredCids(ctx context.Context) (bool, e return removedSomeCids, nil } -func (listener *ReframeListener) notifyRemoveAndPersist(ctx context.Context, chunk *cidsChunk) error { +func (listener *Listener) notifyRemoveAndPersist(ctx context.Context, chunk *cidsChunk) error { ctxIdStr := contextIDToStr(chunk.ContextID) log.Infof("Notifying Remove for chunk=%s", ctxIdStr) @@ -455,7 +430,7 @@ func (listener *ReframeListener) notifyRemoveAndPersist(ctx context.Context, chu return nil } -func (listener *ReframeListener) notifyPutAndPersist(ctx context.Context, chunk *cidsChunk) error { +func (listener *Listener) notifyPutAndPersist(ctx context.Context, chunk *cidsChunk) error { ctxIdStr := contextIDToStr(chunk.ContextID) log.Infof("Notifying Put for chunk=%s, provider=%s, addrs=%q, cidsTotal=%d", ctxIdStr, listener.provider(), listener.addrs(), len(chunk.Cids)) @@ -493,14 +468,14 @@ func (listener *ReframeListener) notifyPutAndPersist(ctx context.Context, chunk return nil } -func (listener *ReframeListener) provider() peer.ID { +func (listener *Listener) provider() peer.ID { if listener.configuredProviderInfo == nil { return listener.lastSeenProviderInfo.ID } return listener.configuredProviderInfo.ID } -func (listener *ReframeListener) addrs() []multiaddr.Multiaddr { +func (listener *Listener) addrs() []multiaddr.Multiaddr { if listener.configuredProviderInfo == nil { return listener.lastSeenProviderInfo.Addrs } diff --git a/reframe/listener_api_test.go b/delegatedrouting/listener_api_test.go similarity index 71% rename from reframe/listener_api_test.go rename to delegatedrouting/listener_api_test.go index 51243cd2..8213790b 100644 --- a/reframe/listener_api_test.go +++ b/delegatedrouting/listener_api_test.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting import ( "bytes" @@ -10,7 +10,7 @@ import ( "github.com/ipfs/go-datastore" ) -func ChunkExists(ctx context.Context, listener *ReframeListener, cids []cid.Cid, nonceGen func() []byte) bool { +func ChunkExists(ctx context.Context, listener *Listener, cids []cid.Cid, nonceGen func() []byte) bool { cidsMap := cidsListToMap(cids) ctxID := listener.chunker.generateContextID(cidsMap) ctxIDStr := contextIDToStr(ctxID) @@ -45,25 +45,25 @@ func ChunkExists(ctx context.Context, listener *ReframeListener, cids []cid.Cid, return !chunkFromDatastore.Removed } -func HasSnapshot(ctx context.Context, listener *ReframeListener) bool { +func HasSnapshot(ctx context.Context, listener *Listener) bool { return SnapshotsQty(ctx, listener) > 0 } -func SnapshotsQty(ctx context.Context, listener *ReframeListener) int { +func SnapshotsQty(ctx context.Context, listener *Listener) int { keys, _ := listener.dsWrapper.getSnapshotChunkKeys(ctx) return len(keys) } -func HasCidTimestamp(ctx context.Context, listener *ReframeListener, c cid.Cid) bool { +func HasCidTimestamp(ctx context.Context, listener *Listener, c cid.Cid) bool { has, err := listener.dsWrapper.ds.Has(ctx, timestampByCidKey(c)) return has && err == nil } -func WrappedDatastore(listener *ReframeListener) datastore.Datastore { +func WrappedDatastore(listener *Listener) datastore.Datastore { return listener.dsWrapper.ds } -func ChunkNotExist(ctx context.Context, listener *ReframeListener, cids []cid.Cid, nonceGen func() []byte) bool { +func ChunkNotExist(ctx context.Context, listener *Listener, cids []cid.Cid, nonceGen func() []byte) bool { ctxID := listener.chunker.generateContextID(cidsListToMap(cids)) ctxIDStr := contextIDToStr(ctxID) cidsRegistered := false @@ -87,20 +87,20 @@ func ChunkNotExist(ctx context.Context, listener *ReframeListener, cids []cid.Ci } -func CidExist(ctx context.Context, listener *ReframeListener, c cid.Cid, requireChunk bool) bool { +func CidExist(ctx context.Context, listener *Listener, c cid.Cid, requireChunk bool) bool { elem := listener.cidQueue.getNodeByCid(c) return elem != nil && (!requireChunk || elem.Value.(*cidNode).chunk != nil) } -func CidNotExist(ctx context.Context, listener *ReframeListener, c cid.Cid) bool { +func CidNotExist(ctx context.Context, listener *Listener, c cid.Cid) bool { return listener.cidQueue.getNodeByCid(c) == nil } -func GetCidTimestampFromDatastore(ctx context.Context, listener *ReframeListener, c cid.Cid) (time.Time, error) { +func GetCidTimestampFromDatastore(ctx context.Context, listener *Listener, c cid.Cid) (time.Time, error) { return listener.dsWrapper.getCidTimestamp(ctx, c) } -func GetCidTimestampFromCache(ctx context.Context, listener *ReframeListener, c cid.Cid) (time.Time, error) { +func GetCidTimestampFromCache(ctx context.Context, listener *Listener, c cid.Cid) (time.Time, error) { node := listener.cidQueue.getNodeByCid(c) if node == nil { return time.Unix(0, 0), fmt.Errorf("Timestamp not found") @@ -108,15 +108,15 @@ func GetCidTimestampFromCache(ctx context.Context, listener *ReframeListener, c return node.Value.(*cidNode).Timestamp, nil } -func GetChunk(ctx context.Context, listener *ReframeListener, contextID string) *cidsChunk { +func GetChunk(ctx context.Context, listener *Listener, contextID string) *cidsChunk { return listener.chunker.getChunkByContextID(contextID) } -func GetCurrentChunk(ctx context.Context, listener *ReframeListener) *cidsChunk { +func GetCurrentChunk(ctx context.Context, listener *Listener) *cidsChunk { return listener.chunker.currentChunk } -func GetExpiryQueue(ctx context.Context, listener *ReframeListener) []cid.Cid { +func GetExpiryQueue(ctx context.Context, listener *Listener) []cid.Cid { cids := make([]cid.Cid, listener.cidQueue.nodesLl.Len()) node := listener.cidQueue.nodesLl.Front() cnt := 0 diff --git a/reframe/listener_concurrency_test.go b/delegatedrouting/listener_concurrency_test.go similarity index 83% rename from reframe/listener_concurrency_test.go rename to delegatedrouting/listener_concurrency_test.go index 60ef607e..62114107 100644 --- a/reframe/listener_concurrency_test.go +++ b/delegatedrouting/listener_concurrency_test.go @@ -1,6 +1,6 @@ //go:build !race -package reframe_test +package delegatedrouting_test import ( "context" @@ -12,9 +12,9 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" leveldb "github.com/ipfs/go-ds-leveldb" + drouting "github.com/ipni/index-provider/delegatedrouting" "github.com/ipni/index-provider/engine" mock_provider "github.com/ipni/index-provider/mock" - reframelistener "github.com/ipni/index-provider/reframe" "github.com/ipni/index-provider/testutil" "github.com/libp2p/go-libp2p" "github.com/stretchr/testify/require" @@ -36,14 +36,14 @@ func TestHandleConcurrentRequests(t *testing.T) { cids[i] = newCid(fmt.Sprintf("test%d", i)) } - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) client, server := createClientAndServer(t, listener, prov, priv) @@ -65,7 +65,7 @@ func TestHandleConcurrentRequests(t *testing.T) { } for _, c := range cids { - require.True(t, reframelistener.CidExist(ctx, listener, c, false)) + require.True(t, drouting.CidExist(ctx, listener, c, false)) } } @@ -96,7 +96,7 @@ func TestShouldProcessMillionCIDsInThirtySeconds(t *testing.T) { }() require.NoError(t, err) - ip, err := reframelistener.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + ip, err := drouting.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) cids := make([]cid.Cid, cidsNumber) @@ -104,7 +104,7 @@ func TestShouldProcessMillionCIDsInThirtySeconds(t *testing.T) { cids[i] = newCid(fmt.Sprintf("test%d", i)) } - client, server := createClientAndServer(t, ip, newProvider(t, pID), priv) + client, server := createClientAndServer(t, ip, newAddrInfo(t, pID), priv) defer server.Close() start := time.Now() diff --git a/reframe/listener_test.go b/delegatedrouting/listener_test.go similarity index 66% rename from reframe/listener_test.go rename to delegatedrouting/listener_test.go index 8e25b7ff..0e06d38b 100644 --- a/reframe/listener_test.go +++ b/delegatedrouting/listener_test.go @@ -1,4 +1,4 @@ -package reframe_test +package delegatedrouting_test import ( "context" @@ -12,45 +12,43 @@ import ( "github.com/golang/mock/gomock" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-delegated-routing/client" - "github.com/ipfs/go-delegated-routing/gen/proto" - "github.com/ipfs/go-delegated-routing/server" + "github.com/ipfs/go-libipfs/routing/http/client" + "github.com/ipfs/go-libipfs/routing/http/contentrouter" + "github.com/ipfs/go-libipfs/routing/http/server" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipni/go-libipni/metadata" + drouting "github.com/ipni/index-provider/delegatedrouting" "github.com/ipni/index-provider/engine" mock_provider "github.com/ipni/index-provider/mock" - reframelistener "github.com/ipni/index-provider/reframe" "github.com/ipni/index-provider/testutil" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" - "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) -var defaultMetadata metadata.Metadata = metadata.Default.New(metadata.Bitswap{}) +var ( + defaultMetadata metadata.Metadata = metadata.Default.New(metadata.Bitswap{}) +) func testNonceGen() []byte { return []byte{1, 2, 3, 4, 5} } -func newProvider(t *testing.T, pID peer.ID) *client.Provider { +func newAddrInfo(t *testing.T, pID peer.ID) *peer.AddrInfo { ma, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/5001") require.NoError(t, err) - return &client.Provider{ - Peer: peer.AddrInfo{ - ID: pID, - Addrs: []multiaddr.Multiaddr{ma}, - }, - ProviderProto: []client.TransferProtocol{{Codec: multicodec.TransportBitswap}}, + return &peer.AddrInfo{ + ID: pID, + Addrs: []multiaddr.Multiaddr{ma}, } } -// TestReframeMultihashLister verifies that multihash lister returns correct number of multihashes in deterministic order -func TestReframeMultihashLister(t *testing.T) { +// TestDelegatedRoutingMultihashLister verifies that multihash lister returns correct number of multihashes in deterministic order +func TestDelegatedRoutingMultihashLister(t *testing.T) { cids := make(map[cid.Cid]struct{}) cids[newCid("test1")] = struct{}{} cids[newCid("test2")] = struct{}{} @@ -58,7 +56,7 @@ func TestReframeMultihashLister(t *testing.T) { _, _, pID := testutil.GenerateKeysAndIdentity(t) - lister := &reframelistener.ReframeMultihashLister{ + lister := &drouting.MultihashLister{ CidFetcher: func(contextID []byte) (map[cid.Cid]struct{}, error) { if string(contextID) == "test" { return cids, nil @@ -87,7 +85,7 @@ func TestRetryWithBackOffKeepsRetryingOnError(t *testing.T) { // this test verifies that RetryWithBackOff keeps retrying as long as an error is returned or until the max number of attenpts is reached start := time.Now() attempts := 0 - err := reframelistener.RetryWithBackoff(func() error { + err := drouting.RetryWithBackoff(func() error { attempts++ return fmt.Errorf("test") }, time.Second, 3) @@ -102,7 +100,7 @@ func TestRetryWithBackOffKeepsRetryingOnError(t *testing.T) { func TestRetryWithBackOffStopsRetryingOnSuccess(t *testing.T) { start := time.Now() attempts := 0 - err := reframelistener.RetryWithBackoff(func() error { + err := drouting.RetryWithBackoff(func() error { attempts++ return nil }, time.Second, 3) @@ -129,7 +127,7 @@ func TestProvideRoundtrip(t *testing.T) { defer engine.Shutdown() require.NoError(t, err) - ip, err := reframelistener.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + ip, err := drouting.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) errorClient, errorServer := createClientAndServer(t, ip, nil, nil) @@ -141,11 +139,11 @@ func TestProvideRoundtrip(t *testing.T) { testCid4 := newCid("test4") testCid5 := newCid("test5") - _, err = errorClient.Provide(ctx, []cid.Cid{testCid1}, time.Hour) + _, err = errorClient.ProvideBitswap(ctx, []cid.Cid{testCid1}, time.Hour) require.Error(t, err, "should get sync error on unsigned provide request.") errorServer.Close() - client, server := createClientAndServer(t, ip, newProvider(t, pID), priv) + client, server := createClientAndServer(t, ip, newAddrInfo(t, pID), priv) defer server.Close() provideMany(t, client, ctx, []cid.Cid{testCid1, testCid2}) @@ -188,7 +186,7 @@ func TestProvideRoundtripWithRemove(t *testing.T) { defer engine.Shutdown() require.NoError(t, err) - ip, err := reframelistener.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + ip, err := drouting.New(ctx, engine, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) errorClient, errorServer := createClientAndServer(t, ip, nil, nil) @@ -198,11 +196,11 @@ func TestProvideRoundtripWithRemove(t *testing.T) { testCid2 := newCid("test2") testCid3 := newCid("test3") - _, err = errorClient.Provide(ctx, []cid.Cid{testCid1}, time.Hour) + _, err = errorClient.ProvideBitswap(ctx, []cid.Cid{testCid1}, time.Hour) require.Error(t, err, "should get sync error on unsigned provide request.") errorServer.Close() - client, server := createClientAndServer(t, ip, newProvider(t, pID), priv) + client, server := createClientAndServer(t, ip, newAddrInfo(t, pID), priv) defer server.Close() provideMany(t, client, ctx, []cid.Cid{testCid1, testCid2, testCid3}) @@ -242,17 +240,17 @@ func TestAdvertiseTwoChunksWithOneCidInEach(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - ip, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + ip, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, ip, prov, priv) @@ -274,7 +272,7 @@ func TestAdvertiseUsingAddrsFromParameters(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -287,7 +285,7 @@ func TestAdvertiseUsingAddrsFromParameters(t *testing.T) { mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&peer.AddrInfo{ID: pID, Addrs: []multiaddr.Multiaddr{randomMultiaddr}}), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&peer.AddrInfo{ID: pID, Addrs: []multiaddr.Multiaddr{randomMultiaddr}}), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - ip, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, pID.String(), []string{"/ip4/0.0.0.0/tcp/1001"}, datastore.NewMapDatastore(), testNonceGen) + ip, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, pID.String(), []string{"/ip4/0.0.0.0/tcp/1001"}, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, ip, prov, priv) @@ -306,7 +304,7 @@ func TestProvideRegistersCidInDatastore(t *testing.T) { ctx := context.Background() defer ctx.Done() testCid1 := newCid("test1") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -314,7 +312,7 @@ func TestProvideRegistersCidInDatastore(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -322,13 +320,13 @@ func TestProvideRegistersCidInDatastore(t *testing.T) { provide(t, c, ctx, testCid1) - require.True(t, reframelistener.CidExist(ctx, listener, testCid1, false)) + require.True(t, drouting.CidExist(ctx, listener, testCid1, false)) // verifying that the CID has a current timestamp - tt, err := reframelistener.GetCidTimestampFromDatastore(ctx, listener, testCid1) + tt, err := drouting.GetCidTimestampFromDatastore(ctx, listener, testCid1) require.NoError(t, err) require.True(t, time.Since(tt) < time.Second) - require.Equal(t, []cid.Cid{testCid1}, reframelistener.GetExpiryQueue(ctx, listener)) + require.Equal(t, []cid.Cid{testCid1}, drouting.GetExpiryQueue(ctx, listener)) } func TestCidsAreOrderedByArrivalInExpiryQueue(t *testing.T) { @@ -343,7 +341,7 @@ func TestCidsAreOrderedByArrivalInExpiryQueue(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -351,7 +349,7 @@ func TestCidsAreOrderedByArrivalInExpiryQueue(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -360,10 +358,10 @@ func TestCidsAreOrderedByArrivalInExpiryQueue(t *testing.T) { provide(t, c, ctx, testCid1) provide(t, c, ctx, testCid2) provide(t, c, ctx, testCid3) - require.Equal(t, []cid.Cid{testCid3, testCid2, testCid1}, reframelistener.GetExpiryQueue(ctx, listener)) + require.Equal(t, []cid.Cid{testCid3, testCid2, testCid1}, drouting.GetExpiryQueue(ctx, listener)) provide(t, c, ctx, testCid2) - require.Equal(t, []cid.Cid{testCid2, testCid3, testCid1}, reframelistener.GetExpiryQueue(ctx, listener)) + require.Equal(t, []cid.Cid{testCid2, testCid3, testCid1}, drouting.GetExpiryQueue(ctx, listener)) } func TestFullChunkAdvertisedAndRegisteredInDatastore(t *testing.T) { @@ -379,16 +377,16 @@ func TestFullChunkAdvertisedAndRegisteredInDatastore(t *testing.T) { testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -396,11 +394,11 @@ func TestFullChunkAdvertisedAndRegisteredInDatastore(t *testing.T) { provideMany(t, c, ctx, []cid.Cid{testCid1, testCid2, testCid3}) - require.True(t, reframelistener.CidExist(ctx, listener, testCid1, true)) - require.True(t, reframelistener.CidExist(ctx, listener, testCid2, true)) - require.True(t, reframelistener.CidExist(ctx, listener, testCid3, false)) - require.True(t, reframelistener.ChunkExists(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) - require.Equal(t, []cid.Cid{testCid3, testCid2, testCid1}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.CidExist(ctx, listener, testCid1, true)) + require.True(t, drouting.CidExist(ctx, listener, testCid2, true)) + require.True(t, drouting.CidExist(ctx, listener, testCid3, false)) + require.True(t, drouting.ChunkExists(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) + require.Equal(t, []cid.Cid{testCid3, testCid2, testCid1}, drouting.GetExpiryQueue(ctx, listener)) } func TestRemovedChunkIsRemovedFromIndexes(t *testing.T) { @@ -415,17 +413,17 @@ func TestRemovedChunkIsRemovedFromIndexes(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen()))) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -435,11 +433,11 @@ func TestRemovedChunkIsRemovedFromIndexes(t *testing.T) { time.Sleep(ttl) provide(t, c, ctx, testCid3) - require.True(t, reframelistener.ChunkNotExist(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) - require.True(t, reframelistener.CidExist(ctx, listener, testCid3, false)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid1)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid2)) - require.Equal(t, []cid.Cid{testCid3}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.ChunkNotExist(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) + require.True(t, drouting.CidExist(ctx, listener, testCid3, false)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid1)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid2)) + require.Equal(t, []cid.Cid{testCid3}, drouting.GetExpiryQueue(ctx, listener)) } func TestAdvertiseOneChunkWithTwoCidsInIt(t *testing.T) { @@ -454,16 +452,16 @@ func TestAdvertiseOneChunkWithTwoCidsInIt(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -483,16 +481,16 @@ func TestDoNotReAdvertiseRepeatedCids(t *testing.T) { defer ctx.Done() testCid1 := newCid("test1") testCid2 := newCid("test2") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -517,19 +515,19 @@ func TestAdvertiseExpiredCidsIfProvidedAgain(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen()))) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -555,18 +553,18 @@ func TestRemoveExpiredCidAndReadvertiseChunk(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen()))) mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Any(), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -579,12 +577,12 @@ func TestRemoveExpiredCidAndReadvertiseChunk(t *testing.T) { provide(t, c, ctx, testCid3) // verifying ds and indexes - require.True(t, reframelistener.CidExist(ctx, listener, testCid2, true)) - require.True(t, reframelistener.CidExist(ctx, listener, testCid3, false)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid1)) - require.True(t, reframelistener.ChunkExists(ctx, listener, []cid.Cid{testCid2}, testNonceGen)) - require.True(t, reframelistener.ChunkNotExist(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) - require.Equal(t, []cid.Cid{testCid3, testCid2}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.CidExist(ctx, listener, testCid2, true)) + require.True(t, drouting.CidExist(ctx, listener, testCid3, false)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid1)) + require.True(t, drouting.ChunkExists(ctx, listener, []cid.Cid{testCid2}, testNonceGen)) + require.True(t, drouting.ChunkNotExist(ctx, listener, []cid.Cid{testCid1, testCid2}, testNonceGen)) + require.Equal(t, []cid.Cid{testCid3, testCid2}, drouting.GetExpiryQueue(ctx, listener)) } func TestExpireMultipleChunks(t *testing.T) { @@ -600,21 +598,21 @@ func TestExpireMultipleChunks(t *testing.T) { testCid2 := newCid("test2") testCid3 := newCid("test3") testCid4 := newCid("test4") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid3.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid3.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen()))) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen()))) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid3.String()}, testNonceGen()))) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -636,17 +634,17 @@ func TestDoNotReadvertiseChunkIfAllCidsExpired(t *testing.T) { defer ctx.Done() testCid1 := newCid("test1") testCid2 := newCid("test2") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen()))) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -657,10 +655,10 @@ func TestDoNotReadvertiseChunkIfAllCidsExpired(t *testing.T) { provide(t, c, ctx, testCid2) // verifying ds and indexes - require.True(t, reframelistener.CidExist(ctx, listener, testCid2, false)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid1)) - require.True(t, reframelistener.ChunkNotExist(ctx, listener, []cid.Cid{testCid1}, testNonceGen)) - require.Equal(t, []cid.Cid{testCid2}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.CidExist(ctx, listener, testCid2, false)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid1)) + require.True(t, drouting.ChunkNotExist(ctx, listener, []cid.Cid{testCid1}, testNonceGen)) + require.Equal(t, []cid.Cid{testCid2}, drouting.GetExpiryQueue(ctx, listener)) } func TestDoNotReadvertiseTheSameCids(t *testing.T) { @@ -675,16 +673,16 @@ func TestDoNotReadvertiseTheSameCids(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - ip, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) + ip, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, ip, prov, priv) @@ -705,19 +703,19 @@ func TestDoNotLoadRemovedChunksOnInitialisation(t *testing.T) { defer ctx.Done() testCid1 := newCid("test1") testCid2 := newCid("test2") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen()))) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener1, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener1, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener1, prov, priv) @@ -728,10 +726,10 @@ func TestDoNotLoadRemovedChunksOnInitialisation(t *testing.T) { s.Close() - listener2, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener2, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) - require.True(t, reframelistener.ChunkNotExist(ctx, listener2, []cid.Cid{testCid1}, testNonceGen)) + require.True(t, drouting.ChunkNotExist(ctx, listener2, []cid.Cid{testCid1}, testNonceGen)) } func TestMissingCidTimestampsBackfilledOnIntialisation(t *testing.T) { @@ -746,19 +744,19 @@ func TestMissingCidTimestampsBackfilledOnIntialisation(t *testing.T) { testCid1 := newCid("test1") testCid2 := newCid("test2") testCid3 := newCid("test3") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener1, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener1, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener1, prov, priv) @@ -769,10 +767,10 @@ func TestMissingCidTimestampsBackfilledOnIntialisation(t *testing.T) { time.Sleep(100 * time.Millisecond) provide(t, c, ctx, testCid3) - t1Before, err := reframelistener.GetCidTimestampFromDatastore(ctx, listener1, testCid1) + t1Before, err := drouting.GetCidTimestampFromDatastore(ctx, listener1, testCid1) require.NoError(t, err) - t2Before, err := reframelistener.GetCidTimestampFromDatastore(ctx, listener1, testCid2) + t2Before, err := drouting.GetCidTimestampFromDatastore(ctx, listener1, testCid2) require.NoError(t, err) // cid2 timestamp should be after cid1 timestamp as it has been provided later @@ -780,15 +778,15 @@ func TestMissingCidTimestampsBackfilledOnIntialisation(t *testing.T) { s.Close() - reframelistener.WrappedDatastore(listener1).Delete(ctx, datastore.NewKey("tc/"+testCid1.String())) + drouting.WrappedDatastore(listener1).Delete(ctx, datastore.NewKey("tc/"+testCid1.String())) - listener2, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener2, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) - t1After, err := reframelistener.GetCidTimestampFromCache(ctx, listener2, testCid1) + t1After, err := drouting.GetCidTimestampFromCache(ctx, listener2, testCid1) require.NoError(t, err) - t2After, err := reframelistener.GetCidTimestampFromCache(ctx, listener2, testCid2) + t2After, err := drouting.GetCidTimestampFromCache(ctx, listener2, testCid2) require.NoError(t, err) require.NotEqual(t, t1After, t2After) @@ -816,10 +814,10 @@ func TestSameCidNotDuplicatedInTheCurrentChunkIfProvidedTwice(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), nil) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), nil) require.NoError(t, err) - c, s := createClientAndServer(t, listener, newProvider(t, pID), priv) + c, s := createClientAndServer(t, listener, newAddrInfo(t, pID), priv) defer s.Close() provide(t, c, ctx, testCid1) @@ -842,7 +840,7 @@ func TestShouldStoreSnapshotInDatastore(t *testing.T) { testCid3 := newCid("test3") testCid4 := newCid("test4") testCid5 := newCid("test5") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -851,7 +849,7 @@ func TestShouldStoreSnapshotInDatastore(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) client, server := createClientAndServer(t, listener, prov, priv) @@ -861,12 +859,12 @@ func TestShouldStoreSnapshotInDatastore(t *testing.T) { provide(t, client, ctx, testCid1) provide(t, client, ctx, testCid2) - require.True(t, reframelistener.HasSnapshot(ctx, listener)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid1)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid2)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener, testCid3)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener, testCid4)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener, testCid5)) + require.True(t, drouting.HasSnapshot(ctx, listener)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid1)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid2)) + require.False(t, drouting.HasCidTimestamp(ctx, listener, testCid3)) + require.False(t, drouting.HasCidTimestamp(ctx, listener, testCid4)) + require.False(t, drouting.HasCidTimestamp(ctx, listener, testCid5)) } func TestShouldNotStoreSnapshotInDatastore(t *testing.T) { @@ -883,7 +881,7 @@ func TestShouldNotStoreSnapshotInDatastore(t *testing.T) { testCid3 := newCid("test3") testCid4 := newCid("test4") testCid5 := newCid("test5") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -892,7 +890,7 @@ func TestShouldNotStoreSnapshotInDatastore(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) client, server := createClientAndServer(t, listener, prov, priv) @@ -900,12 +898,12 @@ func TestShouldNotStoreSnapshotInDatastore(t *testing.T) { provideMany(t, client, ctx, []cid.Cid{testCid1, testCid2, testCid3, testCid4, testCid5}) - require.False(t, reframelistener.HasSnapshot(ctx, listener)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid1)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid2)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid3)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid4)) - require.True(t, reframelistener.HasCidTimestamp(ctx, listener, testCid5)) + require.False(t, drouting.HasSnapshot(ctx, listener)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid1)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid2)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid3)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid4)) + require.True(t, drouting.HasCidTimestamp(ctx, listener, testCid5)) } func TestShouldCleanUpTimestampMappingsFromDatastore(t *testing.T) { @@ -922,7 +920,7 @@ func TestShouldCleanUpTimestampMappingsFromDatastore(t *testing.T) { testCid3 := newCid("test3") testCid4 := newCid("test4") testCid5 := newCid("test5") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -932,7 +930,7 @@ func TestShouldCleanUpTimestampMappingsFromDatastore(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener1, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener1, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) client, server := createClientAndServer(t, listener1, prov, priv) @@ -943,15 +941,15 @@ func TestShouldCleanUpTimestampMappingsFromDatastore(t *testing.T) { server.Close() - listener2, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener2, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) - require.True(t, reframelistener.HasSnapshot(ctx, listener2)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener2, testCid1)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener2, testCid2)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener2, testCid3)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener2, testCid4)) - require.False(t, reframelistener.HasCidTimestamp(ctx, listener2, testCid5)) + require.True(t, drouting.HasSnapshot(ctx, listener2)) + require.False(t, drouting.HasCidTimestamp(ctx, listener2, testCid1)) + require.False(t, drouting.HasCidTimestamp(ctx, listener2, testCid2)) + require.False(t, drouting.HasCidTimestamp(ctx, listener2, testCid3)) + require.False(t, drouting.HasCidTimestamp(ctx, listener2, testCid4)) + require.False(t, drouting.HasCidTimestamp(ctx, listener2, testCid5)) } func TestShouldCorrectlyMergeSnapshotAndCidTimestamps(t *testing.T) { @@ -968,7 +966,7 @@ func TestShouldCorrectlyMergeSnapshotAndCidTimestamps(t *testing.T) { testCid3 := newCid("test3") testCid4 := newCid("test4") testCid5 := newCid("test5") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -978,7 +976,7 @@ func TestShouldCorrectlyMergeSnapshotAndCidTimestamps(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener1, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener1, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) client, server := createClientAndServer(t, listener1, prov, priv) @@ -996,10 +994,10 @@ func TestShouldCorrectlyMergeSnapshotAndCidTimestamps(t *testing.T) { server.Close() - listener2, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener2, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) - require.Equal(t, []cid.Cid{testCid2, testCid5, testCid4, testCid1, testCid3}, reframelistener.GetExpiryQueue(ctx, listener2)) + require.Equal(t, []cid.Cid{testCid2, testCid5, testCid4, testCid1, testCid3}, drouting.GetExpiryQueue(ctx, listener2)) } func TestInitialiseFromDatastoreWithoutSnapshot(t *testing.T) { @@ -1027,7 +1025,7 @@ func verifyInitialisationFromDatastore(t *testing.T, snapshotSize int, ttl time. for i := 0; i < len(testCids); i++ { testCids[i] = newCid(fmt.Sprintf("test%d", i)) } - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() @@ -1045,13 +1043,13 @@ func verifyInitialisationFromDatastore(t *testing.T, snapshotSize int, ttl time. cidStrs = append(cidStrs, c.String()) } - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID(cidStrs, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID(cidStrs, testNonceGen())), gomock.Eq(defaultMetadata)) } mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) ds := datastore.NewMapDatastore() - listener1, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen, reframelistener.WithPageSize(pageSize)) + listener1, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen, drouting.WithPageSize(pageSize)) require.NoError(t, err) client, server := createClientAndServer(t, listener1, prov, priv) @@ -1065,7 +1063,7 @@ func verifyInitialisationFromDatastore(t *testing.T, snapshotSize int, ttl time. server.Close() - listener2, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen, reframelistener.WithPageSize(pageSize)) + listener2, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen, drouting.WithPageSize(pageSize)) require.NoError(t, err) // verify that: @@ -1075,13 +1073,13 @@ func verifyInitialisationFromDatastore(t *testing.T, snapshotSize int, ttl time. for i := 0; i < len(testCids); i += chunkSize { if i+chunkSize < len(testCids) { for j := i; j < len(testCids); j++ { - require.True(t, reframelistener.CidExist(ctx, listener2, testCids[j], false)) + require.True(t, drouting.CidExist(ctx, listener2, testCids[j], false)) } break } - require.True(t, reframelistener.ChunkExists(ctx, listener2, testCids[i:i+chunkSize], testNonceGen)) + require.True(t, drouting.ChunkExists(ctx, listener2, testCids[i:i+chunkSize], testNonceGen)) for j := 0; j < chunkSize; j++ { - require.True(t, reframelistener.CidExist(ctx, listener2, testCids[i+j], true)) + require.True(t, drouting.CidExist(ctx, listener2, testCids[i+j], true)) } } @@ -1091,7 +1089,7 @@ func verifyInitialisationFromDatastore(t *testing.T, snapshotSize int, ttl time. reverseTestCids[i] = testCids[len(testCids)-i-1] } - require.Equal(t, reverseTestCids, reframelistener.GetExpiryQueue(ctx, listener2)) + require.Equal(t, reverseTestCids, drouting.GetExpiryQueue(ctx, listener2)) } func TestCleanUpExpiredCidsThatDontHaveChunk(t *testing.T) { @@ -1106,18 +1104,18 @@ func TestCleanUpExpiredCidsThatDontHaveChunk(t *testing.T) { testCid2 := newCid("test2") testCid3 := newCid("test3") testCid100 := newCid("test100") - prov := newProvider(t, pID) + prov := newAddrInfo(t, pID) mc := gomock.NewController(t) defer mc.Finish() mockEng := mock_provider.NewMockInterface(mc) mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(&prov.Peer), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) + mockEng.EXPECT().NotifyPut(gomock.Any(), gomock.Eq(prov), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen())), gomock.Eq(defaultMetadata)) mockEng.EXPECT().NotifyRemove(gomock.Any(), gomock.Eq(pID), gomock.Eq(generateContextID([]string{testCid1.String(), testCid2.String()}, testNonceGen()))) ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, ds, testNonceGen) require.NoError(t, err) c, s := createClientAndServer(t, listener, prov, priv) @@ -1129,10 +1127,10 @@ func TestCleanUpExpiredCidsThatDontHaveChunk(t *testing.T) { provide(t, c, ctx, testCid100) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid3)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid2)) - require.True(t, reframelistener.CidNotExist(ctx, listener, testCid1)) - require.Equal(t, []cid.Cid{testCid100}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid3)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid2)) + require.True(t, drouting.CidNotExist(ctx, listener, testCid1)) + require.Equal(t, []cid.Cid{testCid100}, drouting.GetExpiryQueue(ctx, listener)) } func TestCidsWithoutChunkAreRegisteredInDsAndIndexes(t *testing.T) { @@ -1151,16 +1149,16 @@ func TestCidsWithoutChunkAreRegisteredInDsAndIndexes(t *testing.T) { mockEng.EXPECT().RegisterMultihashLister(gomock.Any()) - listener, err := reframelistener.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), nil) + listener, err := drouting.New(ctx, mockEng, ttl, chunkSize, snapshotSize, "", nil, datastore.NewMapDatastore(), nil) require.NoError(t, err) - c, s := createClientAndServer(t, listener, newProvider(t, pID), priv) + c, s := createClientAndServer(t, listener, newAddrInfo(t, pID), priv) defer s.Close() provide(t, c, ctx, testCid1) - require.True(t, reframelistener.CidExist(ctx, listener, testCid1, false)) - require.Equal(t, []cid.Cid{testCid1}, reframelistener.GetExpiryQueue(ctx, listener)) + require.True(t, drouting.CidExist(ctx, listener, testCid1, false)) + require.Equal(t, []cid.Cid{testCid1}, drouting.GetExpiryQueue(ctx, listener)) } func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { @@ -1183,7 +1181,7 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, + listener, err := drouting.New(ctx, engine, ttl, chunkSize, @@ -1192,7 +1190,7 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { nil, ds, testNonceGen, - reframelistener.WithSnapshotMaxChunkSize(1)) + drouting.WithSnapshotMaxChunkSize(1)) require.NoError(t, err) @@ -1201,15 +1199,15 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { cids[i] = newCid(fmt.Sprintf("test%d", i)) } - client, server := createClientAndServer(t, listener, newProvider(t, pID), priv) + client, server := createClientAndServer(t, listener, newAddrInfo(t, pID), priv) defer server.Close() provideMany(t, client, ctx, cids) - require.Equal(t, cidsNumber, reframelistener.SnapshotsQty(ctx, listener)) + require.Equal(t, cidsNumber, drouting.SnapshotsQty(ctx, listener)) // create a new listener and verify that it has initialised correctly - listener, err = reframelistener.New(ctx, + listener, err = drouting.New(ctx, engine, ttl, chunkSize, @@ -1221,7 +1219,7 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { require.NoError(t, err) - queue := reframelistener.GetExpiryQueue(ctx, listener) + queue := drouting.GetExpiryQueue(ctx, listener) sort.Slice(queue, func(i, j int) bool { return queue[i].String() < queue[j].String() }) @@ -1248,7 +1246,7 @@ func TestShouldCleanUpOldSnapshotChunksAfterStoringNewOnes(t *testing.T) { ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, + listener, err := drouting.New(ctx, engine, ttl, chunkSize, @@ -1257,7 +1255,7 @@ func TestShouldCleanUpOldSnapshotChunksAfterStoringNewOnes(t *testing.T) { nil, ds, testNonceGen, - reframelistener.WithSnapshotMaxChunkSize(1)) + drouting.WithSnapshotMaxChunkSize(1)) require.NoError(t, err) @@ -1266,14 +1264,14 @@ func TestShouldCleanUpOldSnapshotChunksAfterStoringNewOnes(t *testing.T) { cids[i] = newCid(fmt.Sprintf("test%d", i)) } - client, server := createClientAndServer(t, listener, newProvider(t, pID), priv) + client, server := createClientAndServer(t, listener, newAddrInfo(t, pID), priv) defer server.Close() provideMany(t, client, ctx, cids) - require.Equal(t, cidsNumber, reframelistener.SnapshotsQty(ctx, listener)) + require.Equal(t, cidsNumber, drouting.SnapshotsQty(ctx, listener)) time.Sleep(ttl) provideMany(t, client, ctx, cids[0:2]) - require.Equal(t, 2, reframelistener.SnapshotsQty(ctx, listener)) + require.Equal(t, 2, drouting.SnapshotsQty(ctx, listener)) } func TestShouldRecogniseLegacySnapshot(t *testing.T) { @@ -1295,7 +1293,7 @@ func TestShouldRecogniseLegacySnapshot(t *testing.T) { ds := datastore.NewMapDatastore() - listener, err := reframelistener.New(ctx, + listener, err := drouting.New(ctx, engine, ttl, chunkSize, @@ -1304,26 +1302,26 @@ func TestShouldRecogniseLegacySnapshot(t *testing.T) { nil, ds, testNonceGen, - reframelistener.WithSnapshotMaxChunkSize(1)) + drouting.WithSnapshotMaxChunkSize(1)) require.NoError(t, err) - client, server := createClientAndServer(t, listener, newProvider(t, pID), priv) + client, server := createClientAndServer(t, listener, newAddrInfo(t, pID), priv) defer server.Close() provide(t, client, ctx, newCid("test")) - snapshot, err := reframelistener.WrappedDatastore(listener).Get(ctx, datastore.NewKey("ts/0")) + snapshot, err := drouting.WrappedDatastore(listener).Get(ctx, datastore.NewKey("ts/0")) require.NoError(t, err) - err = reframelistener.WrappedDatastore(listener).Put(ctx, datastore.NewKey("ts"), snapshot) + err = drouting.WrappedDatastore(listener).Put(ctx, datastore.NewKey("ts"), snapshot) require.NoError(t, err) - err = reframelistener.WrappedDatastore(listener).Delete(ctx, datastore.NewKey("ts/0")) + err = drouting.WrappedDatastore(listener).Delete(ctx, datastore.NewKey("ts/0")) require.NoError(t, err) // create a new listener and verify that it has initialised correctly - listener, err = reframelistener.New(ctx, + listener, err = drouting.New(ctx, engine, ttl, chunkSize, @@ -1335,16 +1333,16 @@ func TestShouldRecogniseLegacySnapshot(t *testing.T) { require.NoError(t, err) - queue := reframelistener.GetExpiryQueue(ctx, listener) + queue := drouting.GetExpiryQueue(ctx, listener) require.Equal(t, []cid.Cid{newCid("test")}, queue) } -func provide(t *testing.T, cc *client.Client, ctx context.Context, c cid.Cid) time.Duration { +func provide(t *testing.T, cc contentrouter.Client, ctx context.Context, c cid.Cid) time.Duration { return provideMany(t, cc, ctx, []cid.Cid{c}) } -func provideMany(t *testing.T, cc *client.Client, ctx context.Context, cids []cid.Cid) time.Duration { - rc, err := cc.Provide(ctx, cids, 2*time.Hour) +func provideMany(t *testing.T, cc contentrouter.Client, ctx context.Context, cids []cid.Cid) time.Duration { + rc, err := cc.ProvideBitswap(ctx, cids, 2*time.Hour) require.NoError(t, err) return rc } @@ -1364,15 +1362,20 @@ func newCid(s string) cid.Cid { return cid.NewCidV1(cid.Raw, testMH1) } -func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, p *client.Provider, identity crypto.PrivKey) (*client.Client, *httptest.Server) { +func createClientAndServer(t *testing.T, router server.ContentRouter, p *peer.AddrInfo, identity crypto.PrivKey) (contentrouter.Client, *httptest.Server) { // start a server - s := httptest.NewServer(server.DelegatedRoutingAsyncHandler(service)) + s := httptest.NewServer(server.Handler(router)) // start a client - q, err := proto.New_DelegatedRouting_Client(s.URL, proto.DelegatedRouting_Client_WithHTTPClient(s.Client())) - require.NoError(t, err) - c, err := client.NewClient(q, p, identity) - require.NoError(t, err) + var c contentrouter.Client + var err error + if p != nil { + c, err = client.New(s.URL, client.WithIdentity(identity), client.WithProviderInfo(p.ID, p.Addrs)) + require.NoError(t, err) + } else { + c, err = client.New(s.URL, client.WithIdentity(identity)) + require.NoError(t, err) + } return c, s } diff --git a/reframe/options.go b/delegatedrouting/options.go similarity index 97% rename from reframe/options.go rename to delegatedrouting/options.go index dfb52e1e..9b74ea95 100644 --- a/reframe/options.go +++ b/delegatedrouting/options.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting const ( defaultSnapshotMaxChunkSize = 1_000_000 diff --git a/reframe/options_test.go b/delegatedrouting/options_test.go similarity index 56% rename from reframe/options_test.go rename to delegatedrouting/options_test.go index b75997df..ecb54e07 100644 --- a/reframe/options_test.go +++ b/delegatedrouting/options_test.go @@ -1,13 +1,13 @@ -package reframe_test +package delegatedrouting_test import ( "testing" - reframeListener "github.com/ipni/index-provider/reframe" + drouting "github.com/ipni/index-provider/delegatedrouting" "github.com/stretchr/testify/require" ) func TestOptionsDefaults(t *testing.T) { - opts := reframeListener.ApplyOptions() + opts := drouting.ApplyOptions() require.Equal(t, 1_000_000, opts.SnapshotMaxChunkSize) } diff --git a/reframe/stats_reporter.go b/delegatedrouting/stats_reporter.go similarity index 74% rename from reframe/stats_reporter.go rename to delegatedrouting/stats_reporter.go index b0cc63d3..8f108092 100644 --- a/reframe/stats_reporter.go +++ b/delegatedrouting/stats_reporter.go @@ -1,4 +1,4 @@ -package reframe +package delegatedrouting import ( "sync/atomic" @@ -14,15 +14,15 @@ type statsReporter struct { } type stats struct { - putAdsSent int64 - removeAdsSent int64 - cidsProcessed int64 - existingCidsProcessed int64 - cidsExpired int64 - reframeCallsReceived int64 - reframeCallsProcessed int64 - chunkCacheMisses int64 - chunksNotFound int64 + putAdsSent int64 + removeAdsSent int64 + cidsProcessed int64 + existingCidsProcessed int64 + cidsExpired int64 + delegatedRoutingCallsReceived int64 + delegatedRoutingCallsProcessed int64 + chunkCacheMisses int64 + chunksNotFound int64 } func newStatsReporter(totalCidsFunc func() int, totalChunksFunc func() int, currentChunkSizeFunc func() int) *statsReporter { @@ -54,13 +54,13 @@ func (reporter *statsReporter) incCidsExpired() { reporter.s.cidsExpired++ } -func (reporter *statsReporter) incReframeCallsReceived() { +func (reporter *statsReporter) incDelegatedRoutingCallsReceived() { // needs to be threadsafe as it gets called from the webserver handler - atomic.AddInt64(&reporter.s.reframeCallsReceived, 1) + atomic.AddInt64(&reporter.s.delegatedRoutingCallsReceived, 1) } -func (reporter *statsReporter) incReframeCallsProcessed() { - reporter.s.reframeCallsProcessed++ +func (reporter *statsReporter) incDelegatedRoutingCallsProcessed() { + reporter.s.delegatedRoutingCallsProcessed++ } func (reporter *statsReporter) incChunkCacheMisses() { diff --git a/go.mod b/go.mod index 36eda121..c470aacb 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,10 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/ipfs/go-cid v0.4.0 github.com/ipfs/go-datastore v0.6.0 - github.com/ipfs/go-delegated-routing v0.7.0 github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-graphsync v0.14.4 github.com/ipfs/go-ipfs-blockstore v1.3.0 + github.com/ipfs/go-libipfs v0.7.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/kubo v0.18.1 github.com/ipld/go-car/v2 v2.8.2 @@ -42,9 +42,11 @@ require ( github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 // indirect github.com/filecoin-project/go-state-types v0.9.9 // indirect github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect + github.com/gorilla/mux v1.8.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect - github.com/ipfs/go-libipfs v0.6.1 // indirect + github.com/ipfs/go-ipns v0.3.0 // indirect + github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.5.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect @@ -52,7 +54,9 @@ require ( github.com/quic-go/qtls-go1-20 v0.1.1 // indirect github.com/quic-go/quic-go v0.33.0 // indirect github.com/quic-go/webtransport-go v0.5.2 // indirect + github.com/samber/lo v1.36.0 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect + go.opencensus.io v0.24.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/fx v1.18.2 // indirect golang.org/x/text v0.7.0 // indirect @@ -106,14 +110,12 @@ require ( github.com/ipfs/go-ipld-cbor v0.0.6 // indirect github.com/ipfs/go-ipld-format v0.4.0 // indirect github.com/ipfs/go-ipld-legacy v0.1.1 // indirect - github.com/ipfs/go-ipns v0.3.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-merkledag v0.10.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/go-unixfsnode v1.6.0 // indirect github.com/ipfs/go-verifcid v0.0.2 // indirect - github.com/ipld/edelweiss v0.2.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/goprocess v0.1.4 // indirect @@ -126,7 +128,6 @@ require ( github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect github.com/libp2p/go-libp2p-gostream v0.6.0 // indirect - github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect @@ -165,7 +166,6 @@ require ( github.com/twmb/murmur3 v1.1.6 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect - go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel/sdk v1.10.0 // indirect go.opentelemetry.io/otel/trace v1.13.0 // indirect go.uber.org/atomic v1.10.0 // indirect diff --git a/go.sum b/go.sum index ed29e613..edec1753 100644 --- a/go.sum +++ b/go.sum @@ -282,6 +282,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= @@ -337,8 +339,6 @@ github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh7 github.com/ipfs/go-datastore v0.5.1/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= -github.com/ipfs/go-delegated-routing v0.7.0 h1:43FyMnKA+8XnyX68Fwg6aoGkqrf8NS5aG7p644s26PU= -github.com/ipfs/go-delegated-routing v0.7.0/go.mod h1:u4zxjUWIe7APUW5ds9CfD0tJX3vM9JhIeNqA8kE4vHE= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= @@ -378,8 +378,8 @@ github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2 github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg= github.com/ipfs/go-ipns v0.3.0 h1:ai791nTgVo+zTuq2bLvEGmWP1M0A6kGTXUsgv/Yq67A= github.com/ipfs/go-ipns v0.3.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= -github.com/ipfs/go-libipfs v0.6.1 h1:OSO9cm1H3r4OXfP0MP1Q5UhTnhd2fByGl6CVYyz/Rhk= -github.com/ipfs/go-libipfs v0.6.1/go.mod h1:FmhKgxMOQA572TK5DA3MZ5GL44ZqsMHIrkgK4gLn4A8= +github.com/ipfs/go-libipfs v0.7.0 h1:Mi54WJTODaOL2/ZSm5loi3SwI3jI2OuFWUrQIkJ5cpM= +github.com/ipfs/go-libipfs v0.7.0/go.mod h1:KsIf/03CqhICzyRGyGo68tooiBE2iFbI/rXW7FhAYr0= github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA= github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs= @@ -404,8 +404,6 @@ github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvT github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipfs/kubo v0.18.1 h1:mF8n2toZkWRc1JXs4pGknqoDGJ9NfP+upy/a8OS3oNg= github.com/ipfs/kubo v0.18.1/go.mod h1:VNKTz0OcX28GvsJQSEAprbMqzlSO19f4esoeDX4ZJLQ= -github.com/ipld/edelweiss v0.2.0 h1:KfAZBP8eeJtrLxLhi7r3N0cBCo7JmwSRhOJp3WSpNjk= -github.com/ipld/edelweiss v0.2.0/go.mod h1:FJAzJRCep4iI8FOFlRriN9n0b7OuX3T/S9++NpBDmA4= github.com/ipld/go-car/v2 v2.8.2 h1:eA3S64qy7Lt+hS8lkO2uXqfNLU7uuGdD/B71hIJw758= github.com/ipld/go-car/v2 v2.8.2/go.mod h1:UeIST4b5Je6LEx8GjFysgeCYwxAHKtAcsWxmF6PupNQ= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= @@ -674,6 +672,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= +github.com/samber/lo v1.36.0 h1:4LaOxH1mHnbDGhTVE0i1z8v/lWaQW8AIfOD3HU4mSaw= +github.com/samber/lo v1.36.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= @@ -718,6 +718,7 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -734,6 +735,7 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= +github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= diff --git a/server/reframe/http/options.go b/server/delegatedrouting/server/options.go similarity index 90% rename from server/reframe/http/options.go rename to server/delegatedrouting/server/options.go index ea80c0c3..6639dc87 100644 --- a/server/reframe/http/options.go +++ b/server/delegatedrouting/server/options.go @@ -1,4 +1,4 @@ -package reframeserver +package server import "time" @@ -28,7 +28,7 @@ func newOptions(o ...Option) (*options, error) { return opts, nil } -// WithListenAddr sets the net address on which the reframe HTTP server is exposed. +// WithListenAddr sets the net address on which the delegated routing HTTP server is exposed. func WithListenAddr(addr string) Option { return func(o *options) error { o.listenAddr = addr diff --git a/server/reframe/http/server.go b/server/delegatedrouting/server/server.go similarity index 61% rename from server/reframe/http/server.go rename to server/delegatedrouting/server/server.go index 3b0d136c..0d259fcb 100644 --- a/server/reframe/http/server.go +++ b/server/delegatedrouting/server/server.go @@ -1,4 +1,4 @@ -package reframeserver +package server import ( "context" @@ -8,10 +8,10 @@ import ( "time" "github.com/ipfs/go-datastore" - drserver "github.com/ipfs/go-delegated-routing/server" + "github.com/ipfs/go-libipfs/routing/http/server" logging "github.com/ipfs/go-log/v2" provider "github.com/ipni/index-provider" - reframelistener "github.com/ipni/index-provider/reframe" + drouting "github.com/ipni/index-provider/delegatedrouting" ) var log = logging.Logger("adminserver") @@ -19,7 +19,7 @@ var log = logging.Logger("adminserver") type Server struct { server *http.Server netListener net.Listener - rListener *reframelistener.ReframeListener + rListener *drouting.Listener } func New(cidTtl time.Duration, @@ -38,15 +38,15 @@ func New(cidTtl time.Duration, } netListener, err := net.Listen("tcp", opts.listenAddr) if err != nil { - return nil, fmt.Errorf("reframe initialisation failed: %s", err) + return nil, fmt.Errorf("delegated routing initialisation failed: %s", err) } - rListener, err := reframelistener.New(context.Background(), e, cidTtl, chunkSize, snapshotSize, providerID, addrs, ds, nil, reframelistener.WithPageSize(pageSize)) + rListener, err := drouting.New(context.Background(), e, cidTtl, chunkSize, snapshotSize, providerID, addrs, ds, nil, drouting.WithPageSize(pageSize)) if err != nil { - return nil, fmt.Errorf("reframe initialisation failed: %s", err) + return nil, fmt.Errorf("delegated routing initialisation failed: %s", err) } - handler := drserver.DelegatedRoutingAsyncHandler(rListener) + handler := server.Handler(rListener) s := &http.Server{ Handler: handler, @@ -62,12 +62,12 @@ func New(cidTtl time.Duration, } func (s *Server) Start() error { - log.Infow("reframe http server listening", "addr", s.netListener.Addr()) + log.Infow("Delegated Routing http server listening", "addr", s.netListener.Addr()) return s.server.Serve(s.netListener) } func (s *Server) Shutdown(ctx context.Context) error { - log.Info("reframe http server shutdown") + log.Info("Delegated Routing http server shutdown") s.rListener.Shutdown() return s.server.Shutdown(ctx) }