Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat!: IPNS routing based on IPIP-351 or IPIP-379 (#185)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcin Rataj <[email protected]>
  • Loading branch information
hacdias and lidel authored Oct 5, 2023
1 parent 04eb5db commit f0bcbcb
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 33 deletions.
3 changes: 0 additions & 3 deletions blockstore_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (

const (
EnvProxyGateway = "PROXY_GATEWAY_URL"

DefaultProxyGateway = "http://127.0.0.1:8080"
DefaultKuboRPC = "http://127.0.0.1:5001"
)

type proxyBlockStore struct {
Expand Down
25 changes: 17 additions & 8 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@

### `KUBO_RPC_URL`

Default: see `DefaultKuboRPC`

Single URL or a comma separated list of RPC endpoints that provide `/api/v0` from Kubo.

We use this as temporary solution for IPNS Record routing until [IPIP-351](https://github.com/ipfs/specs/pull/351) ships with Kubo 0.19,
and we also redirect some legacy `/api/v0` commands that need to be handled on `ipfs.io`.
This is used to redirect legacy `/api/v0` commands that need to be handled on `ipfs.io`.
If this is not set, the redirects are not set up.

### `BLOCK_CACHE_SIZE`

Expand Down Expand Up @@ -59,9 +56,21 @@ in the near feature.

### `PROXY_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support `?format=block|car`
responses. This is used by default with `http://127.0.0.1:8080` unless `STRN_ORCHESTRATOR_URL`
is set.
Single URL or a comma separated list of Gateway endpoints that support `?format=block|car|ipns-record`
responses. Either this variable or `STRN_ORCHESTRATOR_URL` must be set.

If this gateway does not support `application/vnd.ipfs.ipns-record`, you can use `IPNS_RECORD_GATEWAY_URL`
to override the gateway address from which to retrieve IPNS Records from.

### `IPNS_RECORD_GATEWAY_URL`

Single URL or a comma separated list of Gateway endpoints that support requests for `application/vnd.ipfs.ipns-record`.
This is used for IPNS Record routing.

`IPNS_RECORD_GATEWAY_URL` also supports [Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/)
for IPNS Record routing ([IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/)). To use it, the provided URL must end with `/routing/v1`.

If not set, the IPNS records will be fetched from `KUBO_RPC_URL`.

## Saturn Backend

Expand Down
26 changes: 19 additions & 7 deletions handlers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"math/rand"
"net/http"
Expand All @@ -12,6 +13,7 @@ import (
_ "net/http/pprof"

"github.com/ipfs/bifrost-gateway/lib"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/filecoin-saturn/caboose"
Expand Down Expand Up @@ -68,9 +70,16 @@ func withRequestLogger(next http.Handler) http.Handler {
})
}

func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) {
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(kuboRPC, cdns)
func makeGatewayHandler(bs bstore.Blockstore, kuboRPC, ipnsRecordGateways []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) {
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway or kubo RPC.
var routing routing.ValueStore
if len(ipnsRecordGateways) != 0 {
routing = newProxyRouting(ipnsRecordGateways, cdns)
} else if len(kuboRPC) != 0 {
routing = newRPCProxyRouting(kuboRPC, cdns)
} else {
return nil, errors.New("either KUBO_RPC_URL, IPNS_RECORD_GATEWAY_URL or PROXY_GATEWAY_URL with support for application/vnd.ipfs.ipns-record must be provided in order to delegate IPNS routing")
}

// Sets up a cache to store blocks in
cacheBlockStore, err := lib.NewCacheBlockStore(blockCacheSize)
Expand Down Expand Up @@ -157,10 +166,13 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC
mux := http.NewServeMux()
mux.Handle("/ipfs/", ipfsHandler)
mux.Handle("/ipns/", ipnsHandler)
// TODO: below is legacy which we want to remove, measuring this separately
// allows us to decide when is the time to do it.
legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc")
mux.Handle("/api/v0/", legacyKuboRpcHandler)

if len(kuboRPC) != 0 {
// TODO: below is legacy which we want to remove, measuring this separately
// allows us to decide when is the time to do it.
legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc")
mux.Handle("/api/v0/", legacyKuboRpcHandler)
}

// Construct the HTTP handler for the gateway.
handler := withConnect(mux)
Expand Down
26 changes: 19 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ func main() {
}

const (
EnvKuboRPC = "KUBO_RPC_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
EnvGraphBackend = "GRAPH_BACKEND"
RequestIDHeader = "X-Bfid"
EnvKuboRPC = "KUBO_RPC_URL"
EnvIPNSRecordGateway = "IPNS_RECORD_GATEWAY_URL"
EnvBlockCacheSize = "BLOCK_CACHE_SIZE"
EnvGraphBackend = "GRAPH_BACKEND"
RequestIDHeader = "X-Bfid"
)

func init() {
Expand All @@ -57,7 +58,7 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
// Get env variables.
saturnOrchestrator := getEnv(EnvSaturnOrchestrator, "")
proxyGateway := getEnvs(EnvProxyGateway, "")
kuboRPC := getEnvs(EnvKuboRPC, DefaultKuboRPC)
kuboRPC := getEnvs(EnvKuboRPC, "")

blockCacheSize, err := getEnvInt(EnvBlockCacheSize, lib.DefaultCacheBlockStoreSize)
if err != nil {
Expand Down Expand Up @@ -107,7 +108,15 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
log.Fatalf("Unable to start. bifrost-gateway requires either PROXY_GATEWAY_URL or STRN_ORCHESTRATOR_URL to be set.\n\nRead docs at https://github.com/ipfs/bifrost-gateway/blob/main/docs/environment-variables.md\n\n")
}

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, gatewayPort, blockCacheSize, cdns, useGraphBackend)
// Prefer IPNS_RECORD_GATEWAY_URL when an explicit URL for IPNS routing is set
ipnsRecordGateway := getEnvs(EnvIPNSRecordGateway, "")
if len(ipnsRecordGateway) == 0 {
// Fallback to PROXY_GATEWAY_URL, assuming it is modern
// enough to support application/vnd.ipfs.ipns-record responses
ipnsRecordGateway = proxyGateway
}

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, ipnsRecordGateway, gatewayPort, blockCacheSize, cdns, useGraphBackend)
if err != nil {
return err
}
Expand All @@ -128,7 +137,10 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
log.Printf("%s: %d", EnvBlockCacheSize, blockCacheSize)
log.Printf("%s: %t", EnvGraphBackend, useGraphBackend)

log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " "))
if len(kuboRPC) != 0 {
log.Printf("Legacy RPC at /api/v0 (%s) provided by %s", EnvKuboRPC, strings.Join(kuboRPC, " "))
}

log.Printf("Path gateway listening on http://127.0.0.1:%d", gatewayPort)
log.Printf(" Smoke test (JPG): http://127.0.0.1:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", gatewayPort)
log.Printf("Subdomain gateway configured on dweb.link and http://localhost:%d", gatewayPort)
Expand Down
140 changes: 132 additions & 8 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type proxyRouting struct {
type rpcProxyRouting struct {
kuboRPC []string
httpClient *http.Client
rand *rand.Rand
}

func newProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {
func newRPCProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &proxyRouting{
return &rpcProxyRouting{
kuboRPC: kuboRPC,
httpClient: &http.Client{
Transport: otelhttp.NewTransport(&customTransport{
Expand All @@ -48,15 +48,15 @@ func newProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {
}
}

func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {
func (ps *rpcProxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {
return routing.ErrNotSupported
}

func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {
func (ps *rpcProxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {
return ps.fetch(ctx, k)
}

func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {
func (ps *rpcProxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}
Expand All @@ -76,7 +76,7 @@ func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routi
return ch, nil
}

func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) {
func (ps *rpcProxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) {
name, err := ipns.NameFromRoutingKey([]byte(key))
if err != nil {
return nil, err
Expand Down Expand Up @@ -151,6 +151,130 @@ func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err e
return rb, nil
}

func (ps *proxyRouting) getRandomKuboURL() string {
func (ps *rpcProxyRouting) getRandomKuboURL() string {
return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))]
}

type proxyRouting struct {
ipnsRecordGateways []string
httpClient *http.Client
rand *rand.Rand
}

func newProxyRouting(ipnsRecordGateways []string, cdns *cachedDNS) routing.ValueStore {
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &proxyRouting{
ipnsRecordGateways: ipnsRecordGateways,
httpClient: &http.Client{
Transport: otelhttp.NewTransport(&customTransport{
// RoundTripper with increased defaults than http.Transport such that retrieving
// multiple lookups concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
ForceAttemptHTTP2: true,
},
}),
},
rand: rand,
}
}

func (ps *proxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {
return routing.ErrNotSupported
}

func (ps *proxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}

name, err := ipns.NameFromRoutingKey([]byte(k))
if err != nil {
return nil, err
}

return ps.fetch(ctx, name)
}

func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}

name, err := ipns.NameFromRoutingKey([]byte(k))
if err != nil {
return nil, err
}

ch := make(chan []byte)

go func() {
v, err := ps.fetch(ctx, name)
if err != nil {
close(ch)
} else {
ch <- v
close(ch)
}
}()

return ch, nil
}

func (ps *proxyRouting) fetch(ctx context.Context, name ipns.Name) ([]byte, error) {
urlStr := fmt.Sprintf("%s/ipns/%s", ps.getRandomGatewayURL(), name.String())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/vnd.ipfs.ipns-record")

goLog.Debugw("routing proxy fetch", "key", name.String(), "from", req.URL.String())
defer func() {
if err != nil {
goLog.Debugw("routing proxy fetch error", "key", name.String(), "from", req.URL.String(), "error", err.Error())
}
}()

resp, err := ps.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status from remote gateway: %s", resp.Status)
}

rb, err := io.ReadAll(io.LimitReader(resp.Body, int64(ipns.MaxRecordSize)))
if err != nil {
return nil, err
}

rec, err := ipns.UnmarshalRecord(rb)
if err != nil {
return nil, err
}

err = ipns.ValidateWithName(rec, name)
if err != nil {
return nil, err
}

return rb, nil
}

func (ps *proxyRouting) getRandomGatewayURL() string {
return strings.TrimSuffix(ps.ipnsRecordGateways[ps.rand.Intn(len(ps.ipnsRecordGateways))], "/")
}

0 comments on commit f0bcbcb

Please sign in to comment.