Skip to content

Commit

Permalink
car server: In S3 mode use public libp2p-http-over-http proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 20, 2024
1 parent 7b67058 commit 3d4b23a
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 26 deletions.
172 changes: 171 additions & 1 deletion rbdeal/car_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package rbdeal

import (
"context"
"encoding/json"
"errors"
"fmt"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"io"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -156,7 +161,7 @@ func (r *ribs) verify(ctx context.Context, token string) (carRequestToken, error
return payload, nil
}

func (r *ribs) makeCarRequestToken(ctx context.Context, group int64, timeout time.Duration, carSize int64, deal uuid.UUID) ([]byte, error) {
func (r *ribs) makeCarRequestToken(group int64, timeout time.Duration, carSize int64, deal uuid.UUID) ([]byte, error) {
p := carRequestToken{
Group: group,
Timeout: time.Now().Add(timeout).Unix(),
Expand All @@ -167,6 +172,171 @@ func (r *ribs) makeCarRequestToken(ctx context.Context, group int64, timeout tim
return jwt.Sign(&p, jwtKey)
}

func (r *ribs) makeCarRequest(group int64, timeout time.Duration, carSize int64, deal uuid.UUID) (types.Transfer, error) {
reqToken, err := r.makeCarRequestToken(group, timeout, carSize, deal)
if err != nil {
return types.Transfer{}, xerrors.Errorf("make car request token: %w", err)
}

prefAddrs := getPreferredAddrs(r.host)

transferParams := &types.HttpRequest{URL: "libp2p://" + prefAddrs[0].String() + "/p2p/" + r.host.ID().String()} // todo get from autonat / config
transferParams.Headers = map[string]string{
"Authorization": string(reqToken),
}

paramsBytes, err := json.Marshal(transferParams)
if err != nil {
return types.Transfer{}, fmt.Errorf("marshalling request parameters: %w", err)
}

transfer := types.Transfer{
Type: "libp2p",
Params: paramsBytes,
Size: uint64(carSize),
}

if os.Getenv("S3_ENDPOINT") != "" {
// with s3 we don't need to transfer the data, use a p2p proxy for more reliable connectivity
transfer, err = convertLibp2pTransferToProxy(transfer, r.host.ID(), prefAddrs)
if err != nil {
return types.Transfer{}, fmt.Errorf("convert libp2p transfer to proxy: %w", err)
}
}

return transfer, nil
}

// ConvertLibp2pTransferToProxy takes a libp2p Transfer and outputs a proxied HTTP Transfer
func convertLibp2pTransferToProxy(transfer types.Transfer, peerid peer.ID, prefAddrs []ma.Multiaddr) (types.Transfer, error) {
// Check that the transfer is of Type "libp2p"
if transfer.Type != "libp2p" {
return types.Transfer{}, fmt.Errorf("transfer is not of type libp2p")
}

// Unmarshal the Params field to get the HttpRequest
var httpReq types.HttpRequest
err := json.Unmarshal(transfer.Params, &httpReq)
if err != nil {
return types.Transfer{}, fmt.Errorf("unmarshal transfer Params: %w", err)
}

// Construct new URL pointing to the proxy server
proxyURL := fmt.Sprintf("https://libp2p.me/%s/", peerid.String())

// Create new Headers, including the X-Multiaddr headers
headers := httpReq.Headers
if headers == nil {
headers = make(map[string]string)
}

// Add X-Multiaddr headers
headers["X-Multiaddr"] = ""
for _, maddr := range prefAddrs {
headers["X-Multiaddr"] = headers["X-Multiaddr"] + maddr.String() + ","
}
headers["X-Multiaddr"] = strings.TrimSuffix(headers["X-Multiaddr"], ",")

// Add X-P2P-Protocol header
headers["X-P2P-Protocol"] = types.DataTransferProtocol

// Create new HttpRequest with the proxy URL and updated headers
newHttpReq := types.HttpRequest{
URL: proxyURL,
Headers: headers,
}

// Marshal the new HttpRequest to Params
newParams, err := json.Marshal(newHttpReq)
if err != nil {
return types.Transfer{}, fmt.Errorf("marshal new HttpRequest: %w", err)
}

// Create new Transfer object of type "http"
newTransfer := types.Transfer{
Type: "http",
Params: newParams,
Size: transfer.Size,
}

return newTransfer, nil
}

func getPreferredAddrs(h host.Host) []ma.Multiaddr {
type addrWithPref struct {
addr ma.Multiaddr
pref int
}

var addrs []addrWithPref

for _, addr := range h.Addrs() {
// Default preference for 'other' addresses
pref := 4

// Extract the protocols from the multiaddress
protocols := addr.Protocols()

// Flags to identify the type of address
isDNS := false
isIP4 := false
isIP6 := false

for _, p := range protocols {
switch p.Code {
case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR:
isDNS = true
case ma.P_IP4:
isIP4 = true
case ma.P_IP6:
isIP6 = true
}
}

// Handle DNS addresses
if isDNS {
pref = 1
addrs = append(addrs, addrWithPref{addr: addr, pref: pref})
continue
}

// Skip private addresses
if manet.IsPrivateAddr(addr) {
continue
}

// Handle public IP addresses
if isIP4 {
pref = 2
} else if isIP6 {
pref = 3
}

addrs = append(addrs, addrWithPref{addr: addr, pref: pref})
}

if len(addrs) == 0 {
for _, a := range h.Addrs() {
addrs = append(addrs, addrWithPref{addr: a, pref: 4})
}

log.Errorw("no non-private addresses found, using all addresses", "addrs", h.Addrs())
}

// Sort the addresses based on the preference
sort.SliceStable(addrs, func(i, j int) bool {
return addrs[i].pref < addrs[j].pref
})

// Extract the sorted multiaddresses
var sortedAddrs []ma.Multiaddr
for _, ap := range addrs {
sortedAddrs = append(sortedAddrs, ap.addr)
}

return sortedAddrs
}

func (r *ribs) handleCarRequest(w http.ResponseWriter, req *http.Request) {
if req.Header.Get("Authorization") == "" {
log.Errorw("car request auth: no auth header", "url", req.URL)
Expand Down
4 changes: 4 additions & 0 deletions rbdeal/deal_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ CREATE INDEX IF NOT EXISTS idx_deals_provider ON deals(provider_addr, group_id,
CREATE INDEX IF NOT EXISTS idx_deals_group ON deals(group_id, rejected, start_time);
CREATE INDEX IF NOT EXISTS idx_deals_retrieval ON deals(last_retrieval_check, last_retrieval_check_success);
CREATE INDEX IF NOT EXISTS idx_deals_start_time_rejected_failed
ON deals (rejected, failed, start_time);
CREATE TABLE IF NOT EXISTS schema_version (
version_number INTEGER PRIMARY KEY,
description TEXT,
Expand Down
2 changes: 1 addition & 1 deletion rbdeal/deal_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context) error {

if gs.TotalDeals-gs.FailedDeals-gs.Unretrievable < int64(targetReplicaCount) {
go func(gid ribs2.GroupKey) {
err := r.makeMoreDeals(context.TODO(), gid, r.host, r.wallet)
err := r.makeMoreDeals(context.TODO(), gid, r.wallet)
if err != nil {
log.Errorf("starting new deals: %s", err)
}
Expand Down
28 changes: 5 additions & 23 deletions rbdeal/group_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rbdeal
import (
"bytes"
"context"
"encoding/json"
"fmt"
gobig "math/big"
"time"
Expand All @@ -20,7 +19,6 @@ import (
ctypes "github.com/filecoin-project/lotus/chain/types"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/host"
iface "github.com/lotus-web3/ribs"
"github.com/lotus-web3/ribs/ributil"
types "github.com/lotus-web3/ribs/ributil/boosttypes"
Expand All @@ -37,7 +35,7 @@ func (e ErrRejected) Error() string {
return fmt.Sprintf("deal proposal rejected: %s", e.Reason)
}

func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host, w *ributil.LocalWallet) error {
func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, w *ributil.LocalWallet) error {
r.dealsLk.Lock()
if _, ok := r.moreDealsLocks[id]; ok {
r.dealsLk.Unlock()
Expand Down Expand Up @@ -163,27 +161,11 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host
}

// generate transfer token
reqToken, err := r.makeCarRequestToken(context.TODO(), id, time.Hour*36, dealInfo.CarSize, dealUuid)
transfer, err := r.makeCarRequest(id, time.Hour*36, dealInfo.CarSize, dealUuid)
if err != nil {
return xerrors.Errorf("make car request token: %w", err)
}

transferParams := &types.HttpRequest{URL: "libp2p://" + h.Addrs()[0].String() + "/p2p/" + h.ID().String()} // todo get from autonat / config
transferParams.Headers = map[string]string{
"Authorization": string(reqToken),
}

paramsBytes, err := json.Marshal(transferParams)
if err != nil {
return fmt.Errorf("marshalling request parameters: %w", err)
}

transfer := types.Transfer{
Type: "libp2p",
Params: paramsBytes,
Size: uint64(dealInfo.CarSize),
}

dealParams := types.DealParams{
DealUUID: dealUuid,
ClientDealProposal: *dealProposal,
Expand Down Expand Up @@ -220,7 +202,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host
return fmt.Errorf("price %d is greater than max price %f", price, maxToPay)
}

if err := h.Connect(ctx, *addrInfo); err != nil {
if err := r.host.Connect(ctx, *addrInfo); err != nil {
err = r.db.StoreRejectedDeal(di.DealUUID, fmt.Sprintf("failed to connect to miner: %s", err), 0)
if err != nil {
return fmt.Errorf("saving rejected deal info: %w", err)
Expand All @@ -229,7 +211,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host
return xerrors.Errorf("connect to miner: %w", err)
}

x, err := h.Peerstore().FirstSupportedProtocol(addrInfo.ID, DealProtocolv120)
x, err := r.host.Peerstore().FirstSupportedProtocol(addrInfo.ID, DealProtocolv120)
if err != nil {
err = r.db.StoreRejectedDeal(di.DealUUID, fmt.Sprintf("failed to connect to miner: %s", err), 0)
if err != nil {
Expand All @@ -251,7 +233,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host

// MAKE THE DEAL

s, err := h.NewStream(ctx, addrInfo.ID, DealProtocolv120)
s, err := r.host.NewStream(ctx, addrInfo.ID, DealProtocolv120)
if err != nil {
err = r.db.StoreRejectedDeal(di.DealUUID, xerrors.Errorf("opening deal proposal stream: %w", err).Error(), 0)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rbdeal/ribs.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (r *ribs) onSub(group iface.GroupKey, from, to iface.GroupState) {
}

go func() {
err = r.makeMoreDeals(context.TODO(), group, r.host, r.wallet)
err = r.makeMoreDeals(context.TODO(), group, r.wallet)
if err != nil {
log.Errorf("starting new deals: %s", err)
}
Expand Down

0 comments on commit 3d4b23a

Please sign in to comment.