Skip to content

Commit

Permalink
fix: harmony: Correctly separate this counters when sharing Max (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k authored Oct 4, 2024
1 parent 6390f94 commit 866fd2e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
20 changes: 2 additions & 18 deletions harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Nil (default)/Zero or less means unrestricted.
// Counters can either be independent when created with Max, or shared between tasks with SharedMax.Make()
Max Limiter
Max taskhelp.Limiter

// Name is the task name to be added to the task list.
Name string
Expand Down Expand Up @@ -105,23 +105,6 @@ type TaskInterface interface {
Adder(AddTaskFunc)
}

type Limiter interface {
// Active returns the number of tasks of this type that are currently running
// in this limiter / limiter group.
Active() int

// ActiveThis returns the number of tasks of this type that are currently running
// in this limiter (e.g. per-task-type count).
ActiveThis() int

// AtMax returns whether this limiter permits more tasks to run.
AtMax() bool

// Add increments / decrements the active task counters by delta. This call
// is atomic
Add(delta int)
}

// AddTaskFunc is responsible for adding a task's details "extra info" to the DB.
// It should return true if the task should be added, false if it was already there.
// This is typically accomplished with a "unique" index on your detals table that
Expand Down Expand Up @@ -188,6 +171,7 @@ func New(
if h.Max == nil {
h.Max = taskhelp.Max(0)
}
h.Max = h.Max.Instance()

if Registry[h.TaskTypeDetails.Name] == nil {
return nil, fmt.Errorf("task %s not registered: var _ = harmonytask.Reg(t TaskInterface)", h.TaskTypeDetails.Name)
Expand Down
25 changes: 25 additions & 0 deletions harmony/taskhelp/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,27 @@ import (
"sync/atomic"
)

type Limiter interface {
// Active returns the number of tasks of this type that are currently running
// in this limiter / limiter group.
Active() int

// ActiveThis returns the number of tasks of this type that are currently running
// in this limiter (e.g. per-task-type count).
ActiveThis() int

// AtMax returns whether this limiter permits more tasks to run.
AtMax() bool

// Add increments / decrements the active task counters by delta. This call
// is atomic
Add(delta int)

// Instance spawns a sub-instance of this limiter. This is called by harmonytask on startup for each task
// using this limiter. Each sub-instance has it's own individual "This" counter, but can share a common counter.
Instance() Limiter
}

type MaxCounter struct {
// maximum number of tasks of this type that can be run
N int
Expand Down Expand Up @@ -37,6 +58,10 @@ func (m *MaxCounter) Add(n int) {
m.currentThis.Add(int32(n))
}

func (m *MaxCounter) Instance() Limiter {
return &MaxCounter{N: m.N, current: m.current, currentThis: new(atomic.Int32)}
}

func Max(n int) *MaxCounter {
return &MaxCounter{N: n, current: new(atomic.Int32), currentThis: new(atomic.Int32)}
}
5 changes: 3 additions & 2 deletions tasks/seal/task_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/dealdata"
ffi2 "github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/paths"
Expand Down Expand Up @@ -45,11 +46,11 @@ type SDRTask struct {

sc *ffi2.SealCalls

max harmonytask.Limiter
max taskhelp.Limiter
min int
}

func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR harmonytask.Limiter, minSDR int) *SDRTask {
func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR taskhelp.Limiter, minSDR int) *SDRTask {
return &SDRTask{
api: api,
db: db,
Expand Down
5 changes: 3 additions & 2 deletions tasks/unseal/task_unseal_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/passcall"
"github.com/filecoin-project/curio/lib/paths"
Expand All @@ -30,14 +31,14 @@ type UnsealSDRApi interface {
}

type TaskUnsealSdr struct {
max harmonytask.Limiter
max taskhelp.Limiter

sc *ffi.SealCalls
db *harmonydb.DB
api UnsealSDRApi
}

func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max harmonytask.Limiter, api UnsealSDRApi) *TaskUnsealSdr {
func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max taskhelp.Limiter, api UnsealSDRApi) *TaskUnsealSdr {
return &TaskUnsealSdr{
max: max,
sc: sc,
Expand Down

0 comments on commit 866fd2e

Please sign in to comment.