diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index abd5f0c33..d29a37ab8 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -3,6 +3,7 @@ package tasks import ( "context" + "os" "sort" "strings" "sync" @@ -32,6 +33,7 @@ import ( "github.com/filecoin-project/curio/lib/paths" "github.com/filecoin-project/curio/lib/slotmgr" "github.com/filecoin-project/curio/lib/storiface" + "github.com/filecoin-project/curio/tasks/f3" "github.com/filecoin-project/curio/tasks/gc" "github.com/filecoin-project/curio/tasks/message" "github.com/filecoin-project/curio/tasks/metadata" @@ -152,6 +154,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task inclCkTask := winning.NewInclusionCheckTask(db, full) activeTasks = append(activeTasks, winPoStTask, inclCkTask) + if os.Getenv("CURIO_DISABLE_F3") != "1" { + f3Task := f3.NewF3Task(db, full, maddrs) + activeTasks = append(activeTasks, f3Task) + } + // Warn if also running a sealing task if cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || cfg.Subsystems.EnablePoRepProof || cfg.Subsystems.EnableMoveStorage || cfg.Subsystems.EnableSendCommitMsg || cfg.Subsystems.EnableUpdateEncode || cfg.Subsystems.EnableUpdateProve || cfg.Subsystems.EnableUpdateSubmit { log.Error("It's unsafe to run PoSt and sealing tasks concurrently.") diff --git a/deps/apiinfo.go b/deps/apiinfo.go index 1ac663ae0..e40b420f4 100644 --- a/deps/apiinfo.go +++ b/deps/apiinfo.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/curio/api" + lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" cliutil "github.com/filecoin-project/lotus/cli/util" ) @@ -80,9 +81,13 @@ func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, json type contextKey string +var retryNodeKey = contextKey("retry-node") + +// OnSingleNode returns a new context that will try to perform all calls on the same node. +// If the backing node fails, the calls will be retried on a different node, and further calls will be made on that node. // Not thread safe func OnSingleNode(ctx context.Context) context.Context { - return context.WithValue(ctx, contextKey("retry-node"), new(*int)) + return context.WithValue(ctx, retryNodeKey, new(*int)) } type httpHead struct { @@ -96,7 +101,7 @@ var RPCErrors = jsonrpc.NewErrors() func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.Chain, jsonrpc.ClientCloser, error) { var res api.ChainStruct closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin", - api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(RPCErrors)}, opts...)...) + api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(lapi.RPCErrors)}, opts...)...) 
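+	// Decoding through the shared lapi.RPCErrors set lets typed sentinels such
+	// as api.ErrF3Disabled be reconstructed on the client side, which the
+	// errors.Is checks in tasks/f3 below rely on across the RPC boundary.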
return &res, closer, err } @@ -228,12 +233,11 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) { // for calls that need to be performed on the same node // primarily for miner when calling create block and submit block subsequently - key := contextKey("retry-node") - if ctx.Value(key) != nil { - if (*ctx.Value(key).(**int)) == nil { - *ctx.Value(key).(**int) = preferredProvider + if ctx.Value(retryNodeKey) != nil { + if (*ctx.Value(retryNodeKey).(**int)) == nil { + *ctx.Value(retryNodeKey).(**int) = preferredProvider } else { - preferredProvider = *ctx.Value(key).(**int) + preferredProvider = *ctx.Value(retryNodeKey).(**int) } } diff --git a/go.mod b/go.mod index 3c91dcc0f..319364744 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/filecoin-project/go-commp-utils v0.1.4 github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8 github.com/filecoin-project/go-commp-utils/v2 v2.1.0 + github.com/filecoin-project/go-f3 v0.7.0 github.com/filecoin-project/go-fil-commcid v0.2.0 github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f @@ -53,6 +54,7 @@ require ( github.com/ipfs/go-ipld-cbor v0.2.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 + github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/libp2p/go-buffer-pool v0.1.0 github.com/manifoldco/promptui v0.9.0 @@ -131,7 +133,6 @@ require ( github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 // indirect github.com/filecoin-project/go-clock v0.1.0 // indirect github.com/filecoin-project/go-crypto v0.1.0 // indirect - github.com/filecoin-project/go-f3 v0.7.0 // indirect github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 // indirect @@ -205,7 +206,6 @@ require ( github.com/jessevdk/go-flags v1.4.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect diff --git a/harmony/harmonydb/sql/20241021-f3.sql b/harmony/harmonydb/sql/20241021-f3.sql new file mode 100644 index 000000000..4dd78e881 --- /dev/null +++ b/harmony/harmonydb/sql/20241021-f3.sql @@ -0,0 +1,7 @@ +CREATE TABLE f3_tasks ( + sp_id BIGINT PRIMARY KEY, + task_id BIGINT UNIQUE, + previous_ticket BYTEA, + + FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE SET NULL +); diff --git a/harmony/taskhelp/common.go b/harmony/taskhelp/common.go index eaeb4a1bf..3ee1869fc 100644 --- a/harmony/taskhelp/common.go +++ b/harmony/taskhelp/common.go @@ -1,5 +1,7 @@ package taskhelp +import "strings" + // SubsetIf returns a subset of the slice for which the predicate is true. // It does not allocate memory, but rearranges the list in place. // A non-zero list input will always return a non-zero list. 
@@ -17,3 +19,14 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
 	}
 	return slice[:ct], true
 }
+
+// BackgroundTask names tasks that:
+// * Always run in the background
+// * Never finish "successfully"
+func BackgroundTask(name string) string {
+	return "bg:" + name
+}
+
+func IsBackgroundTask(name string) bool {
+	return strings.HasPrefix(name, "bg:")
+}
diff --git a/tasks/f3/f3_task.go b/tasks/f3/f3_task.go
new file mode 100644
index 000000000..0e22e55bd
--- /dev/null
+++ b/tasks/f3/f3_task.go
@@ -0,0 +1,281 @@
+package f3
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/jpillora/backoff"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-f3/gpbft"
+	"github.com/filecoin-project/go-f3/manifest"
+
+	"github.com/filecoin-project/curio/deps"
+	"github.com/filecoin-project/curio/harmony/harmonydb"
+	"github.com/filecoin-project/curio/harmony/harmonytask"
+	"github.com/filecoin-project/curio/harmony/resources"
+	"github.com/filecoin-project/curio/harmony/taskhelp"
+
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/node/modules/dtypes"
+)
+
+const (
+	// ParticipationCheckProgressMaxAttempts defines the maximum number of failed attempts
+	// before we abandon the current lease and restart the participation process.
+	//
+	// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
+	// Allowing for 13 failures results in approximately 2 minutes of backoff since
+	// the lease was granted. Given a lease validity of up to 5 instances, this means
+	// we would give up on checking the lease around its mid-validity point, which is
+	// typically when we would try to renew the participation ticket. Hence, the value
+	// of 13.
+	ParticipationCheckProgressMaxAttempts = 13
+
+	// ParticipationLeaseTerm is the number of instances the miner will attempt to lease from nodes.
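+	// Renewal is attempted once progress reaches the second half of the term;
+	// see renewLeaseWithin in awaitLeaseExpiry below.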
+	ParticipationLeaseTerm = 5
+)
+
+var log = logging.Logger("cf3")
+
+type F3ParticipationAPI interface {
+	F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
+	F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
+	F3GetProgress(ctx context.Context) (gpbft.Instant, error)
+	F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
+}
+
+type F3Task struct {
+	db  *harmonydb.DB
+	api F3ParticipationAPI
+
+	leaseTerm uint64
+
+	actors map[dtypes.MinerAddress]bool
+}
+
+func NewF3Task(db *harmonydb.DB, api F3ParticipationAPI, actors map[dtypes.MinerAddress]bool) *F3Task {
+	return &F3Task{
+		db:        db,
+		api:       api,
+		leaseTerm: ParticipationLeaseTerm,
+
+		actors: actors,
+	}
+}
+
+func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+	// Ensure that all chain calls are made on the same node (the first call will determine the node)
+	ctx := deps.OnSingleNode(context.Background())
+
+	var spID int64
+	err = f.db.QueryRow(ctx, "SELECT sp_id FROM f3_tasks WHERE task_id = $1", taskID).Scan(&spID)
+	if err != nil {
+		return false, xerrors.Errorf("failed to get sp_id: %w", err)
+	}
+
+	maddr, err := address.NewIDAddress(uint64(spID))
+	if err != nil {
+		return false, xerrors.Errorf("failed to parse miner address: %w", err)
+	}
+
+	for stillOwned() {
+		var previousTicket []byte
+		err = f.db.QueryRow(ctx, "SELECT previous_ticket FROM f3_tasks WHERE task_id = $1", taskID).Scan(&previousTicket)
+		if err != nil {
+			return false, xerrors.Errorf("failed to get previous ticket: %w", err)
+		}
+
+		ticket, err := f.tryGetF3ParticipationTicket(ctx, stillOwned, maddr, previousTicket)
+		if err != nil {
+			return false, xerrors.Errorf("failed to get participation ticket: %w", err)
+		}
+
+		lease, participating, err := f.tryF3Participate(ctx, stillOwned, ticket)
+		if err != nil {
+			return false, xerrors.Errorf("failed to participate in F3: %w", err)
+		}
+		if !participating {
+			return false, xerrors.Errorf("failed to participate in F3: not participating")
+		}
+
+		_, err = f.db.Exec(ctx, "UPDATE f3_tasks SET previous_ticket = $1 WHERE task_id = $2", ticket, taskID)
+		if err != nil {
+			return false, xerrors.Errorf("failed to update previous ticket: %w", err)
+		}
+
+		err = f.awaitLeaseExpiry(ctx, stillOwned, lease)
+		if err != nil {
+			return false, xerrors.Errorf("failed to await lease expiry: %w", err)
+		}
+	}
+
+	return false, xerrors.Errorf("f3 task is background task")
+}
+
+func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
+	for stillOwned() {
+		switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, f.leaseTerm); {
+		case ctx.Err() != nil:
+			return api.F3ParticipationTicket{}, ctx.Err()
+		case errors.Is(err, api.ErrF3Disabled):
+			log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
+			return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: %w", err)
+		case err != nil:
+			log.Errorw("Failed to acquire F3 participation ticket; retrying", "err", err)
+			time.Sleep(1 * time.Second)
+			continue
+		default:
+			log.Debug("Successfully acquired F3 participation ticket")
+			return ticket, nil
+		}
+	}
+	return api.F3ParticipationTicket{}, ctx.Err()
+}
+
+func (f *F3Task) tryF3Participate(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
+	for stillOwned() {
+		switch lease, err := f.api.F3Participate(ctx, ticket); {
+		case ctx.Err() != nil:
+			return api.F3ParticipationLease{}, false, ctx.Err()
+		case errors.Is(err, api.ErrF3Disabled):
+			log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
+			return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
+		case errors.Is(err, api.ErrF3ParticipationTicketExpired):
+			log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "err", err)
+			return api.F3ParticipationLease{}, false, nil
+		case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
+			log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "err", err)
+			return api.F3ParticipationLease{}, false, nil
+		case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
+			log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "err", err)
+			time.Sleep(1 * time.Second)
+			return api.F3ParticipationLease{}, false, nil
+		case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
+			log.Warnw("Node is not the issuer of F3 participation ticket. Miner may be load-balancing or the node has changed. Retrying F3 participation after backoff.", "err", err)
+			time.Sleep(1 * time.Second)
+			continue
+		case errors.Is(err, api.ErrF3NotReady):
+			log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "err", err)
+			time.Sleep(30 * time.Second)
+			continue
+		case err != nil:
+			log.Errorw("Unexpected error while attempting F3 participation. Failing the task.", "err", err)
+			return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
+		default:
+			log.Infow("Successfully acquired F3 participation lease.",
+				"issuer", lease.Issuer,
+				"not-before", lease.FromInstance,
+				"not-after", lease.ToInstance(),
+			)
+			return lease, true, nil
+		}
+	}
+	return api.F3ParticipationLease{}, false, ctx.Err()
+}
+
+func (f *F3Task) awaitLeaseExpiry(ctx context.Context, stillOwned func() bool, lease api.F3ParticipationLease) error {
+	backoff := &backoff.Backoff{
+		Min:    1 * time.Second,
+		Max:    1 * time.Minute,
+		Factor: 1.5,
+	}
+
+	renewLeaseWithin := f.leaseTerm / 2
+	for stillOwned() {
+		manifest, err := f.api.F3GetManifest(ctx)
+		switch {
+		case errors.Is(err, api.ErrF3Disabled):
+			log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
+			return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
+		case err != nil:
+			if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
+				log.Errorw("Too many failures while attempting to get F3 manifest. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
+				return nil
+			}
+			log.Errorw("Failed to get F3 manifest while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
+			time.Sleep(backoff.Duration())
+			// Re-fetch the manifest before checking progress; falling through
+			// with a nil manifest would panic in the progress check below.
+			continue
+		case manifest == nil || manifest.NetworkName != lease.Network:
+			// If we got an unexpected manifest, or no manifest, go back to the
+			// beginning and try to get another ticket. Switching from having a manifest
+			// to having no manifest can theoretically happen if the lotus node reboots
+			// and has no static manifest.
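+			// Returning nil (rather than an error) sends Do back around its outer
+			// loop, which acquires a fresh ticket against the node's current manifest.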
+ return nil + } + switch progress, err := f.api.F3GetProgress(ctx); { + case errors.Is(err, api.ErrF3Disabled): + log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) + return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) + case err != nil: + if backoff.Attempt() > ParticipationCheckProgressMaxAttempts { + log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err) + return nil + } + log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err) + time.Sleep(backoff.Duration()) + case progress.ID+renewLeaseWithin >= lease.ToInstance(): + log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance()) + return nil + default: + remainingInstanceLease := lease.ToInstance() - progress.ID + waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment + if waitTime == 0 { + waitTime = 100 * time.Millisecond + } + log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime) + time.Sleep(waitTime) + } + } + return ctx.Err() +} + +func (f *F3Task) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := ids[0] + return &id, nil +} + +func (f *F3Task) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Name: taskhelp.BackgroundTask("F3Participate"), + Cost: resources.Resources{ + Cpu: 0, + Gpu: 0, + Ram: 10 << 20, + }, + MaxFailures: 1, + } +} + +func (f *F3Task) Adder(taskFunc harmonytask.AddTaskFunc) { + for a := range f.actors { + spid, err := address.IDFromAddress(address.Address(a)) + if err != nil { + log.Errorw("failed to parse miner address", "miner", a, "error", err) + continue + } + + taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec("INSERT INTO f3_tasks (sp_id, task_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", spid, id) + if err != nil { + return false, err + } + + return n > 0, nil + }) + } +} + +func (f *F3Task) GetSpid(db *harmonydb.DB, taskID int64) string { + var spId string + err := db.QueryRow(context.Background(), `SELECT sp_id FROM f3_tasks WHERE task_id = $1`, taskID).Scan(&spId) + if err != nil { + return "" + } + return spId +} + +var _ = harmonytask.Reg(&F3Task{}) +var _ harmonytask.TaskInterface = &F3Task{} diff --git a/tasks/winning/winning_task.go b/tasks/winning/winning_task.go index 182af4604..6fef12eff 100644 --- a/tasks/winning/winning_task.go +++ b/tasks/winning/winning_task.go @@ -22,6 +22,7 @@ import ( prooftypes "github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/curio/build" + "github.com/filecoin-project/curio/deps" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" @@ -95,7 +96,7 @@ func NewWinPostTask(max int, db *harmonydb.DB, remote *paths.Remote, verifier st func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { log.Debugw("WinPostTask.Do()", "taskID", taskID) - ctx := context.TODO() + ctx := deps.OnSingleNode(context.Background()) type BlockCID struct { CID string @@ -397,7 +398,7 @@ func (t 
*WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
 	{
 		uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.AddRounds)+1)
-		blockMsg, err = t.api.MinerCreateBlock(context.TODO(), &api.BlockTemplate{
+		blockMsg, err = t.api.MinerCreateBlock(ctx, &api.BlockTemplate{
 			Miner:   maddr,
 			Parents: base.TipSet.Key(),
 			Ticket:  ticket,
diff --git a/web/static/cluster-tasks.mjs b/web/static/cluster-tasks.mjs
index b9834a2be..77ab74921 100644
--- a/web/static/cluster-tasks.mjs
+++ b/web/static/cluster-tasks.mjs
@@ -1,42 +1,90 @@
 import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js';
 import RPCCall from '/lib/jsonrpc.mjs';
-customElements.define('cluster-tasks',class ClusterTasks extends LitElement {
+
+customElements.define('cluster-tasks', class ClusterTasks extends LitElement {
+    static get properties() {
+        return {
+            data: { type: Array },
+            showBackgroundTasks: { type: Boolean },
+        };
+    }
+
     constructor() {
         super();
         this.data = [];
+        this.showBackgroundTasks = false;
         this.loadData();
     }
+
     async loadData() {
-        this.data = await RPCCall('ClusterTaskSummary') || [];
+        this.data = (await RPCCall('ClusterTaskSummary')) || [];
         setTimeout(() => this.loadData(), 1000);
         this.requestUpdate();
     }
+
+    toggleShowBackgroundTasks(e) {
+        this.showBackgroundTasks = e.target.checked;
+    }
+
     render() {
         return html`
-        <table class="table table-dark">
-            <thead>
-                <tr>
-                    <th>SpID</th>
-                    <th>Task</th>
-                    <th>ID</th>
-                    <th>Posted</th>
-                    <th>Owner</th>
-                </tr>
-            </thead>
-            <tbody>
-                ${this.data.map(entry => html`
-                <tr>
-                    <td>${entry.SpID? "f0" + entry.SpID: 'n/a'}</td>
-                    <td>${entry.Name}</td>
-                    <td>${entry.ID}</td>
-                    <td>${entry.SincePostedStr}</td>
-                    <td>${entry.OwnerID ? html`<a href="/pages/node_info/?id=${entry.OwnerID}">${entry.Owner}</a>`:''}</td>
-                </tr>
-                `)}
-            </tbody>
-        </table>
+            <!-- Toggle for including background (bg:) tasks in the listing -->
+            <label>
+                <input type="checkbox" @change=${this.toggleShowBackgroundTasks} ?checked=${this.showBackgroundTasks} />
+                Show background tasks
+            </label>
+
+            <table class="table table-dark">
+                <thead>
+                    <tr>
+                        <th>SpID</th>
+                        <th>Task</th>
+                        <th>ID</th>
+                        <th>Posted</th>
+                        <th>Owner</th>
+                    </tr>
+                </thead>
+                <tbody>
+                    ${this.data
+                        .filter((entry) => this.showBackgroundTasks || !entry.Name.startsWith('bg:'))
+                        .map((entry) => html`
+                            <tr>
+                                <td>${entry.SpID ? 'f0' + entry.SpID : 'n/a'}</td>
+                                <td>${entry.Name}</td>
+                                <td>${entry.ID}</td>
+                                <td>${entry.SincePostedStr}</td>
+                                <td>
+                                    ${entry.OwnerID
+                                        ? html`<a href="/pages/node_info/?id=${entry.OwnerID}">${entry.Owner}</a>`
+                                        : ''}
+                                </td>
+                            </tr>
+                        `)}
+                </tbody>
+            </table>
         `;
     }
 });
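
As a usage sketch (not part of the patch): the `bg:` prefix introduced in `taskhelp` is the same marker the new web UI filter keys on. The standalone `main` package below is illustrative only; it assumes curio's `taskhelp` module is importable.

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/curio/harmony/taskhelp"
)

func main() {
	// TypeDetails in f3_task.go registers the task under this prefixed name.
	name := taskhelp.BackgroundTask("F3Participate")
	fmt.Println(name)                            // bg:F3Participate
	fmt.Println(taskhelp.IsBackgroundTask(name)) // true

	// cluster-tasks.mjs hides rows with this prefix unless the
	// "Show background tasks" checkbox is ticked:
	//   entry.Name.startsWith('bg:')
}
```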