Skip to content

Commit

Permalink
separate pdp task config
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 4, 2024
1 parent 7280b3c commit 5c3f546
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 13 deletions.
8 changes: 7 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks

import (
"context"
"github.com/filecoin-project/curio/tasks/pdp"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -242,6 +243,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}
}

if cfg.Subsystems.EnablePDP {
pdpNotifTask := pdp.NewPDPNotifyTask(db)
activeTasks = append(activeTasks, pdpNotifTask)
}

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg)
activeTasks = append(activeTasks, indexingTask, ipniTask)
Expand Down Expand Up @@ -361,7 +367,7 @@ func addSealingTasks(
moveStorageTask := seal.NewMoveStorageTask(sp, slr, db, cfg.Subsystems.MoveStorageMaxTasks)
moveStorageSnapTask := snap.NewMoveStorageTask(slr, db, cfg.Subsystems.MoveStorageMaxTasks)

storePieceTask, err := piece2.NewStorePieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.MoveStorageMaxTasks)
storePieceTask, err := piece2.NewStorePieceTask(db, must.One(slrLazy.Val()), stor, cfg.Subsystems.MoveStorageMaxTasks)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ type CurioSubsystemsConfig struct {
// EnableDealMarket enabled the deal market on the node. This would also enable libp2p on the node, if configured.
EnableDealMarket bool

// Enable handling for PDP (proof-of-data possession) deals / proving on this node.
// PDP deals allow the node to directly store and prove unsealed data with "PDP Services" like Storacha.
// This feature is BETA and should only be enabled on nodes which are part of a PDP network.
EnablePDP bool

// EnableCommP enables the commP task on te node. CommP is calculated before sending PublishDealMessage for a Mk12 deal
// Must have EnableDealMarket = True
EnableCommP bool
Expand Down
2 changes: 1 addition & 1 deletion lib/dealdata/dealdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s
reader, _ := padreader.New(pr, uint64(*p.DataRawSize))
pieceReaders = append(pieceReaders, reader)
} else {
reader, _ := padreader.New(NewUrlReader(dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize))
reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize))
pieceReaders = append(pieceReaders, reader)
}

Expand Down
13 changes: 11 additions & 2 deletions lib/dealdata/urlpiecereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ type UrlPieceReader struct {
Headers http.Header
RawSize int64 // the exact number of bytes read, if we read more or less that's an error

RemoteEndpointReader paths.Remote // Only used for .ReadRemote which issues http requests for internal /remote endpoints
RemoteEndpointReader *paths.Remote // Only used for .ReadRemote which issues http requests for internal /remote endpoints

readSoFar int64
closed bool
active io.ReadCloser // auto-closed on EOF
}

func NewUrlReader(p string, h http.Header, rs int64) *UrlPieceReader {
func NewUrlReader(rmt *paths.Remote, p string, h http.Header, rs int64) *UrlPieceReader {
return &UrlPieceReader{
Url: p,
RawSize: rs,
Headers: h,

RemoteEndpointReader: rmt,
}
}

Expand All @@ -51,6 +53,10 @@ func (u *UrlPieceReader) initiateRequest() error {
}

if goUrl.Scheme == CustoreScheme {
if u.RemoteEndpointReader == nil {
return xerrors.New("RemoteEndpoint is nil")
}

goUrl.Scheme = "http"
u.active, err = u.RemoteEndpointReader.ReadRemote(context.Background(), goUrl.String(), 0, 0)
if err != nil {
Expand Down Expand Up @@ -138,6 +144,9 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
func (u *UrlPieceReader) Close() error {
if !u.closed {
u.closed = true
if u.active == nil {
return nil
}
return u.active.Close()
}

Expand Down
4 changes: 2 additions & 2 deletions lib/paths/local_stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func (st *Local) ServeAndRemove(ctx context.Context, id uuid.UUID) (io.ReadClose
continue
}

st.localLk.RUnlock()

stashDir := filepath.Join(p.local, StashDirName)
stashFilePath := filepath.Join(stashDir, fileName)
f, err := os.Open(stashFilePath)
if err == nil {
st.localLk.RUnlock()

// Wrap the file in a custom ReadCloser
return &stashFileReadCloser{
File: f,
Expand Down
4 changes: 4 additions & 0 deletions tasks/pdp/notify_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type PDPNotifyTask struct {
db *harmonydb.DB
}

func NewPDPNotifyTask(db *harmonydb.DB) *PDPNotifyTask {
return &PDPNotifyTask{db: db}
}

func (t *PDPNotifyTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

Expand Down
16 changes: 9 additions & 7 deletions tasks/piece/task_park_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var PieceParkPollInterval = time.Second * 15
// ParkPieceTask gets a piece from some origin, and parks it in storage
// Pieces are always f00, piece ID is mapped to pieceCID in the DB
type ParkPieceTask struct {
db *harmonydb.DB
sc *ffi2.SealCalls
db *harmonydb.DB
sc *ffi2.SealCalls
remote *paths.Remote

TF promise.Promise[harmonytask.AddTaskFunc]

Expand All @@ -38,17 +39,18 @@ type ParkPieceTask struct {
}

func NewParkPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int) (*ParkPieceTask, error) {
return newPieceTask(db, sc, max, false)
return newPieceTask(db, sc, nil, max, false)
}

func NewStorePieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int) (*ParkPieceTask, error) {
return newPieceTask(db, sc, max, true)
func NewStorePieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, remote *paths.Remote, max int) (*ParkPieceTask, error) {
return newPieceTask(db, sc, remote, max, true)
}

func newPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int, longTerm bool) (*ParkPieceTask, error) {
func newPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, remote *paths.Remote, max int, longTerm bool) (*ParkPieceTask, error) {
pt := &ParkPieceTask{
db: db,
sc: sc,
remote: remote,
max: max,
longTerm: longTerm,
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d
if err != nil {
return false, xerrors.Errorf("unmarshaling reference data headers: %w", err)
}
upr := dealdata.NewUrlReader(refData[i].DataURL, hdrs, pieceData.PieceRawSize)
upr := dealdata.NewUrlReader(p.remote, refData[i].DataURL, hdrs, pieceData.PieceRawSize)

defer func() {
_ = upr.Close()
Expand Down

0 comments on commit 5c3f546

Please sign in to comment.