diff --git a/rbdeal/car_server.go b/rbdeal/car_server.go index 0e5193f..f16a3d4 100644 --- a/rbdeal/car_server.go +++ b/rbdeal/car_server.go @@ -2,11 +2,16 @@ package rbdeal import ( "context" + "encoding/json" "errors" "fmt" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "io" "net" "net/http" + "os" + "sort" "strconv" "strings" "sync/atomic" @@ -156,7 +161,7 @@ func (r *ribs) verify(ctx context.Context, token string) (carRequestToken, error return payload, nil } -func (r *ribs) makeCarRequestToken(ctx context.Context, group int64, timeout time.Duration, carSize int64, deal uuid.UUID) ([]byte, error) { +func (r *ribs) makeCarRequestToken(group int64, timeout time.Duration, carSize int64, deal uuid.UUID) ([]byte, error) { p := carRequestToken{ Group: group, Timeout: time.Now().Add(timeout).Unix(), @@ -167,6 +172,171 @@ func (r *ribs) makeCarRequestToken(ctx context.Context, group int64, timeout tim return jwt.Sign(&p, jwtKey) } +func (r *ribs) makeCarRequest(group int64, timeout time.Duration, carSize int64, deal uuid.UUID) (types.Transfer, error) { + reqToken, err := r.makeCarRequestToken(group, timeout, carSize, deal) + if err != nil { + return types.Transfer{}, xerrors.Errorf("make car request token: %w", err) + } + + prefAddrs := getPreferredAddrs(r.host) + + transferParams := &types.HttpRequest{URL: "libp2p://" + prefAddrs[0].String() + "/p2p/" + r.host.ID().String()} // todo get from autonat / config + transferParams.Headers = map[string]string{ + "Authorization": string(reqToken), + } + + paramsBytes, err := json.Marshal(transferParams) + if err != nil { + return types.Transfer{}, fmt.Errorf("marshalling request parameters: %w", err) + } + + transfer := types.Transfer{ + Type: "libp2p", + Params: paramsBytes, + Size: uint64(carSize), + } + + if os.Getenv("S3_ENDPOINT") != "" { + // with s3 we don't need to transfer the data, use a p2p proxy for more reliable connectivity + transfer, err = convertLibp2pTransferToProxy(transfer, r.host.ID(), prefAddrs) + if err != nil { + return types.Transfer{}, fmt.Errorf("convert libp2p transfer to proxy: %w", err) + } + } + + return transfer, nil +} + +// ConvertLibp2pTransferToProxy takes a libp2p Transfer and outputs a proxied HTTP Transfer +func convertLibp2pTransferToProxy(transfer types.Transfer, peerid peer.ID, prefAddrs []ma.Multiaddr) (types.Transfer, error) { + // Check that the transfer is of Type "libp2p" + if transfer.Type != "libp2p" { + return types.Transfer{}, fmt.Errorf("transfer is not of type libp2p") + } + + // Unmarshal the Params field to get the HttpRequest + var httpReq types.HttpRequest + err := json.Unmarshal(transfer.Params, &httpReq) + if err != nil { + return types.Transfer{}, fmt.Errorf("unmarshal transfer Params: %w", err) + } + + // Construct new URL pointing to the proxy server + proxyURL := fmt.Sprintf("https://libp2p.me/%s/", peerid.String()) + + // Create new Headers, including the X-Multiaddr headers + headers := httpReq.Headers + if headers == nil { + headers = make(map[string]string) + } + + // Add X-Multiaddr headers + headers["X-Multiaddr"] = "" + for _, maddr := range prefAddrs { + headers["X-Multiaddr"] = headers["X-Multiaddr"] + maddr.String() + "," + } + headers["X-Multiaddr"] = strings.TrimSuffix(headers["X-Multiaddr"], ",") + + // Add X-P2P-Protocol header + headers["X-P2P-Protocol"] = types.DataTransferProtocol + + // Create new HttpRequest with the proxy URL and updated headers + newHttpReq := types.HttpRequest{ + URL: proxyURL, + Headers: headers, + } + + // Marshal the new HttpRequest to Params + newParams, err := json.Marshal(newHttpReq) + if err != nil { + return types.Transfer{}, fmt.Errorf("marshal new HttpRequest: %w", err) + } + + // Create new Transfer object of type "http" + newTransfer := types.Transfer{ + Type: "http", + Params: newParams, + Size: transfer.Size, + } + + return newTransfer, nil +} + +func getPreferredAddrs(h host.Host) []ma.Multiaddr { + type addrWithPref struct { + addr ma.Multiaddr + pref int + } + + var addrs []addrWithPref + + for _, addr := range h.Addrs() { + // Default preference for 'other' addresses + pref := 4 + + // Extract the protocols from the multiaddress + protocols := addr.Protocols() + + // Flags to identify the type of address + isDNS := false + isIP4 := false + isIP6 := false + + for _, p := range protocols { + switch p.Code { + case ma.P_DNS, ma.P_DNS4, ma.P_DNS6, ma.P_DNSADDR: + isDNS = true + case ma.P_IP4: + isIP4 = true + case ma.P_IP6: + isIP6 = true + } + } + + // Handle DNS addresses + if isDNS { + pref = 1 + addrs = append(addrs, addrWithPref{addr: addr, pref: pref}) + continue + } + + // Skip private addresses + if manet.IsPrivateAddr(addr) { + continue + } + + // Handle public IP addresses + if isIP4 { + pref = 2 + } else if isIP6 { + pref = 3 + } + + addrs = append(addrs, addrWithPref{addr: addr, pref: pref}) + } + + if len(addrs) == 0 { + for _, a := range h.Addrs() { + addrs = append(addrs, addrWithPref{addr: a, pref: 4}) + } + + log.Errorw("no non-private addresses found, using all addresses", "addrs", h.Addrs()) + } + + // Sort the addresses based on the preference + sort.SliceStable(addrs, func(i, j int) bool { + return addrs[i].pref < addrs[j].pref + }) + + // Extract the sorted multiaddresses + var sortedAddrs []ma.Multiaddr + for _, ap := range addrs { + sortedAddrs = append(sortedAddrs, ap.addr) + } + + return sortedAddrs +} + func (r *ribs) handleCarRequest(w http.ResponseWriter, req *http.Request) { if req.Header.Get("Authorization") == "" { log.Errorw("car request auth: no auth header", "url", req.URL) diff --git a/rbdeal/deal_db.go b/rbdeal/deal_db.go index c03d4f6..fd5d64f 100644 --- a/rbdeal/deal_db.go +++ b/rbdeal/deal_db.go @@ -248,6 +248,10 @@ CREATE INDEX IF NOT EXISTS idx_deals_provider ON deals(provider_addr, group_id, CREATE INDEX IF NOT EXISTS idx_deals_group ON deals(group_id, rejected, start_time); CREATE INDEX IF NOT EXISTS idx_deals_retrieval ON deals(last_retrieval_check, last_retrieval_check_success); +CREATE INDEX IF NOT EXISTS idx_deals_start_time_rejected_failed + ON deals (rejected, failed, start_time); + + CREATE TABLE IF NOT EXISTS schema_version ( version_number INTEGER PRIMARY KEY, description TEXT, diff --git a/rbdeal/deal_tracker.go b/rbdeal/deal_tracker.go index afd5d4d..698ec71 100644 --- a/rbdeal/deal_tracker.go +++ b/rbdeal/deal_tracker.go @@ -286,7 +286,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context) error { if gs.TotalDeals-gs.FailedDeals-gs.Unretrievable < int64(targetReplicaCount) { go func(gid ribs2.GroupKey) { - err := r.makeMoreDeals(context.TODO(), gid, r.host, r.wallet) + err := r.makeMoreDeals(context.TODO(), gid, r.wallet) if err != nil { log.Errorf("starting new deals: %s", err) } diff --git a/rbdeal/group_deal.go b/rbdeal/group_deal.go index 0918b36..8feca92 100644 --- a/rbdeal/group_deal.go +++ b/rbdeal/group_deal.go @@ -3,7 +3,6 @@ package rbdeal import ( "bytes" "context" - "encoding/json" "fmt" gobig "math/big" "time" @@ -20,7 +19,6 @@ import ( ctypes "github.com/filecoin-project/lotus/chain/types" "github.com/google/uuid" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/host" iface "github.com/lotus-web3/ribs" "github.com/lotus-web3/ribs/ributil" types "github.com/lotus-web3/ribs/ributil/boosttypes" @@ -37,7 +35,7 @@ func (e ErrRejected) Error() string { return fmt.Sprintf("deal proposal rejected: %s", e.Reason) } -func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host, w *ributil.LocalWallet) error { +func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, w *ributil.LocalWallet) error { r.dealsLk.Lock() if _, ok := r.moreDealsLocks[id]; ok { r.dealsLk.Unlock() @@ -163,27 +161,11 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host } // generate transfer token - reqToken, err := r.makeCarRequestToken(context.TODO(), id, time.Hour*36, dealInfo.CarSize, dealUuid) + transfer, err := r.makeCarRequest(id, time.Hour*36, dealInfo.CarSize, dealUuid) if err != nil { return xerrors.Errorf("make car request token: %w", err) } - transferParams := &types.HttpRequest{URL: "libp2p://" + h.Addrs()[0].String() + "/p2p/" + h.ID().String()} // todo get from autonat / config - transferParams.Headers = map[string]string{ - "Authorization": string(reqToken), - } - - paramsBytes, err := json.Marshal(transferParams) - if err != nil { - return fmt.Errorf("marshalling request parameters: %w", err) - } - - transfer := types.Transfer{ - Type: "libp2p", - Params: paramsBytes, - Size: uint64(dealInfo.CarSize), - } - dealParams := types.DealParams{ DealUUID: dealUuid, ClientDealProposal: *dealProposal, @@ -220,7 +202,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host return fmt.Errorf("price %d is greater than max price %f", price, maxToPay) } - if err := h.Connect(ctx, *addrInfo); err != nil { + if err := r.host.Connect(ctx, *addrInfo); err != nil { err = r.db.StoreRejectedDeal(di.DealUUID, fmt.Sprintf("failed to connect to miner: %s", err), 0) if err != nil { return fmt.Errorf("saving rejected deal info: %w", err) @@ -229,7 +211,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host return xerrors.Errorf("connect to miner: %w", err) } - x, err := h.Peerstore().FirstSupportedProtocol(addrInfo.ID, DealProtocolv120) + x, err := r.host.Peerstore().FirstSupportedProtocol(addrInfo.ID, DealProtocolv120) if err != nil { err = r.db.StoreRejectedDeal(di.DealUUID, fmt.Sprintf("failed to connect to miner: %s", err), 0) if err != nil { @@ -251,7 +233,7 @@ func (r *ribs) makeMoreDeals(ctx context.Context, id iface.GroupKey, h host.Host // MAKE THE DEAL - s, err := h.NewStream(ctx, addrInfo.ID, DealProtocolv120) + s, err := r.host.NewStream(ctx, addrInfo.ID, DealProtocolv120) if err != nil { err = r.db.StoreRejectedDeal(di.DealUUID, xerrors.Errorf("opening deal proposal stream: %w", err).Error(), 0) if err != nil { diff --git a/rbdeal/ribs.go b/rbdeal/ribs.go index 835847d..19bb213 100644 --- a/rbdeal/ribs.go +++ b/rbdeal/ribs.go @@ -328,7 +328,7 @@ func (r *ribs) onSub(group iface.GroupKey, from, to iface.GroupState) { } go func() { - err = r.makeMoreDeals(context.TODO(), group, r.host, r.wallet) + err = r.makeMoreDeals(context.TODO(), group, r.wallet) if err != nil { log.Errorf("starting new deals: %s", err) }