Skip to content

Commit

Permalink
fix: show task waiting in PoRep pipeline (#260)
Browse files Browse the repository at this point in the history
* show task waiting in PoRep pipeline

* fix cluster task sorting order

* docker lotus version

* add remaining wait task in UI

* fix null handling

* convert to a single query
  • Loading branch information
LexLuthr authored Oct 9, 2024
1 parent b0ac13e commit 053902f
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ build_lotus?=0
curio_docker_user?=curio
curio_base_image=$(curio_docker_user)/curio-all-in-one:latest-debug
ffi_from_source?=0
lotus_version?=v1.28.1
lotus_version?=v1.29.0

ifeq ($(build_lotus),1)
# v1: building lotus image with provided lotus version
Expand Down
8 changes: 7 additions & 1 deletion tasks/unseal/task_unseal_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails {
ssize = abi.SectorSize(2 << 20)
}

return harmonytask.TaskTypeDetails{
res := harmonytask.TaskTypeDetails{
Max: taskhelp.Max(t.max),
Name: "UnsealDecode",
Cost: resources.Resources{
Expand All @@ -177,6 +177,12 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails {
return t.schedule(context.Background(), taskFunc)
}),
}

if isDevnet {
res.Cost.Ram = 1 << 30
}

return res
}

func (t *TaskUnsealDecode) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error {
Expand Down
263 changes: 203 additions & 60 deletions web/api/webrpc/pipeline_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,56 @@ type PipelineTask struct {

CreateTime time.Time `db:"create_time"`

TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
StartedSDR bool `db:"started_sdr"`

TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
StartedTreeD bool `db:"started_tree_d"`

TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
StartedTreeRC bool `db:"started_tree_rc"`

TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`

TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
StartedSynthetic bool `db:"started_synthetic"`

TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
StartedPrecommitMsg bool `db:"started_precommit_msg"`

AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
SeedEpoch *int64 `db:"seed_epoch"`

TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
StartedPoRep bool `db:"started_porep"`

TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
StartedFinalize bool `db:"started_finalize"`

TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
StartedMoveStorage bool `db:"started_move_storage"`

TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
StartedCommitMsg bool `db:"started_commit_msg"`

AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`

Failed bool `db:"failed"`
FailedReason string `db:"failed_reason"`
}

func (pt PipelineTask) sectorID() abi.SectorID {
return abi.SectorID{Miner: abi.ActorID(pt.SpID), Number: abi.SectorNumber(pt.SectorNumber)}
MissingTasks []int64 `db:"missing_tasks"`
AllTasks []int64 `db:"all_tasks"`
}

type sectorListEntry struct {
Expand All @@ -74,9 +82,6 @@ type sectorListEntry struct {
AfterSeed bool

ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool

MissingTasks []int64
AllTasks []int64
}

type minerBitfields struct {
Expand All @@ -87,36 +92,183 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
var tasks []PipelineTask

err := a.deps.DB.Select(ctx, &tasks, `SELECT
sp_id, sector_number,
create_time,
task_id_sdr, after_sdr,
task_id_tree_d, after_tree_d,
task_id_tree_c, after_tree_c,
task_id_tree_r, after_tree_r,
task_id_synth, after_synth,
task_id_precommit_msg, after_precommit_msg,
after_precommit_msg_success, seed_epoch,
task_id_porep, porep_proof, after_porep,
task_id_finalize, after_finalize,
task_id_move_storage, after_move_storage,
task_id_commit_msg, after_commit_msg,
after_commit_msg_success,
failed, failed_reason
FROM sectors_sdr_pipeline order by sp_id, sector_number`) // todo where constrain list
sp.sp_id,
sp.sector_number,
sp.create_time,
sp.task_id_sdr,
sp.after_sdr,
sp.task_id_tree_d,
sp.after_tree_d,
sp.task_id_tree_c,
sp.after_tree_c,
sp.task_id_tree_r,
sp.after_tree_r,
sp.task_id_synth,
sp.after_synth,
sp.task_id_precommit_msg,
sp.after_precommit_msg,
sp.after_precommit_msg_success,
sp.seed_epoch,
sp.task_id_porep,
sp.porep_proof,
sp.after_porep,
sp.task_id_finalize,
sp.after_finalize,
sp.task_id_move_storage,
sp.after_move_storage,
sp.task_id_commit_msg,
sp.after_commit_msg,
sp.after_commit_msg_success,
sp.failed,
sp.failed_reason,
-- Compute StartedSDR
CASE
WHEN NOT after_tree_d AND task_id_sdr IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_sdr AND owner_id > 0
)
ELSE FALSE
END AS started_sdr,
-- Compute StartedTreeD
CASE
WHEN after_sdr AND NOT after_tree_d AND task_id_tree_d IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_tree_d AND owner_id > 0
)
ELSE FALSE
END AS started_tree_d,
-- Compute StartedTreeRC
CASE
WHEN after_tree_d AND NOT after_tree_c AND task_id_tree_c IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_tree_c AND owner_id > 0
)
ELSE FALSE
END AS started_tree_rc,
-- Compute StartedSynthetic
CASE
WHEN after_tree_c AND NOT after_synth AND task_id_synth IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_synth AND owner_id > 0
)
ELSE FALSE
END AS started_synthetic,
-- Compute StartedPrecommitMsg
CASE
WHEN after_synth AND NOT after_precommit_msg AND task_id_precommit_msg IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_precommit_msg AND owner_id > 0
)
ELSE FALSE
END AS started_precommit_msg,
-- Compute StartedPoRep
CASE
WHEN after_precommit_msg AND NOT after_porep AND task_id_porep IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_porep AND owner_id > 0
)
ELSE FALSE
END AS started_porep,
-- Compute StartedFinalize
CASE
WHEN after_porep AND NOT after_finalize AND task_id_finalize IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_finalize AND owner_id > 0
)
ELSE FALSE
END AS started_finalize,
-- Compute StartedCommitMsg
CASE
WHEN after_porep AND NOT after_commit_msg AND task_id_commit_msg IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_commit_msg AND owner_id > 0
)
ELSE FALSE
END AS started_commit_msg,
-- Compute StartedMoveStorage
CASE
WHEN after_finalize AND NOT after_move_storage AND task_id_move_storage IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_move_storage AND owner_id > 0
)
ELSE FALSE
END AS started_move_storage,
-- Collect all task IDs into an array without NULLs
(
SELECT array_agg(task_id)
FROM (
VALUES
(sp.task_id_sdr),
(sp.task_id_tree_d),
(sp.task_id_tree_c),
(sp.task_id_tree_r),
(sp.task_id_synth),
(sp.task_id_precommit_msg),
(sp.task_id_porep),
(sp.task_id_finalize),
(sp.task_id_move_storage),
(sp.task_id_commit_msg)
) AS t(task_id)
WHERE task_id IS NOT NULL
) AS all_tasks,
-- Compute missing tasks without NULLs
(
SELECT array_agg(task_id)
FROM (
SELECT task_id
FROM unnest(ARRAY[
sp.task_id_sdr,
sp.task_id_tree_d,
sp.task_id_tree_c,
sp.task_id_tree_r,
sp.task_id_synth,
sp.task_id_precommit_msg,
sp.task_id_porep,
sp.task_id_finalize,
sp.task_id_move_storage,
sp.task_id_commit_msg
]) AS task_id
LEFT JOIN harmony_task ht ON ht.id = task_id
WHERE task_id IS NOT NULL AND ht.id IS NULL
) AS missing
) AS missing_tasks
FROM sectors_sdr_pipeline sp
ORDER BY sp_id, sector_number;
`) // todo where constrain list
if err != nil {
return nil, xerrors.Errorf("failed to fetch pipeline tasks: %w", err)
}

missingTasks, err := a.pipelinePorepMissingTasks(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err)
}

missingTasksMap := make(map[abi.SectorID]porepMissingTask)
for _, mt := range missingTasks {
missingTasksMap[mt.sectorID()] = mt
}

head, err := a.deps.Chain.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch chain head: %w", err)
Expand Down Expand Up @@ -147,12 +299,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e

afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch)

var missingTasks, allTasks []int64
if mt, ok := missingTasksMap[task.sectorID()]; ok {
missingTasks = mt.MissingTaskIDs
allTasks = mt.AllTaskIDs
}

sectorList = append(sectorList, sectorListEntry{
PipelineTask: task,
Address: addr,
Expand All @@ -164,9 +310,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))),
ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))),
ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))),

MissingTasks: missingTasks,
AllTasks: allTasks,
})
}

Expand Down
3 changes: 2 additions & 1 deletion web/api/webrpc/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func (a *WebRPC) ClusterTaskSummary(ctx context.Context) ([]TaskSummary, error)
err := a.deps.DB.Select(ctx, &ts, `SELECT
t.id as id, t.name as name, t.update_time as since_posted, t.owner_id as owner_id, hm.host_and_port as owner
FROM harmony_task t LEFT JOIN harmony_machines hm ON hm.id = t.owner_id
ORDER BY t.update_time ASC, t.owner_id`)
ORDER BY
CASE WHEN t.owner_id IS NULL THEN 1 ELSE 0 END, t.update_time ASC`)
if err != nil {
return nil, err // Handle error
}
Expand Down
Loading

0 comments on commit 053902f

Please sign in to comment.