Skip to content

Commit

Permalink
Add command to import providers from another indexer (#574)
Browse files Browse the repository at this point in the history
* Add command to import providers from another indexer

The "admin import-providers" subcommand imports providers from another indexer. The other indexer is specified with the `--from` flag, which takes the host or host:port of that indexer.

* After importing publishers, sync with them immediately
  • Loading branch information
gammazero authored Jun 14, 2022
1 parent dee5770 commit f3ddfab
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 7 deletions.
41 changes: 41 additions & 0 deletions api/v0/admin/client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -120,6 +121,46 @@ func (c *Client) Sync(ctx context.Context, peerID peer.ID, peerAddr multiaddr.Mu
return c.ingestRequest(ctx, peerID, "sync", http.MethodPost, data, q...)
}

// ImportProviders asks the indexer behind this admin client to import
// providers from the indexer located at fromURL.
//
// The request is a POST to the "/importproviders" admin endpoint with a JSON
// object body whose "indexer" key holds the binary-marshaled form of fromURL.
//
// An error is returned if fromURL is nil or empty, if the request cannot be
// built or sent, or if the response status is anything other than 200 OK, in
// which case the error is read from the response body.
func (c *Client) ImportProviders(ctx context.Context, fromURL *url.URL) error {
	if fromURL == nil || fromURL.String() == "" {
		return errors.New("missing indexer url")
	}

	// Encode the source indexer URL and wrap it in the JSON request payload.
	rawURL, err := fromURL.MarshalBinary()
	if err != nil {
		return err
	}
	payload, err := json.Marshal(map[string][]byte{"indexer": rawURL})
	if err != nil {
		return err
	}

	endpoint := c.baseURL + "/importproviders"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.c.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return httpclient.ReadErrorFrom(resp.StatusCode, resp.Body)
}

// ReloadConfig reloads reloadable parts of the configuration file.
func (c *Client) ReloadConfig(ctx context.Context) error {
u := c.baseURL + "/reloadconfig"
Expand Down
27 changes: 27 additions & 0 deletions command/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package command

import (
"fmt"
"net/url"

httpclient "github.com/filecoin-project/storetheindex/api/v0/admin/client/http"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -30,6 +31,13 @@ var block = &cli.Command{
Action: blockCmd,
}

// importProviders defines the "import-providers" admin subcommand. It accepts
// the flags in importProvidersFlags and is executed by importProvidersCmd.
var importProviders = &cli.Command{
Name: "import-providers",
Usage: "Import provider information from another indexer",
Flags: importProvidersFlags,
Action: importProvidersCmd,
}

var reload = &cli.Command{
Name: "reload-config",
Usage: "Reload policy, rate limit, workers, and batch settings from the configuration file",
Expand All @@ -44,6 +52,7 @@ var AdminCmd = &cli.Command{
sync,
allow,
block,
importProviders,
reload,
},
}
Expand Down Expand Up @@ -107,6 +116,24 @@ func blockCmd(cctx *cli.Context) error {
return nil
}

// importProvidersCmd runs the "import-providers" subcommand. It creates an
// admin client for the indexer selected on the command line, then asks that
// indexer to import providers from the "/providers" endpoint of the host
// given by the "from" flag, always using the http scheme. On success a
// confirmation line naming the source URL is printed to standard output.
func importProvidersCmd(cctx *cli.Context) error {
	cl, err := httpclient.New(cliIndexer(cctx, "admin"))
	if err != nil {
		return err
	}

	// Source indexer URL: http://<from>/providers
	var fromURL url.URL
	fromURL.Scheme = "http"
	fromURL.Host = cctx.String("from")
	fromURL.Path = "/providers"

	if err = cl.ImportProviders(cctx.Context, &fromURL); err != nil {
		return err
	}

	fmt.Println("Imported providers from indexer", fromURL.String())
	return nil
}

func reloadConfigCmd(cctx *cli.Context) error {
cl, err := httpclient.New(cliIndexer(cctx, "admin"))
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions command/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ var providersListFlags = []cli.Flag{
indexerHostFlag,
}

// importProvidersFlags are the flags accepted by the "import-providers"
// command: the required "from" flag (alias "f") naming the indexer to read
// providers from, plus the shared indexerHostFlag.
var importProvidersFlags = []cli.Flag{
&cli.StringFlag{
Name: "from",
Usage: "Host or host:port of indexer to get providers from",
Aliases: []string{"f"},
Required: true,
},
indexerHostFlag,
}

var registerFlags = []cli.Flag{
&cli.StringFlag{
Name: "config",
Expand Down
40 changes: 34 additions & 6 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func (e *e2eTestRunner) start(prog string, args ...string) *exec.Cmd {
switch name {
case "storetheindex":
if strings.Contains(line, "Indexer is ready") {
close(e.indexerReady)
e.indexerReady <- struct{}{}
}
case "provider":
line = strings.ToLower(line)
if strings.Contains(line, "connected to peer successfully") {
close(e.providerHasPeer)
e.providerHasPeer <- struct{}{}
} else if strings.Contains(line, "admin http server listening") {
close(e.providerReady)
e.providerReady <- struct{}{}
}
}
}
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
dir: t.TempDir(),
ctx: ctx,

indexerReady: make(chan struct{}),
providerReady: make(chan struct{}),
providerHasPeer: make(chan struct{}),
indexerReady: make(chan struct{}, 1),
providerReady: make(chan struct{}, 1),
providerHasPeer: make(chan struct{}, 1),
}

carPath := filepath.Join(e.dir, "sample-wrapped-v2.car")
Expand Down Expand Up @@ -283,6 +283,34 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
return nil
})

root2 := filepath.Join(e.dir, ".storetheindex2")
e.env = append(e.env, fmt.Sprintf("%s=%s", config.EnvDir, root2))
e.run(indexer, "init", "--store", "memory", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap",
"--listen-admin", "/ip4/127.0.0.1/tcp/3202", "--listen-finder", "/ip4/127.0.0.1/tcp/3200", "--listen-ingest", "/ip4/127.0.0.1/tcp/3201")

cmdIndexer2 := e.start(indexer, "daemon")
select {
case <-e.indexerReady:
case <-ctx.Done():
t.Fatal("timed out waiting for indexer2 to start")
}

outProviders := e.run(indexer, "providers", "list", "--indexer", "localhost:3200")
if !strings.HasPrefix(string(outProviders), "No providers registered with indexer") {
t.Errorf("expected no providers message, got %q", string(outProviders))
}

// import providers from first indexer.
e.run(indexer, "admin", "import-providers", "--indexer", "localhost:3202", "--from", "localhost:3000")

outProviders = e.run(indexer, "providers", "list", "--indexer", "localhost:3200")

// Check that provider ID now appears in providers output.
if !strings.Contains(string(outProviders), providerID) {
t.Errorf("expected provider id in providers output after import-providers, got %q", string(outProviders))
}
e.stop(cmdIndexer2, time.Second)

e.stop(cmdIndexer, time.Second)
e.stop(cmdProvider, time.Second)
}
Expand Down
69 changes: 68 additions & 1 deletion internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"path"
"sync"
"time"

v0 "github.com/filecoin-project/storetheindex/api/v0"
httpclient "github.com/filecoin-project/storetheindex/api/v0/finder/client/http"
"github.com/filecoin-project/storetheindex/config"
"github.com/filecoin-project/storetheindex/internal/metrics"
"github.com/filecoin-project/storetheindex/internal/registry/discovery"
Expand Down Expand Up @@ -334,7 +336,7 @@ func (r *Registry) Register(ctx context.Context, info *ProviderInfo) error {
return err
}

log.Infow("registered provider", "id", info.AddrInfo.ID, "addrs", info.AddrInfo.Addrs)
log.Infow("Registered provider", "id", info.AddrInfo.ID, "addrs", info.AddrInfo.Addrs)
return nil
}

Expand Down Expand Up @@ -510,6 +512,71 @@ func (r *Registry) AllProviderInfo() []*ProviderInfo {
return infos
}

// ImportProviders reads providers from another indexer and registers any that
// are not already registered. Returns the count of newly registered providers.
//
// fromURL is the URL of the other indexer's provider listing; it is handed to
// the finder HTTP client, whose ListProviders call fetches the providers. An
// error is returned only if fromURL is nil, the client cannot be created, or
// the listing cannot be fetched.
//
// For each provider that is not yet registered, the publisher reported by the
// other indexer is used when it is present, has a valid ID, and has at least
// one address. Otherwise the provider itself is assumed to be the publisher.
// A provider that has neither a usable publisher nor any address of its own
// cannot be given a publisher address, so it is logged and skipped. Providers
// that fail registration are likewise logged and skipped; neither case fails
// the import.
//
// Newly registered providers are queued on r.syncChan from a background
// goroutine, which exits early if r.closing is signaled.
func (r *Registry) ImportProviders(ctx context.Context, fromURL *url.URL) (int, error) {
	if fromURL == nil {
		// Guard against a nil pointer dereference in fromURL.String().
		return 0, errors.New("missing indexer url")
	}

	cl, err := httpclient.New(fromURL.String())
	if err != nil {
		return 0, err
	}

	provs, err := cl.ListProviders(ctx)
	if err != nil {
		return 0, err
	}

	var newProvs []*ProviderInfo
	for _, pInfo := range provs {
		if r.IsRegistered(pInfo.AddrInfo.ID) {
			continue
		}
		regInfo := &ProviderInfo{
			AddrInfo: pInfo.AddrInfo,
		}

		var pubErr error
		switch {
		case pInfo.Publisher == nil:
			pubErr = errors.New("missing publisher")
		case pInfo.Publisher.ID.Validate() != nil:
			pubErr = errors.New("bad publisher id")
		case len(pInfo.Publisher.Addrs) == 0:
			pubErr = errors.New("publisher missing addresses")
		}

		if pubErr != nil {
			// The fallback below needs one of the provider's own addresses.
			// The data comes from a remote indexer, so do not assume there
			// is one; indexing an empty slice would panic.
			if len(regInfo.AddrInfo.Addrs) == 0 {
				log.Infow("Cannot register provider", "provider", regInfo.AddrInfo.ID,
					"err", errors.New("provider has no valid publisher and no addresses"))
				continue
			}
			// If publisher does not have a valid ID and addresses, then use
			// provider as publisher.
			log.Infow("Provider does not have valid publisher, assuming same as provider", "reason", pubErr, "provider", regInfo.AddrInfo.ID)
			regInfo.Publisher = regInfo.AddrInfo.ID
			regInfo.PublisherAddr = regInfo.AddrInfo.Addrs[0]
		} else {
			regInfo.Publisher = pInfo.Publisher.ID
			regInfo.PublisherAddr = pInfo.Publisher.Addrs[0]
		}

		if err = r.Register(ctx, regInfo); err != nil {
			log.Infow("Cannot register provider", "provider", pInfo.AddrInfo.ID, "err", err)
			continue
		}

		newProvs = append(newProvs, regInfo)
	}
	log.Infow("Imported new providers from other indexer", "from", fromURL.String(), "count", len(newProvs))

	if len(newProvs) != 0 {
		// Start goroutine to sync with all the new providers. Sending is done
		// in the background so the caller is not blocked on syncChan; the
		// goroutine gives up as soon as the registry starts closing.
		go func() {
			for _, pinfo := range newProvs {
				select {
				case r.syncChan <- pinfo:
				case <-r.closing:
					return
				}
			}
		}()
	}

	return len(newProvs), nil
}

func (r *Registry) CheckSequence(peerID peer.ID, seq uint64) error {
return r.sequences.check(peerID, seq)
}
Expand Down
38 changes: 38 additions & 0 deletions server/admin/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"strconv"

Expand Down Expand Up @@ -142,6 +143,43 @@ func (h *adminHandler) sync(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}

// importProviders handles POST requests to the "/importproviders" admin
// route. The request body must be a JSON object whose "indexer" key holds the
// binary-marshaled URL of the indexer to import providers from.
//
// Responses:
//   - 400 Bad Request if the body cannot be read or decoded, the "indexer"
//     value is missing or empty, or the URL cannot be unmarshaled.
//   - 502 Bad Gateway if the registry fails to import from the other indexer.
//   - 200 OK otherwise.
//
// The import runs with the handler's context (h.ctx), not the request's.
func (h *adminHandler) importProviders(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Errorw("failed reading import providers request", "err", err)
		http.Error(w, "", http.StatusBadRequest)
		return
	}

	var params map[string][]byte
	if err = json.Unmarshal(body, &params); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// A missing key, a JSON null, and an empty string all yield an empty
	// value here. Reject them all, since an empty URL would otherwise parse
	// successfully and be passed on to the registry.
	from := params["indexer"]
	if len(from) == 0 {
		http.Error(w, "missing indexer url in request", http.StatusBadRequest)
		return
	}

	fromURL := &url.URL{}
	if err = fromURL.UnmarshalBinary(from); err != nil {
		http.Error(w, "bad indexer url: "+err.Error(), http.StatusBadRequest)
		return
	}

	// The count of imported providers is not reported to the caller.
	if _, err = h.reg.ImportProviders(h.ctx, fromURL); err != nil {
		msg := "Cannot get providers from other indexer"
		log.Errorw(msg, "err", err)
		http.Error(w, msg, http.StatusBadGateway)
		return
	}

	w.WriteHeader(http.StatusOK)
}

func (h *adminHandler) reloadConfig(w http.ResponseWriter, r *http.Request) {
errChan := make(chan error)
h.reloadErrChan <- errChan
Expand Down
1 change: 1 addition & 0 deletions server/admin/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(listen string, indexer indexer.Interface, ingester *ingest.Ingester, re

// Admin routes
r.HandleFunc("/healthcheck", h.healthCheckHandler).Methods(http.MethodGet)
r.HandleFunc("/importproviders", h.importProviders).Methods(http.MethodPost)
r.HandleFunc("/reloadconfig", h.reloadConfig).Methods(http.MethodPost)

// Ingester routes
Expand Down

0 comments on commit f3ddfab

Please sign in to comment.