Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unsealed sector for synthetic PoRep #295

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 110 additions & 79 deletions lib/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/lib/ffiselect"
"github.com/filecoin-project/curio/lib/proof"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"

// TODO everywhere here that we call this we should call our proxy instead.
ffi "github.com/filecoin-project/filecoin-ffi"
Expand Down Expand Up @@ -525,111 +525,137 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro

return newPath, nil
}
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()

func (sb *SealCalls) GenerateUnsealedSector(ctx context.Context, sector storiface.SectorRef, sectorPaths, pathIDs storiface.SectorPaths, keepUnsealed bool) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return xerrors.Errorf("getting sector size: %w", err)

}

if keepUnsealed {
// We are going to be moving the unsealed file, no need to allocate storage specifically for it
sectorPaths.Unsealed, err = changePathType(sectorPaths.Cache, storiface.FTUnsealed)
if err != nil {
return xerrors.Errorf("changing path type: %w", err)
// Return early if unsealed copy is not required
if !keepUnsealed {
return nil
}

// We are going to be moving the unsealed file, no need to allocate storage specifically for it
sectorPaths.Unsealed, err = changePathType(sectorPaths.Cache, storiface.FTUnsealed)
if err != nil {
return xerrors.Errorf("changing path type: %w", err)

}

pathIDs.Unsealed = pathIDs.Cache // this is just an uuid string

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %s", err)
}
}()

pathIDs.Unsealed = pathIDs.Cache // this is just an uuid string
// tree-d contains exactly unsealed data in the prefix, so
// * we move it to a temp file
// * we truncate the temp file to the sector size
// * we move the temp file to the unsealed location

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %+v", err)
}
}()

// tree-d contains exactly unsealed data in the prefix, so
// * we move it to a temp file
// * we truncate the temp file to the sector size
// * we move the temp file to the unsealed location

// temp path in cache where we'll move tree-d before truncating
// it is in the cache directory so that we can use os.Rename to move it
// to unsealed (which may be on a different filesystem)
tempUnsealed := filepath.Join(sectorPaths.Cache, storiface.SectorName(sector.ID))

_, terr := os.Stat(tempUnsealed)
tempUnsealedExists := terr == nil

// First handle an edge case where we have already gone through this step,
// but ClearCache or later steps failed. In that case we'll see tree-d missing and unsealed present

if _, err := os.Stat(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName)); err != nil {
if os.IsNotExist(err) {
// check that unsealed exists and is the right size
st, err := os.Stat(sectorPaths.Unsealed)
if err != nil {
if os.IsNotExist(err) {
if tempUnsealedExists {
// unsealed file does not exist, but temp unsealed file does
// so we can just resume where the previous attempt left off
goto retryUnsealedMove
}
return xerrors.Errorf("neither unsealed file nor temp-unsealed file exists")
// temp path in cache where we'll move tree-d before truncating
// it is in the cache directory so that we can use os.Rename to move it
// to unsealed (which may be on a different filesystem)
tempUnsealed := filepath.Join(sectorPaths.Cache, storiface.SectorName(sector.ID))

_, terr := os.Stat(tempUnsealed)
tempUnsealedExists := terr == nil

// First handle an edge case where we have already gone through this step,
// in that case we'll see tree-d missing and unsealed present

if _, err := os.Stat(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName)); err != nil {
if os.IsNotExist(err) {
// check that unsealed exists and is the right size
st, err := os.Stat(sectorPaths.Unsealed)
if err != nil {
if os.IsNotExist(err) {
if tempUnsealedExists {
// unsealed file does not exist, but temp unsealed file does
// so we can just resume where the previous attempt left off
return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)
}
return xerrors.Errorf("stat unsealed file: %w", err)
return xerrors.Errorf("neither unsealed file nor temp-unsealed file exists")

}
if st.Size() != int64(ssize) {
if tempUnsealedExists {
// unsealed file exists but is the wrong size, and temp unsealed file exists
// so we can just resume where the previous attempt left off with some cleanup
return xerrors.Errorf("stat unsealed file: %w", err)

if err := os.Remove(sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed file from last attempt: %w", err)
}
}
if st.Size() != int64(ssize) {
if tempUnsealedExists {
// unsealed file exists but is the wrong size, and temp unsealed file exists
// so we can just resume where the previous attempt left off with some cleanup

if err := os.Remove(sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed file from last attempt: %w", err)

goto retryUnsealedMove
}
return xerrors.Errorf("unsealed file is not the right size: %d != %d and temp unsealed is missing", st.Size(), ssize)

return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)

}
return xerrors.Errorf("unsealed file is not the right size: %d != %d and temp unsealed is missing", st.Size(), ssize)

// all good, just log that this edge case happened
log.Warnw("unsealed file exists but tree-d is missing, skipping move", "sector", sector.ID, "unsealed", sectorPaths.Unsealed, "cache", sectorPaths.Cache)
goto afterUnsealedMove
}
return xerrors.Errorf("stat tree-d file: %w", err)

// all good, just log that this edge case happened
log.Warnw("unsealed file exists but tree-d is missing, skipping move", "sector", sector.ID, "unsealed", sectorPaths.Unsealed, "cache", sectorPaths.Cache)
return nil
}
return xerrors.Errorf("stat tree-d file: %w", err)
}

// If the state in clean do the move
// If the state in clean do the move

// move tree-d to temp file
if err := os.Rename(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName), tempUnsealed); err != nil {
return xerrors.Errorf("moving tree-d to temp file: %w", err)
}
// move tree-d to temp file
if err := os.Rename(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName), tempUnsealed); err != nil {
return xerrors.Errorf("moving tree-d to temp file: %w", err)
}

retryUnsealedMove:
return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)
}

// truncate sealed file to sector size
if err := os.Truncate(tempUnsealed, int64(ssize)); err != nil {
return xerrors.Errorf("truncating unsealed file to sector size: %w", err)
}
func TruncateAndMoveUnsealed(tempUnsealed, unsealed string, ssize abi.SectorSize) error {
// truncate sealed file to sector size
if err := os.Truncate(tempUnsealed, int64(ssize)); err != nil {
return xerrors.Errorf("truncating unsealed file to sector size: %w", err)
}

// move temp file to unsealed location
if err := paths.Move(tempUnsealed, sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("move temp unsealed sector to final location (%s -> %s): %w", tempUnsealed, sectorPaths.Unsealed, err)
}
// move temp file to unsealed location
if err := paths.Move(tempUnsealed, unsealed); err != nil {
return xerrors.Errorf("move temp unsealed sector to final location (%s -> %s): %w", tempUnsealed, unsealed, err)
}
return nil
}

func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}

defer releaseSector()

ssize, err := sector.ProofType.SectorSize()
if err != nil {
return xerrors.Errorf("getting sector size: %w", err)
}

afterUnsealedMove:
// If this is a synthetic proof sector then unsealed should already exist otherwise generate it
if abi.Synthetic[sector.ProofType] {
if err = ffi.ClearSyntheticProofs(uint64(ssize), sectorPaths.Cache); err != nil {
if err := ffi.ClearSyntheticProofs(uint64(ssize), sectorPaths.Cache); err != nil {
return xerrors.Errorf("Unable to delete Synth cache: %w", err)
}
} else {
if err := sb.GenerateUnsealedSector(ctx, sector, sectorPaths, pathIDs, keepUnsealed); err != nil {
return xerrors.Errorf("generating unsealed copy of the sector: %w", err)
}
}

if err := ffi.ClearCache(uint64(ssize), sectorPaths.Cache); err != nil {
Expand Down Expand Up @@ -753,7 +779,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
return nil
}

func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo, keepUnsealed bool) error {
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
Expand All @@ -780,6 +806,11 @@ func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.Task
}
}

// Generate unsealed copy before clearing cache
if err = sb.GenerateUnsealedSector(ctx, sector, fspaths, pathIDs, keepUnsealed); err != nil {
return xerrors.Errorf("generating unsealed sector: %w", err)
}

if err = ffi.ClearCache(uint64(ssize), fspaths.Cache); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure ClearCache will work when tree-d is not present? I imagine it should have no issues, but I'm actually not 100% certain, so we should test this first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good idea. @Reiers Can we test this on calibnet?

return xerrors.Errorf("failed to clear cache for synthetic proof of sector %d of miner %d", sector.ID.Miner, sector.ID.Number)
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/seal/task_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/slotmgr"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"
)

type FinalizeTask struct {
Expand Down
10 changes: 8 additions & 2 deletions tasks/seal/task_synth_proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/filecoin-project/curio/lib/dealdata"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/paths"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"
)

type SyntheticProofTask struct {
Expand Down Expand Up @@ -71,6 +71,12 @@ func (s *SyntheticProofTask) Do(taskID harmonytask.TaskID, stillOwned func() boo
return true, nil
}

var keepUnsealed bool

if err := s.db.QueryRow(ctx, `SELECT COALESCE(BOOL_OR(NOT data_delete_on_finalize), FALSE) FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber).Scan(&keepUnsealed); err != nil {
return false, err
}

sealed, err := cid.Parse(sectorParams.SealedCID)
if err != nil {
return false, xerrors.Errorf("failed to parse sealed cid: %w", err)
Expand All @@ -94,7 +100,7 @@ func (s *SyntheticProofTask) Do(taskID harmonytask.TaskID, stillOwned func() boo
return false, xerrors.Errorf("getting deal data: %w", err)
}

err = s.sc.SyntheticProofs(ctx, &taskID, sref, sealed, unsealed, sectorParams.TicketValue, dealData.PieceInfos)
err = s.sc.SyntheticProofs(ctx, &taskID, sref, sealed, unsealed, sectorParams.TicketValue, dealData.PieceInfos, keepUnsealed)
if err != nil {
serr := resetSectorSealingState(ctx, sectorParams.SpID, sectorParams.SectorNumber, err, s.db, s.TypeDetails().Name)
if serr != nil {
Expand Down