diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 94cdcfc68..eb58eb69c 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -3,6 +3,7 @@ package tasks import ( "context" + "fmt" "sort" "strings" "sync" @@ -21,6 +22,8 @@ import ( "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp/harmonycron" "github.com/filecoin-project/curio/lib/chainsched" "github.com/filecoin-project/curio/lib/curiochain" "github.com/filecoin-project/curio/lib/fastparamfetch" @@ -187,11 +190,20 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task "miner_addresses", minerAddresses, "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) + reg, err := resources.Register(db, dependencies.ListenAddr) + if err != nil { + return nil, fmt.Errorf("cannot get resources: %w", err) + } + + cron := harmonycron.New(db, reg.MachineID) + dependencies.At = cron.At + activeTasks = append([]harmonytask.TaskInterface{cron}, activeTasks...) + // harmony treats the first task as highest priority, so reverse the order // (we could have just appended to this list in the reverse order, but defining // tasks in pipeline order is more intuitive) - ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr) + ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr, reg) if err != nil { return nil, err } diff --git a/deps/deps.go b/deps/deps.go index 3f30ce79a..86fc7d4b0 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/BurntSushi/toml" "github.com/gbrlsnchs/jwt/v3" @@ -178,6 +179,7 @@ type Deps struct { LocalPaths *paths.BasicLocalStorage ListenAddr string Name string + At func(t time.Time, taskType, sqlTable string, sqlRowID int) } const ( diff --git a/go.mod b/go.mod index 68be56908..d2643caa4 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-jsonrpc v0.5.0 github.com/filecoin-project/go-padreader v0.0.1 - github.com/filecoin-project/go-state-types v0.14.0-dev + github.com/filecoin-project/go-state-types v0.14.0-rc1 github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/lotus v1.27.0 github.com/filecoin-project/specs-actors/v5 v5.0.6 diff --git a/go.sum b/go.sum index 83f21ead7..170175524 100644 --- a/go.sum +++ b/go.sum @@ -362,8 +362,8 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.13.1/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY= -github.com/filecoin-project/go-state-types v0.14.0-dev h1:bDwq1S28D7EC/uDmKU8vvNcdFw/YDsNq09pe3zeV5h4= -github.com/filecoin-project/go-state-types v0.14.0-dev/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY= +github.com/filecoin-project/go-state-types v0.14.0-rc1 h1:kWBGX/uqZmYotYMNmw+R/fIuot/k0KMcEtB7PKFy1SQ= +github.com/filecoin-project/go-state-types v0.14.0-rc1/go.mod h1:cHpOPup9H1g2T29dKHAjC2sc7/Ef5ypjuW9A3I+e9yY= github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statemachine v1.0.3 h1:N07o6alys+V1tNoSTi4WuuoeNC4erS/6jE74+NsgQuk= github.com/filecoin-project/go-statemachine v1.0.3/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54= diff --git a/harmony/harmonydb/sql/20240709-cron.sql b/harmony/harmonydb/sql/20240709-cron.sql new file mode 100644 index 000000000..7c3f120ef --- /dev/null +++ b/harmony/harmonydb/sql/20240709-cron.sql @@ -0,0 +1,15 @@ +CREATE TABLE harmony_cron ( + id SERIAL PRIMARY KEY NOT NULL, + task_id INTEGER NOT NULL REFERENCES harmony_task (id) ON DELETE CASCADE, + task_name VARCHAR(16) NOT NULL, + sql_table TEXT NOT NULL, + sql_row_id INTEGER NOT NULL +); + +CREATE OR REPLACE FUNCTION update_ext_taskid(table_name text, tid integer, id_value integer) +RETURNS void AS $$ +BEGIN + EXECUTE format('UPDATE %I SET task_id = $1 WHERE id=$2', table_name) + USING tid, id_value; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index 1027618e7..bc10f49b3 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -133,12 +133,9 @@ type TaskID int func New( db *harmonydb.DB, impls []TaskInterface, - hostnameAndPort string) (*TaskEngine, error) { + hostnameAndPort string, + reg *resources.Reg) (*TaskEngine, error) { - reg, err := resources.Register(db, hostnameAndPort) - if err != nil { - return nil, fmt.Errorf("cannot get resources: %w", err) - } ctx, grace := context.WithCancel(context.Background()) e := &TaskEngine{ ctx: ctx, diff --git a/harmony/taskhelp/harmonycron/doc.go b/harmony/taskhelp/harmonycron/doc.go new file mode 100644 index 000000000..e90698409 --- /dev/null +++ b/harmony/taskhelp/harmonycron/doc.go @@ -0,0 +1,16 @@ +/* +HarmonyCron starts tasks at a specified time & associates them with a row in a SQL table. +It supports: At(when time.Time, taskType, sqlTable string, sqlRowID int) + + which will add a task to the task engine at the specified time and associate it with the specified row. + +Operation: + + The cron-task will be picked up by Cron runners, which try to avoid being sealers or provers. + The cron-task is held until the specified time in seconds, then it completes after starting + the task in the task engine. + +Requirement: The sqlTable must have columns "id" and "task_id" to be able to update the task_id. +HarmonyCron is a regular harmonytask.TaskInterface task ran by all nodes. +*/ +package harmonycron diff --git a/harmony/taskhelp/harmonycron/harmonycron.go b/harmony/taskhelp/harmonycron/harmonycron.go new file mode 100644 index 000000000..86adc0711 --- /dev/null +++ b/harmony/taskhelp/harmonycron/harmonycron.go @@ -0,0 +1,95 @@ +package harmonycron + +import ( + "context" + "regexp" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/resources" +) + +type Cron struct { + DB *harmonydb.DB + AddTaskFunc harmonytask.AddTaskFunc + me int +} + +func New(db *harmonydb.DB, myMachineID int) *Cron { + return &Cron{DB: db, me: myMachineID} +} + +// At schedules a task to be run at a specific time. +// The task will be added to the harmony_task table with the given taskType. +// The task will be associated with the given sqlTable at the specified sqlRowID. +// Note: sqlTable must use the standard columns "id" and "task_id" +func (c *Cron) At(t time.Time, taskType, sqlTable string, sqlRowID int) { + c.AddTaskFunc(func(tid harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + _, err := tx.Exec(`INSERT INTO harmony_cron (task_id, unixtime, task_type, sql_table, sql_row_id) VALUES ($1, $2, $3, $4, $5)`, + tid, t.Unix(), taskType, sqlTable, sqlRowID) + if harmonydb.IsErrUniqueContraint(err) { + return false, nil + } + return true, err + }) +} + +var validSQL = regexp.MustCompile(`^[a-zA-Z0-9_]+$`) + +func (c *Cron) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + var unixtime int64 + var taskType, sqlTable, sqlRowID string + err = c.DB.QueryRow(context.Background(), `SELECT unixtime, task_type, sql_table, sql_row_id + FROM harmony_cron WHERE task_id = $1`, taskID). + Scan(&unixtime, &taskType, &sqlTable, &sqlRowID) + if err != nil { + return false, xerrors.Errorf("getting cron task: %w", err) + } + if !validSQL.MatchString(sqlTable) { + return false, xerrors.Errorf("invalid sql table or column name") + } + time.Sleep(time.Until(time.Unix(unixtime, 0))) + if !stillOwned() { + return false, nil + } + _, err = c.DB.BeginTransaction(context.Background(), func(tx *harmonydb.Tx) (commit bool, err error) { + err = tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time) VALUES ($1) RETURNING id`, taskType, c.me, time.Now()).Scan(&taskID) + if err != nil { + return false, xerrors.Errorf("adding new task: %w", err) + } + _, err = tx.Exec(`SELECT update_ext_taskid($1, $2, $3)`, sqlTable, taskID, sqlRowID) + if err != nil { + return false, xerrors.Errorf("deleting cron task: %w", err) + } + return true, nil + }) + if err != nil { + return false, xerrors.Errorf("doing cron task: %w", err) + } + return true, nil +} + +func (c *Cron) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + // TODO: avoid accepting if we are one of the busier nodes. + return &ids[0], nil +} + +func (c *Cron) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Name: "Cron", + Cost: resources.Resources{ + Cpu: 0, + Ram: 5 << 20, + Gpu: 0, + }, + } +} + +func (c *Cron) Adder(f harmonytask.AddTaskFunc) { + c.AddTaskFunc = f +} + +var _ harmonytask.TaskInterface = &Cron{} diff --git a/itests/harmonycron_test.go b/itests/harmonycron_test.go new file mode 100644 index 000000000..a240d2891 --- /dev/null +++ b/itests/harmonycron_test.go @@ -0,0 +1,84 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/snadrus/must" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp/harmonycron" +) + +// TestHarmonyCron is Documentesting +func TestHarmonyCron(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dbConfig := config.HarmonyDB{ + Hosts: []string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")}, + Database: "yugabyte", + Username: "yugabyte", + Password: "yugabyte", + Port: "5433", + } + db := must.One(harmonydb.NewFromConfigWithITestID(t, dbConfig, harmonydb.ITestNewID())) + hp := "localhost:1234" + reg := must.One(resources.Register(db, hp)) + cron := harmonycron.New(db, reg.MachineID) + + // FutureTask is the task we want to run in the future. + FutureTask := &FutureTask{db: db, Ch: make(chan string)} + _ = must.One(harmonytask.New(db, []harmonytask.TaskInterface{cron, FutureTask}, hp, reg)) + // harmonycron lives in Deps, but we are shortcutting things here. + // Above here is the setup `curio run`, below is the test ///////////////// + + var taskValueForTesting string + + // A tale of PreviousTask and FutureTask /story + // (p *PreviousTask) Do() { + { + taskValue := "abcde" // our future task needs this, so save it to that task's table + var rowID int64 + // Pretend harmony_test is FutureTask's table + err := db.QueryRow(ctx, "INSERT INTO harmony_test (options, result) VALUES ('FutureTaskTbl', $1) RETURNING id", taskValue).Scan(&rowID) + require.NoError(t, err) + // PreviousTask then schedules it sometime in the future. It will run AFTER this time. + cron.At(time.Now().Add(3*time.Second), "FutureTask", "harmony_test", int(rowID)) + // Now, PreviousTask's machine could go down, & someone's FutureTask runner will pick it up. + + taskValueForTesting = taskValue // hack for the test. Nothing to see here. + } + require.Equal(t, taskValueForTesting, <-FutureTask.Ch) +} + +type FutureTask struct { + db *harmonydb.DB + Ch chan string +} + +func (*FutureTask) Adder(harmonytask.AddTaskFunc) {} +func (*FutureTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := tids[0] + return &id, nil +} +func (w *FutureTask) Do(tID harmonytask.TaskID, stillMe func() bool) (bool, error) { + var taskValue string + err := w.db.QueryRow(context.Background(), "SELECT options FROM harmony_test WHERE task_id=$1", tID).Scan(&taskValue) + if err != nil { + return false, err + } + w.Ch <- taskValue + return false, nil +} +func (*FutureTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Name: "FutureTask", + Cost: resources.Resources{}, // zeroes ok + } +}