Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/harmony cron #82

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks

import (
"context"
"fmt"
"sort"
"strings"
"sync"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp/harmonycron"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/lib/curiochain"
"github.com/filecoin-project/curio/lib/fastparamfetch"
Expand Down Expand Up @@ -187,11 +190,20 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
"miner_addresses", minerAddresses,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

reg, err := resources.Register(db, dependencies.ListenAddr)
if err != nil {
return nil, fmt.Errorf("cannot get resources: %w", err)
}

cron := harmonycron.New(db, reg.MachineID)
dependencies.At = cron.At
activeTasks = append([]harmonytask.TaskInterface{cron}, activeTasks...)

// harmony treats the first task as highest priority, so reverse the order
// (we could have just appended to this list in the reverse order, but defining
// tasks in pipeline order is more intuitive)

ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr, reg)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/gbrlsnchs/jwt/v3"
Expand Down Expand Up @@ -178,6 +179,7 @@ type Deps struct {
LocalPaths *paths.BasicLocalStorage
ListenAddr string
Name string
At func(t time.Time, taskType, sqlTable string, sqlRowID int)
}

const (
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-jsonrpc v0.5.0
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-state-types v0.14.0-dev
github.com/filecoin-project/go-state-types v0.14.0-rc1
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/lotus v1.27.0
github.com/filecoin-project/specs-actors/v5 v5.0.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.13.1/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY=
github.com/filecoin-project/go-state-types v0.14.0-dev h1:bDwq1S28D7EC/uDmKU8vvNcdFw/YDsNq09pe3zeV5h4=
github.com/filecoin-project/go-state-types v0.14.0-dev/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY=
github.com/filecoin-project/go-state-types v0.14.0-rc1 h1:kWBGX/uqZmYotYMNmw+R/fIuot/k0KMcEtB7PKFy1SQ=
github.com/filecoin-project/go-state-types v0.14.0-rc1/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v1.0.3 h1:N07o6alys+V1tNoSTi4WuuoeNC4erS/6jE74+NsgQuk=
github.com/filecoin-project/go-statemachine v1.0.3/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54=
Expand Down
15 changes: 15 additions & 0 deletions harmony/harmonydb/sql/20240709-cron.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- harmony_cron holds one row per deferred task scheduled through harmonycron.
--   task_id    : the Cron task (harmony_task row) that waits for the scheduled time;
--                the row is removed together with that task (ON DELETE CASCADE).
--   unixtime   : scheduled start time, in Unix seconds.
--   task_type  : harmony_task name of the task to create at that time.
--   sql_table  : table whose row should receive the new task's id.
--   sql_row_id : "id" of the row in sql_table to update.
CREATE TABLE harmony_cron (
    id SERIAL PRIMARY KEY NOT NULL,
    task_id INTEGER NOT NULL REFERENCES harmony_task (id) ON DELETE CASCADE,
    unixtime BIGINT NOT NULL,
    task_type VARCHAR(16) NOT NULL,
    sql_table TEXT NOT NULL,
    sql_row_id INTEGER NOT NULL
);

-- update_ext_taskid sets task_id = tid on the row of table_name whose id equals id_value.
-- The table name is interpolated with %I (quoted as an identifier); the two values are
-- passed as bound parameters through USING rather than being concatenated into the SQL.
-- The target table must therefore have columns named "id" and "task_id".
CREATE OR REPLACE FUNCTION update_ext_taskid(table_name text, tid integer, id_value integer)
RETURNS void AS $$
BEGIN
EXECUTE format('UPDATE %I SET task_id = $1 WHERE id=$2', table_name)
USING tid, id_value;
END;
$$ LANGUAGE plpgsql;
7 changes: 2 additions & 5 deletions harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,9 @@ type TaskID int
func New(
db *harmonydb.DB,
impls []TaskInterface,
hostnameAndPort string) (*TaskEngine, error) {
hostnameAndPort string,
reg *resources.Reg) (*TaskEngine, error) {

reg, err := resources.Register(db, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("cannot get resources: %w", err)
}
ctx, grace := context.WithCancel(context.Background())
e := &TaskEngine{
ctx: ctx,
Expand Down
16 changes: 16 additions & 0 deletions harmony/taskhelp/harmonycron/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
HarmonyCron starts tasks at a specified time & associates them with a row in a SQL table.
It supports: At(when time.Time, taskType, sqlTable string, sqlRowID int)

which will add a task to the task engine at the specified time and associate it with the specified row.

Operation:

The cron-task will be picked up by Cron runners, which try to avoid being sealers or provers.
The cron-task is held until the specified time (stored with one-second resolution); it then
starts the requested task in the task engine and completes.

Requirement: The sqlTable must have columns "id" and "task_id" to be able to update the task_id.
HarmonyCron is a regular harmonytask.TaskInterface task run by all nodes.
*/
package harmonycron
95 changes: 95 additions & 0 deletions harmony/taskhelp/harmonycron/harmonycron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package harmonycron

import (
"context"
"regexp"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
)

// Cron is the harmony task that holds a scheduled entry until its time arrives
// and then creates the real task.
type Cron struct {
// DB is the database handle Do uses to read harmony_cron and insert the deferred task.
DB *harmonydb.DB
// AddTaskFunc is assigned by Adder and called by At to enqueue a Cron task.
AddTaskFunc harmonytask.AddTaskFunc
// me is the machine ID given to New; Do writes it as added_by on tasks it creates.
me int
}

// New builds a Cron bound to db that records myMachineID as the creator of the
// tasks it starts. AddTaskFunc is left unset until Adder is called.
func New(db *harmonydb.DB, myMachineID int) *Cron {
	c := new(Cron)
	c.DB = db
	c.me = myMachineID
	return c
}

// At schedules a task of type taskType to be started at time t.
// It enqueues a Cron task and, in the same transaction, records a harmony_cron
// row carrying the start time (Unix seconds), the task type and the
// (sqlTable, sqlRowID) pair the future task should be attached to.
// If the insert hits a unique-constraint violation the transaction is not
// committed and no error is reported.
// Note: sqlTable must use the standard columns "id" and "task_id"
func (c *Cron) At(t time.Time, taskType, sqlTable string, sqlRowID int) {
	const insertCron = `INSERT INTO harmony_cron (task_id, unixtime, task_type, sql_table, sql_row_id) VALUES ($1, $2, $3, $4, $5)`

	when := t.Unix()
	c.AddTaskFunc(func(tid harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
		_, seriousError = tx.Exec(insertCron, tid, when, taskType, sqlTable, sqlRowID)
		if harmonydb.IsErrUniqueContraint(seriousError) {
			// Already scheduled: drop this attempt quietly.
			return false, nil
		}
		return true, seriousError
	})
}

// validSQL matches strings made up only of ASCII letters, digits and underscores.
// Do uses it to reject sql_table values that are not plain identifiers.
var validSQL = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)

// Do waits until the scheduled time of the cron entry attached to taskID, then
// creates the real task and links it to its row in the caller's table.
//
// Steps:
//  1. read the harmony_cron row for taskID;
//  2. reject sql_table values that are not plain identifiers;
//  3. sleep until the stored unixtime;
//  4. if this node still owns the cron task, insert a harmony_task row of the
//     stored task type and call update_ext_taskid to store the new task id on
//     the target row, both in one transaction.
//
// It returns done=true once the transaction succeeded, done=false with a nil
// error when ownership was lost while waiting, and done=false with an error
// for any failure.
func (c *Cron) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	ctx := context.Background()

	var unixtime int64
	var taskType, sqlTable string
	// sql_row_id is an INTEGER column and is written from an int in At, so read it back as an int.
	var sqlRowID int
	err = c.DB.QueryRow(ctx, `SELECT unixtime, task_type, sql_table, sql_row_id
	FROM harmony_cron WHERE task_id = $1`, taskID).
		Scan(&unixtime, &taskType, &sqlTable, &sqlRowID)
	if err != nil {
		return false, xerrors.Errorf("getting cron task: %w", err)
	}
	if !validSQL.MatchString(sqlTable) {
		return false, xerrors.Errorf("invalid sql table or column name")
	}

	// Hold the cron task until its scheduled time; a time in the past returns immediately.
	time.Sleep(time.Until(time.Unix(unixtime, 0)))
	if !stillOwned() {
		return false, nil
	}

	_, err = c.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
		// Keep the id of the newly created task separate from the cron task's own id.
		var newTaskID harmonytask.TaskID
		// One placeholder per listed column: name, added_by, posted_time.
		err = tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time) VALUES ($1, $2, $3) RETURNING id`, taskType, c.me, time.Now()).Scan(&newTaskID)
		if err != nil {
			return false, xerrors.Errorf("adding new task: %w", err)
		}
		_, err = tx.Exec(`SELECT update_ext_taskid($1, $2, $3)`, sqlTable, newTaskID, sqlRowID)
		if err != nil {
			return false, xerrors.Errorf("updating task_id in %s: %w", sqlTable, err)
		}
		return true, nil
	})
	if err != nil {
		return false, xerrors.Errorf("doing cron task: %w", err)
	}
	return true, nil
}

// CanAccept always accepts, picking the first of the offered task ids.
func (c *Cron) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	// TODO: avoid accepting if we are one of the busier nodes.
	first := &ids[0]
	return first, nil
}

// TypeDetails describes the Cron task type: it is named "Cron" and its cost
// is 5 MiB of RAM with no CPU or GPU.
func (c *Cron) TypeDetails() harmonytask.TaskTypeDetails {
	cost := resources.Resources{Ram: 5 << 20} // Cpu and Gpu stay at their zero values
	return harmonytask.TaskTypeDetails{Name: "Cron", Cost: cost}
}

// Adder stores f as the function At uses to enqueue Cron tasks.
func (c *Cron) Adder(f harmonytask.AddTaskFunc) {
c.AddTaskFunc = f
}

// Compile-time check that *Cron implements harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = (*Cron)(nil)
84 changes: 84 additions & 0 deletions itests/harmonycron_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package itests

import (
"context"
"testing"
"time"

"github.com/snadrus/must"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp/harmonycron"
)

// TestHarmonyCron doubles as usage documentation for harmonycron.
// It registers a Cron task and a FutureTask with a task engine, inserts a row
// into harmony_test, schedules a FutureTask for that row three seconds ahead
// with cron.At, and then waits for FutureTask to report the stored value on its channel.
func TestHarmonyCron(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dbConfig := config.HarmonyDB{
Hosts: []string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")},
Database: "yugabyte",
Username: "yugabyte",
Password: "yugabyte",
Port: "5433",
}
db := must.One(harmonydb.NewFromConfigWithITestID(t, dbConfig, harmonydb.ITestNewID()))
hp := "localhost:1234"
reg := must.One(resources.Register(db, hp))
cron := harmonycron.New(db, reg.MachineID)

// FutureTask is the task we want to run in the future.
FutureTask := &FutureTask{db: db, Ch: make(chan string)}
_ = must.One(harmonytask.New(db, []harmonytask.TaskInterface{cron, FutureTask}, hp, reg))
// harmonycron lives in Deps, but we are shortcutting things here.
// Above here is the setup `curio run`, below is the test /////////////////

var taskValueForTesting string

// A tale of PreviousTask and FutureTask /story
// (p *PreviousTask) Do() {
{
taskValue := "abcde" // our future task needs this, so save it to that task's table
var rowID int64
// Pretend harmony_test is FutureTask's table
err := db.QueryRow(ctx, "INSERT INTO harmony_test (options, result) VALUES ('FutureTaskTbl', $1) RETURNING id", taskValue).Scan(&rowID)
require.NoError(t, err)
// PreviousTask then schedules it sometime in the future. It will run AFTER this time.
cron.At(time.Now().Add(3*time.Second), "FutureTask", "harmony_test", int(rowID))
// Now, PreviousTask's machine could go down, & someone's FutureTask runner will pick it up.

taskValueForTesting = taskValue // hack for the test. Nothing to see here.
}
// Blocks until FutureTask.Do sends the value it read for its task.
require.Equal(t, taskValueForTesting, <-FutureTask.Ch)
}

// FutureTask is the test task that TestHarmonyCron schedules through harmonycron.
type FutureTask struct {
// db is the database Do queries for the task's stored value.
db *harmonydb.DB
// Ch receives the value Do read, so the test can assert on it.
Ch chan string
}

// Adder is a no-op: FutureTask ignores the add-task function it is given.
func (*FutureTask) Adder(harmonytask.AddTaskFunc) {}
// CanAccept always accepts, returning a pointer to a copy of the first offered task id.
func (*FutureTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	chosen := new(harmonytask.TaskID)
	*chosen = tids[0]
	return chosen, nil
}
// Do reads the value stored for task tID in harmony_test and sends it on w.Ch.
// The test writes that value to the "result" column, so that is the column read here.
// It returns done=true after the value has been delivered, so the task is not
// left unfinished with nobody receiving on the channel.
func (w *FutureTask) Do(tID harmonytask.TaskID, stillMe func() bool) (bool, error) {
	var taskValue string
	err := w.db.QueryRow(context.Background(), "SELECT result FROM harmony_test WHERE task_id=$1", tID).Scan(&taskValue)
	if err != nil {
		return false, err
	}
	w.Ch <- taskValue
	return true, nil
}
// TypeDetails names the task type "FutureTask" and leaves its resource cost at
// the zero value.
func (*FutureTask) TypeDetails() harmonytask.TaskTypeDetails {
	var details harmonytask.TaskTypeDetails
	details.Name = "FutureTask"
	details.Cost = resources.Resources{} // zeroes ok
	return details
}