Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to http delegated routing #348

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,15 @@ provider import car -l http://localhost:3102 -i <path-to-car-file>

Both CARv1 and CARv2 formats are supported. Index is regenerated on the fly if one is not present.

#### Exposing reframe server from provider (experimental)
#### Exposing delegated routing server from provider (experimental)

Provider can export a reframe server. [Reframe](https://github.com/ipfs/specs/blob/main/reframe/REFRAME_PROTOCOL.md) is a protocol
that allows IPFS nodes to advertise their contents to indexers alongside DHT. Reframe server is off by default.
To enable it, add the following configuration block to the provider config file.
Provider can export a Delegated Routing server. Delegated Routing allows IPFS nodes to advertise their contents to indexers alongside DHT.
Delegated Routing server is off by default. To enable it, add the following configuration block to the provider config file.

```
{
...
Reframe {
DelegatedRouting {
ListenMultiaddr: "/ip4/0.0.0.0/tcp/50617 (example)"
}
...
Expand Down
48 changes: 24 additions & 24 deletions cmd/provider/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/ipni/index-provider/engine"
"github.com/ipni/index-provider/engine/policy"
adminserver "github.com/ipni/index-provider/server/admin/http"
reframeserver "github.com/ipni/index-provider/server/reframe/http"
droutingserver "github.com/ipni/index-provider/server/delegatedrouting/server"
"github.com/ipni/index-provider/supplier"
"github.com/libp2p/go-libp2p"
"github.com/mitchellh/go-homedir"
Expand Down Expand Up @@ -187,7 +187,7 @@ func daemonCommand(cctx *cli.Context) error {
adminErrChan <- adminSvr.Start()
}()

reframeErrChan := make(chan error, 1)
droutingErrChan := make(chan error, 1)
// If there are bootstrap peers and bootstrapping is enabled, then try to
// connect to the minimum set of peers.
if len(cfg.Bootstrap.Peers) != 0 && cfg.Bootstrap.MinimumPeers != 0 {
Expand All @@ -206,36 +206,36 @@ func daemonCommand(cctx *cli.Context) error {
defer bootstrapper.Close()
}

// setting up reframe server
var reframeSrv *reframeserver.Server
if len(cfg.Reframe.ListenMultiaddr) != 0 {
reframeAddr, err := cfg.Reframe.ListenNetAddr()
// setting up delegated routing server
var droutingSrv *droutingserver.Server
if len(cfg.DelegatedRouting.ListenMultiaddr) != 0 {
droutingAddr, err := cfg.DelegatedRouting.ListenNetAddr()
if err != nil {
return err
}

reframeSrv, err = reframeserver.New(
time.Duration(cfg.Reframe.CidTtl),
cfg.Reframe.ChunkSize,
cfg.Reframe.SnapshotSize,
cfg.Reframe.DsPageSize,
cfg.Reframe.ProviderID,
cfg.Reframe.Addrs,
droutingSrv, err = droutingserver.New(
time.Duration(cfg.DelegatedRouting.CidTtl),
cfg.DelegatedRouting.ChunkSize,
cfg.DelegatedRouting.SnapshotSize,
cfg.DelegatedRouting.DsPageSize,
cfg.DelegatedRouting.ProviderID,
cfg.DelegatedRouting.Addrs,
eng,
ds,
reframeserver.WithListenAddr(reframeAddr),
reframeserver.WithReadTimeout(time.Duration(cfg.Reframe.ReadTimeout)),
reframeserver.WithWriteTimeout(time.Duration(cfg.Reframe.WriteTimeout)),
droutingserver.WithListenAddr(droutingAddr),
droutingserver.WithReadTimeout(time.Duration(cfg.DelegatedRouting.ReadTimeout)),
droutingserver.WithWriteTimeout(time.Duration(cfg.DelegatedRouting.WriteTimeout)),
)

if err != nil {
return err
}
log.Infow("reframe server initialized", "address", cfg.Reframe.ListenMultiaddr)
log.Infow("delegated routing server initialized", "address", cfg.DelegatedRouting.ListenMultiaddr)

fmt.Fprintf(cctx.App.ErrWriter, "Starting reframe server on %s ...", cfg.Reframe.ListenMultiaddr)
fmt.Fprintf(cctx.App.ErrWriter, "Starting delegated routing server on %s ...", cfg.DelegatedRouting.ListenMultiaddr)
go func() {
reframeErrChan <- reframeSrv.Start()
droutingErrChan <- droutingSrv.Start()
}()
}

Expand All @@ -246,8 +246,8 @@ func daemonCommand(cctx *cli.Context) error {
case err = <-adminErrChan:
log.Errorw("Failed to start admin server", "err", err)
finalErr = ErrDaemonStart
case err = <-reframeErrChan:
log.Errorw("Failed to start reframe server", "err", err)
case err = <-droutingErrChan:
log.Errorw("Failed to start delegated routing server", "err", err)
finalErr = ErrDaemonStart
}

Expand Down Expand Up @@ -281,9 +281,9 @@ func daemonCommand(cctx *cli.Context) error {
log.Errorw("Error shutting down admin server: %s", err)
finalErr = ErrDaemonStop
}
if reframeSrv != nil {
if err = reframeSrv.Shutdown(shutdownCtx); err != nil {
log.Errorw("Error shutting down reframe server.", "err", err)
if droutingSrv != nil {
if err = droutingSrv.Shutdown(shutdownCtx); err != nil {
log.Errorw("Error shutting down delegated routing server.", "err", err)
finalErr = ErrDaemonStop
}
}
Expand Down
32 changes: 16 additions & 16 deletions cmd/provider/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (

// Config is used to load config files.
type Config struct {
Identity Identity
Datastore Datastore
Ingest Ingest
ProviderServer ProviderServer
AdminServer AdminServer
Bootstrap Bootstrap
DirectAnnounce DirectAnnounce
Reframe Reframe
Identity Identity
Datastore Datastore
Ingest Ingest
ProviderServer ProviderServer
AdminServer AdminServer
Bootstrap Bootstrap
DirectAnnounce DirectAnnounce
DelegatedRouting DelegatedRouting
}

const (
Expand Down Expand Up @@ -96,13 +96,13 @@ func Load(filePath string) (*Config, error) {

// Populate with initial values in case they are not present in config.
cfg := Config{
Bootstrap: NewBootstrap(),
Datastore: NewDatastore(),
Ingest: NewIngest(),
AdminServer: NewAdminServer(),
ProviderServer: NewProviderServer(),
DirectAnnounce: NewDirectAnnounce(),
Reframe: NewReframe(),
Bootstrap: NewBootstrap(),
Datastore: NewDatastore(),
Ingest: NewIngest(),
AdminServer: NewAdminServer(),
ProviderServer: NewProviderServer(),
DirectAnnounce: NewDirectAnnounce(),
DelegatedRouting: NewDelegatedRouting(),
}

if err = json.NewDecoder(f).Decode(&cfg); err != nil {
Expand Down Expand Up @@ -158,5 +158,5 @@ func (c *Config) PopulateDefaults() {
c.Datastore.PopulateDefaults()
c.Ingest.PopulateDefaults()
c.ProviderServer.PopulateDefaults()
c.Reframe.PopulateDefaults()
c.DelegatedRouting.PopulateDefaults()
}
14 changes: 7 additions & 7 deletions cmd/provider/internal/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func Init(out io.Writer) (*Config, error) {

func InitWithIdentity(identity Identity) (*Config, error) {
return &Config{
Identity: identity,
Bootstrap: NewBootstrap(),
Datastore: NewDatastore(),
Ingest: NewIngest(),
ProviderServer: NewProviderServer(),
AdminServer: NewAdminServer(),
Reframe: NewReframe(),
Identity: identity,
Bootstrap: NewBootstrap(),
Datastore: NewDatastore(),
Ingest: NewIngest(),
ProviderServer: NewProviderServer(),
AdminServer: NewAdminServer(),
DelegatedRouting: NewDelegatedRouting(),
}, nil
}

Expand Down
52 changes: 26 additions & 26 deletions cmd/provider/internal/config/reframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
)

const (
defaultReframeReadTimeout = Duration(10 * time.Minute)
defaultReframeWriteTimeout = Duration(10 * time.Minute)
defaultReframeCidTtl = Duration(24 * time.Hour)
defaultReframeChunkSize = 1_000
defaultReframeSnapshotSize = 10_000
defaultPageSize = 5_000
defaultDelegatedRoutingReadTimeout = Duration(10 * time.Minute)
defaultDelegatedRoutingWriteTimeout = Duration(10 * time.Minute)
defaultDelegatedRoutingCidTtl = Duration(24 * time.Hour)
defaultDelegatedRoutingChunkSize = 1_000
defaultDelegatedRoutingSnapshotSize = 10_000
defaultPageSize = 5_000
)

// Reframe tracks the configuration of reframe serber. If specified, index provider will expose a reframe server that will
// DelegatedRouting tracks the configuration of delegated routing server. If specified, index provider will expose a delegated routing server that will
// allow an IPFS node to advertise their CIDs through the delegated routing protocol.
type Reframe struct {
type DelegatedRouting struct {
ListenMultiaddr string
ReadTimeout Duration
WriteTimeout Duration
Expand All @@ -30,52 +30,52 @@ type Reframe struct {
// SnapshotSize is the maximum number of records in the Provide payload after which it is considered a snapshot.
// Snapshots don't have individual timestamps recorded into the datastore. Instead, timestamps are recorded as a binary blob after processing is done.
SnapshotSize int
// ProviderID is a Peer ID of the IPFS node that the reframe server is expecting advertisements from
// ProviderID is a Peer ID of the IPFS node that the delegated routing server is expecting advertisements from
ProviderID string
// DsPageSize is a size of the database page that is going to be used on reframe server initialisation.
// DsPageSize is a size of the database page that is going to be used on delegated routing server initialisation.
DsPageSize int
// Addrs is a list of multiaddresses of the IPFS node that the reframe server is expecting advertisements from
// Addrs is a list of multiaddresses of the IPFS node that the delegated routing server is expecting advertisements from
Addrs []string
}

// NewReframe instantiates a new Reframe config with default values.
func NewReframe() Reframe {
return Reframe{
// NewDelegatedRouting instantiates a new delegated routing config with default values.
func NewDelegatedRouting() DelegatedRouting {
return DelegatedRouting{
// we would like this functionality to be off by default
ProviderID: "",
ListenMultiaddr: "",
ReadTimeout: defaultReframeReadTimeout,
WriteTimeout: defaultReframeWriteTimeout,
CidTtl: defaultReframeCidTtl,
ChunkSize: defaultReframeChunkSize,
SnapshotSize: defaultReframeSnapshotSize,
ReadTimeout: defaultDelegatedRoutingReadTimeout,
WriteTimeout: defaultDelegatedRoutingWriteTimeout,
CidTtl: defaultDelegatedRoutingCidTtl,
ChunkSize: defaultDelegatedRoutingChunkSize,
SnapshotSize: defaultDelegatedRoutingSnapshotSize,
DsPageSize: defaultPageSize,
}
}

// PopulateDefaults replaces zero-values in the config with default values.
func (c *Reframe) PopulateDefaults() {
func (c *DelegatedRouting) PopulateDefaults() {
if c.ReadTimeout == 0 {
c.ReadTimeout = defaultReframeReadTimeout
c.ReadTimeout = defaultDelegatedRoutingReadTimeout
}
if c.WriteTimeout == 0 {
c.WriteTimeout = defaultReframeWriteTimeout
c.WriteTimeout = defaultDelegatedRoutingWriteTimeout
}
if c.CidTtl == 0 {
c.CidTtl = defaultReframeCidTtl
c.CidTtl = defaultDelegatedRoutingCidTtl
}
if c.ChunkSize == 0 {
c.ChunkSize = defaultReframeChunkSize
c.ChunkSize = defaultDelegatedRoutingChunkSize
}
if c.SnapshotSize == 0 {
c.SnapshotSize = defaultReframeSnapshotSize
c.SnapshotSize = defaultDelegatedRoutingSnapshotSize
}
if c.DsPageSize == 0 {
c.DsPageSize = defaultPageSize
}
}

func (as *Reframe) ListenNetAddr() (string, error) {
func (as *DelegatedRouting) ListenNetAddr() (string, error) {
maddr, err := multiaddr.NewMultiaddr(as.ListenMultiaddr)
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion reframe/chunker.go → delegatedrouting/chunker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reframe
package delegatedrouting

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion reframe/cid_queue.go → delegatedrouting/cid_queue.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reframe
package delegatedrouting

import (
"container/list"
Expand Down
2 changes: 1 addition & 1 deletion reframe/ds_wrapper.go → delegatedrouting/ds_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reframe
package delegatedrouting

import (
"bytes"
Expand Down
Loading