From 82d66d594f4efd03aa760ecac9d0010865198812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 5 Nov 2024 11:51:39 +0700 Subject: [PATCH 1/2] fix: Improve supraseal batch ref tracking --- cmd/curio/tasks/tasks.go | 4 +- lib/slotmgr/slotmgr.go | 241 +++++++++++++++++++++++++++--- tasks/seal/task_finalize.go | 37 +---- tasks/sealsupra/task_supraseal.go | 41 ++--- 4 files changed, 239 insertions(+), 84 deletions(-) diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index d29a37ab8..3b86b7818 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -266,15 +266,13 @@ func addSealingTasks( } if cfg.Subsystems.EnableBatchSeal { - slotMgr = slotmgr.NewSlotMgr() - batchSealTask, err := sealsupra.NewSupraSeal( cfg.Seal.BatchSealSectorSize, cfg.Seal.BatchSealBatchSize, cfg.Seal.BatchSealPipelines, !cfg.Seal.SingleHasherPerThread, cfg.Seal.LayerNVMEDevices, - machineHostPort, slotMgr, db, full, stor, si) + machineHostPort, db, full, stor, si) if err != nil { return nil, xerrors.Errorf("setting up batch sealer: %w", err) } diff --git a/lib/slotmgr/slotmgr.go b/lib/slotmgr/slotmgr.go index f8b9445ed..568d90c09 100644 --- a/lib/slotmgr/slotmgr.go +++ b/lib/slotmgr/slotmgr.go @@ -2,42 +2,241 @@ package slotmgr import ( "context" + "sync" + "time" + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" "go.opencensus.io/stats" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/curio/harmony/harmonydb" ) +var WatchInterval = 5 * time.Minute + +var log = logging.Logger("slotmgr") + +type slot struct { + slotOffset uint64 + sectors map[abi.SectorID]struct{} + + // work is true when the slot is actively used for batch sealing (P1/P2) + // false when not is use AND when sectors in the slot are waiting for finalization + // When work is set to false, slot.sectors should be a superset of batch refs in the DB + // and a local process should periodically check the db for removed refs and remove them from the slot + work bool +} + type SlotMgr struct { - Slots chan uint64 + db *harmonydb.DB + machine string + + // in use + slots []*slot + + lk sync.Mutex + cond *sync.Cond +} + +func NewSlotMgr(db *harmonydb.DB, machineHostAndPort string, slotOffs []uint64) (*SlotMgr, error) { + slots := make([]*slot, len(slotOffs)) + for i := range slots { + slots[i] = &slot{ + sectors: map[abi.SectorID]struct{}{}, + } + } + + var slotRefs []struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + PipelineSlot uint64 `db:"pipeline_slot"` + } + + err := db.Select(context.Background(), &slotRefs, `SELECT sp_id, sector_number, pipeline_slot as count FROM batch_sector_refs WHERE machine_host_and_port = $2`, machineHostAndPort) + if err != nil { + return nil, xerrors.Errorf("getting slot refs: %w", err) + } + + for _, ref := range slotRefs { + slot, found := lo.Find(slots, func(st *slot) bool { + return st.slotOffset == ref.PipelineSlot + }) + if !found { + return nil, xerrors.Errorf("slot %d not found", ref.PipelineSlot) + } + + slot.sectors[abi.SectorID{ + Miner: abi.ActorID(ref.SpID), + Number: abi.SectorNumber(ref.SectorNumber), + }] = struct{}{} + } + + stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(len(slotOffs)))) + sm := &SlotMgr{ + db: db, + machine: machineHostAndPort, + + slots: slots, + } + sm.cond = sync.NewCond(&sm.lk) + + go sm.watchSlots() + + return sm, nil +} + +func (s *SlotMgr) watchSlots() { + for { + time.Sleep(WatchInterval) + if err := 
s.watchSingle(); err != nil { + log.Errorf("watchSingle failed: %s", err) + } + } +} + +func (s *SlotMgr) watchSingle() error { + s.lk.Lock() + defer s.lk.Unlock() + + for _, slt := range s.slots { + if slt.work || len(slt.sectors) == 0 { + // only watch slots which are NOT worked on and have sectors + continue + } + + var refs []struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + } + + err := s.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number FROM batch_sector_refs WHERE pipeline_slot = $1 AND machine_host_and_port = $2`, slt.slotOffset, s.machine) + if err != nil { + return xerrors.Errorf("getting refs: %w", err) + } + + // find refs which are in the slot but not in the db + for id := range slt.sectors { + found := false + for _, ref := range refs { + if (abi.SectorID{ + Miner: abi.ActorID(ref.SpID), + Number: abi.SectorNumber(ref.SectorNumber), + }) == id { + found = true + break + } + } + + if !found { + log.Warnf("slot %d: removing local sector ref %d", slt.slotOffset, id) + delete(slt.sectors, id) + } + } + + if len(slt.sectors) == 0 { + s.cond.Signal() + } + } + + return nil } -const maxPipelines = 2 +// Get allocates a slot for work. Called from a batch sealing task +func (s *SlotMgr) Get(ids []abi.SectorID) uint64 { + s.lk.Lock() -func NewSlotMgr() *SlotMgr { - slots := make(chan uint64, maxPipelines) - stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(maxPipelines))) - return &SlotMgr{slots} + for { + for _, slt := range s.slots { + if len(slt.sectors) == 0 { + for _, id := range ids { + slt.sectors[id] = struct{}{} + } + slt.work = true + + s.lk.Unlock() + + stats.Record(context.Background(), SlotMgrMeasures.SlotsAcquired.M(1)) + stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available()))) + + return slt.slotOffset + } + } + + s.cond.Wait() + } } -func (s *SlotMgr) Get() uint64 { - slot := <-s.Slots - stats.Record(context.Background(), SlotMgrMeasures.SlotsAcquired.M(1)) - stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available()))) - return slot +// MarkWorkDone marks a slot as no longer being actively used for batch sealing +// This is when sectors start waiting for finalization (After C1 outputs were produced) +func (s *SlotMgr) MarkWorkDone(slotOff uint64) error { + s.lk.Lock() + defer s.lk.Unlock() + + sl, found := lo.Find(s.slots, func(st *slot) bool { + return st.slotOffset == slotOff + }) + if !found { + return xerrors.Errorf("slot %d not found", slotOff) + } + + sl.work = false + return nil } -func (s *SlotMgr) Put(slot uint64) error { - select { - case s.Slots <- slot: - stats.Record(context.Background(), SlotMgrMeasures.SlotsReleased.M(1)) - stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available()))) - return nil - default: - stats.Record(context.Background(), SlotMgrMeasures.SlotErrors.M(1)) - return xerrors.Errorf("slot list full, max %d", cap(s.Slots)) +// AbortSlot marks a slot which was used for work as immediately free +func (s *SlotMgr) AbortSlot(slotOff uint64) error { + s.lk.Lock() + defer s.lk.Unlock() + + sl, found := lo.Find(s.slots, func(st *slot) bool { + return st.slotOffset == slotOff + }) + if !found { + return xerrors.Errorf("slot %d not found", slotOff) + } + + sl.sectors = map[abi.SectorID]struct{}{} + sl.work = false + s.cond.Signal() + return nil +} + +func (s *SlotMgr) SectorDone(ctx context.Context, slotOff uint64, id abi.SectorID) error { + _, err := s.db.Exec(ctx, `DELETE FROM 
batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, id.Miner, id.Number) + if err != nil { + return xerrors.Errorf("deleting batch refs: %w", err) + } + + s.lk.Lock() + defer s.lk.Unlock() + + sl, found := lo.Find(s.slots, func(st *slot) bool { + return st.slotOffset == slotOff + }) + if !found { + return xerrors.Errorf("slot %d not found", slotOff) + } + + delete(sl.sectors, id) + if len(sl.sectors) == 0 { + s.cond.Signal() } + return nil } func (s *SlotMgr) Available() int { - return len(s.Slots) + s.lk.Lock() + defer s.lk.Unlock() + + var available int + for _, slt := range s.slots { + if len(slt.sectors) == 0 { + available++ + } + } + + return available } diff --git a/tasks/seal/task_finalize.go b/tasks/seal/task_finalize.go index 13dff17a0..fcbb1a1d4 100644 --- a/tasks/seal/task_finalize.go +++ b/tasks/seal/task_finalize.go @@ -152,41 +152,8 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do if f.slots != nil { // batch handling part 2: - // delete from batch_sector_refs - var freeSlot bool - - _, err = f.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - _, err = tx.Exec(`DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("deleting batch refs: %w", err) - } - - // get sector ref count, if zero free the pipeline slot - var count []struct { - Count int64 `db:"count"` - } - err = tx.Select(&count, `SELECT COUNT(1) as count FROM batch_sector_refs WHERE machine_host_and_port = $1 AND pipeline_slot = $2`, ownedBy[0].HostAndPort, refs[0].PipelineSlot) - if err != nil { - return false, xerrors.Errorf("getting batch ref count: %w", err) - } - - if count[0].Count == 0 { - freeSlot = true - } else { - log.Infow("Not freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort, "remaining", count[0].Count) - } - - return true, nil - }, harmonydb.OptionRetry()) - if err != nil { - return false, xerrors.Errorf("deleting batch refs: %w", err) - } - - if freeSlot { - log.Infow("Freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort) - if err := f.slots.Put(uint64(refs[0].PipelineSlot)); err != nil { - return false, xerrors.Errorf("freeing slot: %w", err) - } + if err := f.slots.SectorDone(ctx, uint64(refs[0].PipelineSlot), sector.ID); err != nil { + return false, xerrors.Errorf("mark batch ref done: %w", err) } } diff --git a/tasks/sealsupra/task_supraseal.go b/tasks/sealsupra/task_supraseal.go index 3db7e1791..f056c644f 100644 --- a/tasks/sealsupra/task_supraseal.go +++ b/tasks/sealsupra/task_supraseal.go @@ -60,7 +60,7 @@ type SupraSeal struct { } func NewSupraSeal(sectorSize string, batchSize, pipelines int, dualHashers bool, nvmeDevices []string, machineHostAndPort string, - slots *slotmgr.SlotMgr, db *harmonydb.DB, api SupraSealNodeAPI, storage *paths.Remote, sindex paths.SectorIndex) (*SupraSeal, error) { + db *harmonydb.DB, api SupraSealNodeAPI, storage *paths.Remote, sindex paths.SectorIndex) (*SupraSeal, error) { var spt abi.RegisteredSealProof switch sectorSize { case "32GiB": @@ -115,31 +115,16 @@ func NewSupraSeal(sectorSize string, batchSize, pipelines int, dualHashers bool, return nil, xerrors.Errorf("not enough space for %d pipelines (can do %d), only %d pages available, want %d (slot size %d) pages", pipelines, maxPipelines, space, slotSize*uint64(pipelines), slotSize) } + var slotOffs []uint64 for i := 0; i < pipelines; i++ { slot := slotSize * uint64(i) - - var 
slotRefs []struct { - Count int `db:"count"` - } - - err := db.Select(context.Background(), &slotRefs, `SELECT COUNT(*) as count FROM batch_sector_refs WHERE pipeline_slot = $1 AND machine_host_and_port = $2`, slot, machineHostAndPort) - if err != nil { - return nil, xerrors.Errorf("getting slot refs: %w", err) - } - - if len(slotRefs) > 0 { - if slotRefs[0].Count > 0 { - log.Infow("slot already in use", "slot", slot, "refs", slotRefs[0].Count) - continue - } - } - log.Infow("batch slot", "slot", slot, "machine", machineHostAndPort) + slotOffs = append(slotOffs, slot) + } - err = slots.Put(slot) - if err != nil { - return nil, xerrors.Errorf("putting slot: %w", err) - } + slots, err := slotmgr.NewSlotMgr(db, machineHostAndPort, slotOffs) + if err != nil { + return nil, xerrors.Errorf("creating slot manager: %w", err) } return &SupraSeal{ @@ -195,12 +180,14 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done outPaths := make([]supraffi.Path, len(sectors)) outPathIDs := make([]storiface.SectorPaths, len(sectors)) alloc := storiface.FTSealed | storiface.FTCache + sectorsIDs := make([]abi.SectorID, len(sectors)) for i, t := range sectors { sid := abi.SectorID{ Miner: abi.ActorID(t.SpID), Number: abi.SectorNumber(t.SectorNumber), } + sectorsIDs = append(sectorsIDs, sid) // cleanup any potential previous failed attempts if err := s.storage.Remove(ctx, sid, storiface.FTSealed, true, nil); err != nil { @@ -250,10 +237,10 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } s.inSDR.Lock() - slot := s.slots.Get() + slot := s.slots.Get(sectorsIDs) cleanup := func() { - perr := s.slots.Put(slot) + perr := s.slots.AbortSlot(slot) if perr != nil { log.Errorf("putting slot back: %s", err) } @@ -280,7 +267,7 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done s.inSDR.Unlock() s.outSDR.Lock() cleanup = func() { - perr := s.slots.Put(slot) + perr := s.slots.AbortSlot(slot) if perr != nil { log.Errorf("putting slot back: %s", err) } @@ -397,6 +384,10 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // NOTE: We're not releasing the slot yet, we keep it until sector Finalize } + if err := s.slots.MarkWorkDone(slot); err != nil { + return true, xerrors.Errorf("marking work done: %w", err) + } + return true, nil } From fe9d6a05f305037d12b2e47b715d3eb1f7a326fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 5 Nov 2024 11:51:39 +0700 Subject: [PATCH 2/2] fix: Improve supraseal batch ref tracking --- lib/slotmgr/slotmgr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/slotmgr/slotmgr.go b/lib/slotmgr/slotmgr.go index 568d90c09..a2115ba8f 100644 --- a/lib/slotmgr/slotmgr.go +++ b/lib/slotmgr/slotmgr.go @@ -55,7 +55,7 @@ func NewSlotMgr(db *harmonydb.DB, machineHostAndPort string, slotOffs []uint64) PipelineSlot uint64 `db:"pipeline_slot"` } - err := db.Select(context.Background(), &slotRefs, `SELECT sp_id, sector_number, pipeline_slot as count FROM batch_sector_refs WHERE machine_host_and_port = $2`, machineHostAndPort) + err := db.Select(context.Background(), &slotRefs, `SELECT sp_id, sector_number, pipeline_slot as count FROM batch_sector_refs WHERE machine_host_and_port = $1`, machineHostAndPort) if err != nil { return nil, xerrors.Errorf("getting slot refs: %w", err) }
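
The resulting slot lifecycle, condensed into one function for illustration. This is a minimal sketch, not code from the patch: the slotmgr import path is inferred from lib/slotmgr/slotmgr.go, runBatch is a hypothetical placeholder for the supraseal SDR/PC2/C1 work, and in the real pipeline SectorDone is invoked from the finalize task rather than from the sealing task.

// Sketch of the SlotMgr lifecycle introduced by this series. runBatch is a
// placeholder for the batch sealing work; error handling is abbreviated.
package slotmgrexample

import (
	"context"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/curio/harmony/harmonydb"
	"github.com/filecoin-project/curio/lib/slotmgr"
)

func slotLifecycle(ctx context.Context, db *harmonydb.DB, machine string,
	slotOffs []uint64, batch []abi.SectorID, runBatch func(slot uint64) error) error {

	// NewSlotMgr restores any existing batch_sector_refs rows for this machine
	// into local slot state, so refs survive a process restart.
	mgr, err := slotmgr.NewSlotMgr(db, machine, slotOffs)
	if err != nil {
		return err
	}

	// Get blocks until a slot with no tracked sectors is free, records the
	// batch's sector IDs in it, and marks it as actively worked on.
	slot := mgr.Get(batch)

	if err := runBatch(slot); err != nil {
		// On failure the slot is released immediately and its local refs dropped.
		_ = mgr.AbortSlot(slot)
		return err
	}

	// Once C1 outputs exist the slot is no longer actively worked on, but it
	// stays occupied until every sector in it is finalized; the background
	// watcher reconciles its local refs against the DB in the meantime.
	if err := mgr.MarkWorkDone(slot); err != nil {
		return err
	}

	// The finalize task removes each sector's batch ref; the slot becomes
	// available to Get again once its last ref is gone.
	for _, id := range batch {
		if err := mgr.SectorDone(ctx, slot, id); err != nil {
			return err
		}
	}
	return nil
}

Compared with the previous channel-based SlotMgr, slot availability is now derived from per-slot sector refs backed by the batch_sector_refs table, so a restarted node picks up its existing refs and refs removed out-of-band are reclaimed by the watcher instead of leaking a pipeline slot.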