Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: F3 #292

Merged
merged 7 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks

import (
"context"
"os"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -32,6 +33,7 @@ import (
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/slotmgr"
"github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/tasks/f3"
"github.com/filecoin-project/curio/tasks/gc"
"github.com/filecoin-project/curio/tasks/message"
"github.com/filecoin-project/curio/tasks/metadata"
Expand Down Expand Up @@ -152,6 +154,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
inclCkTask := winning.NewInclusionCheckTask(db, full)
activeTasks = append(activeTasks, winPoStTask, inclCkTask)

if os.Getenv("CURIO_DISABLE_F3") != "1" {
f3Task := f3.NewF3Task(db, full, maddrs)
activeTasks = append(activeTasks, f3Task)
}

// Warn if also running a sealing task
if cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || cfg.Subsystems.EnablePoRepProof || cfg.Subsystems.EnableMoveStorage || cfg.Subsystems.EnableSendCommitMsg || cfg.Subsystems.EnableUpdateEncode || cfg.Subsystems.EnableUpdateProve || cfg.Subsystems.EnableUpdateSubmit {
log.Error("It's unsafe to run PoSt and sealing tasks concurrently.")
Expand Down
18 changes: 11 additions & 7 deletions deps/apiinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/filecoin-project/curio/api"

lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
)
Expand Down Expand Up @@ -80,9 +81,13 @@ func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, json

type contextKey string

var retryNodeKey = contextKey("retry-node")

// OnSingleNode returns a new context that will try to perform all calls on the same node.
// If the backing node fails, the calls will be retried on a different node, and further calls will be made on that node.
// Not thread safe
func OnSingleNode(ctx context.Context) context.Context {
return context.WithValue(ctx, contextKey("retry-node"), new(*int))
return context.WithValue(ctx, retryNodeKey, new(*int))
}

type httpHead struct {
Expand All @@ -96,7 +101,7 @@ var RPCErrors = jsonrpc.NewErrors()
func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.Chain, jsonrpc.ClientCloser, error) {
var res api.ChainStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(RPCErrors)}, opts...)...)
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(lapi.RPCErrors)}, opts...)...)

return &res, closer, err
}
Expand Down Expand Up @@ -228,12 +233,11 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {

// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
*ctx.Value(key).(**int) = preferredProvider
if ctx.Value(retryNodeKey) != nil {
if (*ctx.Value(retryNodeKey).(**int)) == nil {
*ctx.Value(retryNodeKey).(**int) = preferredProvider
} else {
preferredProvider = *ctx.Value(key).(**int)
preferredProvider = *ctx.Value(retryNodeKey).(**int)
}
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/filecoin-project/go-commp-utils v0.1.4
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8
github.com/filecoin-project/go-commp-utils/v2 v2.1.0
github.com/filecoin-project/go-f3 v0.7.0
github.com/filecoin-project/go-fil-commcid v0.2.0
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0
github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/ipfs/go-ipld-cbor v0.2.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.1.0
github.com/manifoldco/promptui v0.9.0
Expand Down Expand Up @@ -131,7 +133,6 @@ require (
github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 // indirect
github.com/filecoin-project/go-clock v0.1.0 // indirect
github.com/filecoin-project/go-crypto v0.1.0 // indirect
github.com/filecoin-project/go-f3 v0.7.0 // indirect
github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 // indirect
Expand Down Expand Up @@ -205,7 +206,6 @@ require (
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
7 changes: 7 additions & 0 deletions harmony/harmonydb/sql/20241021-f3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE f3_tasks (
sp_id BIGINT PRIMARY KEY,
task_id BIGINT UNIQUE,
previous_ticket BYTEA,

FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE SET NULL
);
13 changes: 13 additions & 0 deletions harmony/taskhelp/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package taskhelp

import "strings"

// SubsetIf returns a subset of the slice for which the predicate is true.
// It does not allocate memory, but rearranges the list in place.
// A non-zero list input will always return a non-zero list.
Expand All @@ -17,3 +19,14 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
}
return slice[:ct], true
}

// BackgroundTask are tasks that:
// * Always run in the background
// * Never finish "successfully"
func BackgroundTask(name string) string {
return "bg:" + name
}

func IsBackgroundTask(name string) bool {
return strings.HasPrefix(name, "bg:")
}
Loading