Skip to content

Commit

Permalink
feat: Storage reservations in batch sealing
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 15, 2024
1 parent 5303e0a commit d4e2999
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 82 deletions.
6 changes: 3 additions & 3 deletions lib/ffi/piece_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
// todo: config(?): allow setting PathStorage for this
// todo storage reservations
paths, _, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
paths, _, done, err := sb.Sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
if err != nil {
return err
}
Expand Down Expand Up @@ -68,9 +68,9 @@ func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID,
}

func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
return sb.Sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
}

func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
return sb.Sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
}
2 changes: 1 addition & 1 deletion lib/ffi/scrub_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func (sb *SealCalls) CheckUnsealedCID(ctx context.Context, s storiface.SectorRef) (cid.Cid, error) {
reader, err := sb.sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
reader, err := sb.Sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
if err != nil {
return cid.Undef, xerrors.Errorf("getting unsealed sector reader: %w", err)
}
Expand Down
47 changes: 29 additions & 18 deletions lib/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/samber/lo"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -43,19 +44,19 @@ type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cach
}
*/
type SealCalls struct {
sectors *storageProvider
Sectors *storageProvider

/*// externCalls cointain overrides for calling alternative sealing logic
externCalls ExternalSealer*/
}

func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
return &SealCalls{
sectors: &storageProvider{
Sectors: &storageProvider{
storage: st,
localStore: ls,
sindex: si,
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, []*StorageReservation](),
},
}
}
Expand All @@ -64,7 +65,7 @@ type storageProvider struct {
storage *paths.Remote
localStore *paths.Local
sindex paths.SectorIndex
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
storageReservations *xsync.MapOf[harmonytask.TaskID, []*StorageReservation]
}

func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(dontDeclare ...storiface.SectorFileType), err error) {
Expand All @@ -74,7 +75,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
var ok bool
var resv *StorageReservation
if taskID != nil {
resv, ok = l.storageReservations.Load(*taskID)
resvs, ok := l.storageReservations.Load(*taskID)
if ok {
resv, ok = lo.Find(resvs, func(res *StorageReservation) bool {

Check failure on line 80 in lib/ffi/sdr_funcs.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to ok (ineffassign)
return res.SectorRef.ID() == sector.ID
})
}
}
if ok && resv != nil {
if resv.Alloc != allocate || resv.Existing != existing {
Expand Down Expand Up @@ -144,7 +150,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
}

func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, into storiface.SectorFileType, sector storiface.SectorRef, ticket abi.SealRandomness, commDcid cid.Cid) error {
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -223,7 +229,7 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID

log.Debugw("ensureOneCopy", "sector", sid, "type", fileType, "keep", keepIn)

if err := sb.sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
if err := sb.Sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
return err
}
}
Expand All @@ -237,7 +243,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}

fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -352,7 +358,7 @@ func (sb *SealCalls) GenerateSynthPoRep() {
}

func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
vproof, err := sb.sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
vproof, err := sb.Sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
if err != nil {
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
}
Expand Down Expand Up @@ -498,7 +504,7 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
}

func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) {
return sb.sectors.localStore.Local(ctx)
return sb.Sectors.localStore.Local(ctx)
}

func changePathType(path string, newType storiface.SectorFileType) (string, error) {
Expand Down Expand Up @@ -526,7 +532,7 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro
return newPath, nil
}
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
sectorPaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand All @@ -548,7 +554,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
if err := sb.Sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %+v", err)
}
}()
Expand Down Expand Up @@ -666,11 +672,16 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef

var opts []storiface.AcquireOption
if taskID != nil {
resv, ok := sb.sectors.storageReservations.Load(*taskID)
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
// anyways, and before we start claiming other tasks.
if ok {
if len(resvs) != 1 {
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
}
resv := resvs[0]

defer resv.Release()

if resv.Alloc != storiface.FTNone {
Expand All @@ -684,13 +695,13 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
}
}

err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
if err != nil {
return xerrors.Errorf("moving storage: %w", err)
}

for _, fileType := range toMove.AllSet() {
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
}
}
Expand All @@ -699,7 +710,7 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
}

func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
stores, err := sb.sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
stores, err := sb.Sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
if err != nil {
return false, "", xerrors.Errorf("finding sector: %w", err)
}
Expand All @@ -718,7 +729,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec

// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(...storiface.SectorFileType), err error) {
fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
fsPath, pathID, releaseSector, err = sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -754,7 +765,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
}

func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down
19 changes: 12 additions & 7 deletions lib/ffi/snap_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (sb *SealCalls) EncodeUpdate(
noDecl = storiface.FTUnsealed
}

paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (sb *SealCalls) EncodeUpdate(

log.Debugw("get key data", "keyPath", keyPath, "keyCachePath", keyCachePath, "sectorID", sector.ID, "taskID", taskID)

r, err := sb.sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
r, err := sb.Sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sealed sector reader: %w", err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (sb *SealCalls) EncodeUpdate(

// fetch cache
var buf bytes.Buffer // usually 73.2 MiB
err = sb.sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
err = sb.Sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("reading cache: %w", err)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (sb *SealCalls) EncodeUpdate(
}

func (sb *SealCalls) ProveUpdate(ctx context.Context, proofType abi.RegisteredUpdateProof, sector storiface.SectorRef, key, sealed, unsealed cid.Cid) ([]byte, error) {
jsonb, err := sb.sectors.storage.ReadSnapVanillaProof(ctx, sector)
jsonb, err := sb.Sectors.storage.ReadSnapVanillaProof(ctx, sector)
if err != nil {
return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
}
Expand Down Expand Up @@ -301,11 +301,16 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto

var opts []storiface.AcquireOption
if taskID != nil {
resv, ok := sb.sectors.storageReservations.Load(*taskID)
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
// anyways, and before we start claiming other tasks.
if ok {
if len(resvs) != 1 {
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
}
resv := resvs[0]

defer resv.Release()

if resv.Alloc != storiface.FTNone {
Expand All @@ -319,13 +324,13 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
}
}

err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
if err != nil {
return xerrors.Errorf("moving storage: %w", err)
}

for _, fileType := range toMove.AllSet() {
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
}
}
Expand Down
Loading

0 comments on commit d4e2999

Please sign in to comment.