Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Storage reservations in batch sealing #327

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func addSealingTasks(
cfg.Seal.BatchSealPipelines,
!cfg.Seal.SingleHasherPerThread,
cfg.Seal.LayerNVMEDevices,
machineHostPort, slotMgr, db, full, stor, si)
machineHostPort, slotMgr, db, full, stor, si, slr)
if err != nil {
return nil, xerrors.Errorf("setting up batch sealer: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/ffi/piece_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
// todo: config(?): allow setting PathStorage for this
// todo storage reservations
paths, _, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
paths, _, done, err := sb.Sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
if err != nil {
return err
}
Expand Down Expand Up @@ -68,9 +68,9 @@ func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID,
}

// PieceReader returns a sequential reader over the stored piece data
// identified by id (read from the FTPiece file in remote storage).
func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
	// Diff residue fix: the stripped diff left both the old (sb.sectors)
	// and new (sb.Sectors) return lines; only the post-change line is kept.
	return sb.Sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
}

// RemovePiece deletes the stored FTPiece file for id from storage,
// removing all copies (force=true) with no path filter.
func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
	// Diff residue fix: the stripped diff left both the old (sb.sectors)
	// and new (sb.Sectors) return lines; only the post-change line is kept.
	return sb.Sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
}
2 changes: 1 addition & 1 deletion lib/ffi/scrub_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func (sb *SealCalls) CheckUnsealedCID(ctx context.Context, s storiface.SectorRef) (cid.Cid, error) {
reader, err := sb.sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
reader, err := sb.Sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
if err != nil {
return cid.Undef, xerrors.Errorf("getting unsealed sector reader: %w", err)
}
Expand Down
47 changes: 29 additions & 18 deletions lib/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/puzpuzpuz/xsync/v2"
"github.com/samber/lo"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/harmony/harmonytask"
Expand Down Expand Up @@ -43,19 +44,19 @@ type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cach
}
*/
// SealCalls wraps sector path acquisition, storage reservations, and
// low-level sealing storage operations used by the sealing tasks.
type SealCalls struct {
	// Sectors provides path acquisition and per-task storage reservations.
	// (Exported in this change so other packages can reach the provider.)
	Sectors *storageProvider

	/*// externCalls contain overrides for calling alternative sealing logic
	externCalls ExternalSealer*/
}

func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
return &SealCalls{
sectors: &storageProvider{
Sectors: &storageProvider{
storage: st,
localStore: ls,
sindex: si,
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, []*StorageReservation](),
},
}
}
Expand All @@ -64,7 +65,7 @@ type storageProvider struct {
storage *paths.Remote
localStore *paths.Local
sindex paths.SectorIndex
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
storageReservations *xsync.MapOf[harmonytask.TaskID, []*StorageReservation]
}

func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(dontDeclare ...storiface.SectorFileType), err error) {
Expand All @@ -74,7 +75,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
var ok bool
var resv *StorageReservation
if taskID != nil {
resv, ok = l.storageReservations.Load(*taskID)
resvs, rok := l.storageReservations.Load(*taskID)
if rok {
resv, ok = lo.Find(resvs, func(res *StorageReservation) bool {
return res.SectorRef.ID() == sector.ID
})
}
}
if ok && resv != nil {
if resv.Alloc != allocate || resv.Existing != existing {
Expand Down Expand Up @@ -144,7 +150,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, into storiface.SectorFileType, sector storiface.SectorRef, ticket abi.SealRandomness, commDcid cid.Cid) error {
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -223,7 +229,7 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID

log.Debugw("ensureOneCopy", "sector", sid, "type", fileType, "keep", keepIn)

if err := sb.sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
if err := sb.Sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
return err
}
}
Expand All @@ -237,7 +243,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}

fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -352,7 +358,7 @@ func (sb *SealCalls) GenerateSynthPoRep() {
}

func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
vproof, err := sb.sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
vproof, err := sb.Sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
if err != nil {
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
}
Expand Down Expand Up @@ -498,7 +504,7 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
}

func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) {
return sb.sectors.localStore.Local(ctx)
return sb.Sectors.localStore.Local(ctx)
}

func changePathType(path string, newType storiface.SectorFileType) (string, error) {
Expand Down Expand Up @@ -526,7 +532,7 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro
return newPath, nil
}
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
sectorPaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand All @@ -548,7 +554,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
if err := sb.Sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %+v", err)
}
}()
Expand Down Expand Up @@ -666,11 +672,16 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef

var opts []storiface.AcquireOption
if taskID != nil {
resv, ok := sb.sectors.storageReservations.Load(*taskID)
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
// anyways, and before we start claiming other tasks.
if ok {
if len(resvs) != 1 {
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
}
resv := resvs[0]

defer resv.Release()

if resv.Alloc != storiface.FTNone {
Expand All @@ -684,13 +695,13 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
}
}

err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
if err != nil {
return xerrors.Errorf("moving storage: %w", err)
}

for _, fileType := range toMove.AllSet() {
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
}
}
Expand All @@ -699,7 +710,7 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
}

func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
stores, err := sb.sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
stores, err := sb.Sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
if err != nil {
return false, "", xerrors.Errorf("finding sector: %w", err)
}
Expand All @@ -718,7 +729,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec

// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(...storiface.SectorFileType), err error) {
fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
fsPath, pathID, releaseSector, err = sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -754,7 +765,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
}

func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down
19 changes: 12 additions & 7 deletions lib/ffi/snap_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (sb *SealCalls) EncodeUpdate(
noDecl = storiface.FTUnsealed
}

paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (sb *SealCalls) EncodeUpdate(

log.Debugw("get key data", "keyPath", keyPath, "keyCachePath", keyCachePath, "sectorID", sector.ID, "taskID", taskID)

r, err := sb.sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
r, err := sb.Sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sealed sector reader: %w", err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (sb *SealCalls) EncodeUpdate(

// fetch cache
var buf bytes.Buffer // usually 73.2 MiB
err = sb.sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
err = sb.Sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("reading cache: %w", err)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (sb *SealCalls) EncodeUpdate(
}

func (sb *SealCalls) ProveUpdate(ctx context.Context, proofType abi.RegisteredUpdateProof, sector storiface.SectorRef, key, sealed, unsealed cid.Cid) ([]byte, error) {
jsonb, err := sb.sectors.storage.ReadSnapVanillaProof(ctx, sector)
jsonb, err := sb.Sectors.storage.ReadSnapVanillaProof(ctx, sector)
if err != nil {
return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
}
Expand Down Expand Up @@ -301,11 +301,16 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto

var opts []storiface.AcquireOption
if taskID != nil {
resv, ok := sb.sectors.storageReservations.Load(*taskID)
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
// anyways, and before we start claiming other tasks.
if ok {
if len(resvs) != 1 {
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
}
resv := resvs[0]

defer resv.Release()

if resv.Alloc != storiface.FTNone {
Expand All @@ -319,13 +324,13 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
}
}

err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
if err != nil {
return xerrors.Errorf("moving storage: %w", err)
}

for _, fileType := range toMove.AllSet() {
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
}
}
Expand Down
Loading