Skip to content

Commit

Permalink
show task waiting in PoRep pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Oct 7, 2024
1 parent aa07266 commit 11ca825
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 23 deletions.
94 changes: 74 additions & 20 deletions web/api/webrpc/pipeline_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,48 @@ type PipelineTask struct {

CreateTime time.Time `db:"create_time"`

TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
StartedSDR bool

TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
StartedTreeD bool

TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
StartedTreeRC bool

TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`

TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
StartedSynthetic bool

TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
StartedPrecommitMsg bool

AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
SeedEpoch *int64 `db:"seed_epoch"`

TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
StartedPoRep bool

TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
StartedFinalize bool

TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
StartedMoveStorage bool

TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
StartedCommitMsg bool

AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`

Expand Down Expand Up @@ -117,6 +126,13 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
missingTasksMap[mt.sectorID()] = mt
}

for _, ta := range tasks {
err = a.GetFirstNotNullPipelineTask(ctx, &ta)
if err != nil {
return nil, err
}
}

head, err := a.deps.Chain.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch chain head: %w", err)
Expand Down Expand Up @@ -245,7 +261,7 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma
COUNT(*) FILTER (WHERE (after_tree_d = false OR after_tree_c = false OR after_tree_r = false) AND after_sdr = true) as CountTrees,
COUNT(*) FILTER (WHERE after_tree_r = true and after_precommit_msg = false) as CountPrecommitMsg,
COUNT(*) FILTER (WHERE after_precommit_msg_success = true AND seed_epoch > $1) as CountWaitSeed,
COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true) as CountPoRep,
COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true AND seed_epoch < $1) as CountPoRep,
COUNT(*) FILTER (WHERE after_commit_msg_success = false AND after_porep = true) as CountCommitMsg,
COUNT(*) FILTER (WHERE after_commit_msg_success = true) as CountDone,
COUNT(*) FILTER (WHERE failed = true) as CountFailed
Expand Down Expand Up @@ -360,3 +376,41 @@ func (a *WebRPC) pipelinePorepMissingTasks(ctx context.Context) ([]porepMissingT

return tasks, nil
}

func (a *WebRPC) GetFirstNotNullPipelineTask(ctx context.Context, p *PipelineTask) error {
tasks := []struct {
AfterFirst, AfterSecond, Started bool
Task *int64
}{
{!p.AfterTreeD, false, p.StartedSDR, p.TaskSDR},
{p.AfterSDR, !p.AfterTreeD, p.StartedTreeD, p.TaskTreeD},
{p.AfterTreeD, !p.AfterTreeC, p.StartedTreeRC, p.TaskTreeC},
{p.AfterTreeC, !p.AfterSynthetic, p.StartedSynthetic, p.TaskSynthetic},
{p.AfterSynthetic, !p.AfterPrecommitMsg, p.StartedPrecommitMsg, p.TaskPrecommitMsg},
{p.AfterPrecommitMsg, !p.AfterPoRep, p.StartedPoRep, p.TaskPoRep},
{p.AfterPoRep, !p.AfterFinalize, p.StartedFinalize, p.TaskFinalize},
{p.AfterPoRep, !p.AfterCommitMsg, p.StartedCommitMsg, p.TaskCommitMsg},
{p.AfterFinalize, !p.AfterMoveStorage, p.StartedMoveStorage, p.TaskMoveStorage},
}

for _, task := range tasks {
if task.AfterFirst && task.AfterSecond && task.Task != nil {
found, err := a.getOwner(ctx, *task.Task)
if err != nil {
return err
}
task.Started = found
}
}

return nil
}

func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) {
var owner int64
err := a.deps.DB.QueryRow(ctx, `SELECT owner_id FROM harmony_task WHERE id = $1`, id).Scan(&owner)
if err != nil {
return false, xerrors.Errorf("failed to fetch owner ID: %w", err)
}
return &owner != nil, nil

Check failure on line 415 in web/api/webrpc/pipeline_porep.go

View workflow job for this annotation

GitHub Actions / lint

SA4022: the address of a variable cannot be nil (staticcheck)
}
10 changes: 7 additions & 3 deletions web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
.pipeline-failed {
background-color: #603030;
}
.pipeline-waiting {
background-color: #d0d0d0;
}`
properties = {
sector: Object,
Expand Down Expand Up @@ -105,7 +109,7 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
<table class="porep-state">
<tbody>
<tr>
${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR)}
${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR, sector.StartedSDR)}
${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC)}
${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic)}
${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)}
Expand Down Expand Up @@ -149,14 +153,14 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
</td>
`;
}
renderSectorState(name, rowspan, sector, task, after) {
renderSectorState(name, rowspan, sector, task, after, started) {
if(task) {
// sector.MissingTasks is a list of tasks
// sector.MissingTasks.includes(task) is true if task is missing
let missing = sector.MissingTasks && sector.MissingTasks.includes(task);

return html`
<td rowspan="${rowspan}" class="${missing ? 'pipeline-failed' : 'pipeline-active'}">
<td rowspan="${rowspan}" class="${missing ? 'pipeline-failed' : (started ? 'pipeline-active' : 'pipeline-waiting')}">
<div>${name}</div>
<div>T:${task}</div>
${missing ? html`<div><b>FAILED</b></div>` : ''}
Expand Down

0 comments on commit 11ca825

Please sign in to comment.