Skip to content

Commit

Permalink
Merge pull request #12710 from tomponline/tp-task-mu
Browse files Browse the repository at this point in the history
Task: Remove unnecessary calls to defer g.mu.Unlock()
  • Loading branch information
tomponline authored Jan 18, 2024
2 parents c0f3378 + 2702d08 commit 029b142
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
8 changes: 6 additions & 2 deletions lxd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ func (d *Daemon) init() error {
// leader.
d.gateway.Cluster = d.db.Cluster
taskFunc, taskSchedule := cluster.HeartbeatTask(d.gateway)
hbGroup := task.Group{}
hbGroup := task.NewGroup()
d.taskClusterHeartbeat = hbGroup.Add(taskFunc, taskSchedule)
hbGroup.Start(d.shutdownCtx)
d.gateway.WaitUpgradeNotification()
Expand Down Expand Up @@ -1560,6 +1560,8 @@ func (d *Daemon) init() error {
d.startClusterTasks()
}

d.tasks = *task.NewGroup()

// FIXME: There's no hard reason for which we should not run these
// tasks in mock mode. However it requires that we tweak them so
// they exit gracefully without blocking (something we should do
Expand Down Expand Up @@ -1622,6 +1624,8 @@ func (d *Daemon) startClusterTasks() {
// Run asynchronously so that connecting to remote members doesn't delay starting up other cluster tasks.
go cluster.EventsUpdateListeners(d.endpoints, d.db.Cluster, d.serverCert, nil, d.events.Inject)

d.clusterTasks = *task.NewGroup()

// Heartbeats
d.taskClusterHeartbeat = d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))

Expand All @@ -1640,7 +1644,7 @@ func (d *Daemon) startClusterTasks() {

func (d *Daemon) stopClusterTasks() {
_ = d.clusterTasks.Stop(3 * time.Second)
d.clusterTasks = task.Group{}
d.clusterTasks = *task.NewGroup()
}

// numRunningInstances returns the number of running instances.
Expand Down
35 changes: 17 additions & 18 deletions lxd/task/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,32 @@ import (
//
// All tasks in a group will be started and stopped at the same time.
type Group struct {
cancel func()
cancel context.CancelFunc
wg sync.WaitGroup
tasks []Task
running map[int]bool
mu sync.Mutex
}

// NewGroup returns new initialised Group.
func NewGroup() *Group {
return &Group{
running: make(map[int]bool),
}
}

// Add a new task to the group, returning its index.
func (g *Group) Add(f Func, schedule Schedule) *Task {
g.mu.Lock()
defer g.mu.Unlock()

i := len(g.tasks)
g.tasks = append(g.tasks, Task{
f: f,
schedule: schedule,
reset: make(chan struct{}, 16), // Buffered to not block senders
})
t := &g.tasks[len(g.tasks)-1] // Get the task we added to g.tasks.
g.mu.Unlock()

return &g.tasks[i]
return t
}

// Start all the tasks in the group.
Expand All @@ -40,15 +46,10 @@ func (g *Group) Start(ctx context.Context) {
// concurrent calls to Start() or Add(0) don't race. This ensures all tasks in this group
// are started based on a consistent snapshot of g.running and g.tasks.
g.mu.Lock()
defer g.mu.Unlock()

ctx, g.cancel = context.WithCancel(ctx)
g.wg.Add(len(g.tasks))

if g.running == nil {
g.running = make(map[int]bool)
}

for i := range g.tasks {
if g.running[i] {
continue
Expand All @@ -62,14 +63,13 @@ func (g *Group) Start(ctx context.Context) {

// Ensure running map is updated before wait group Done() is called.
g.mu.Lock()
defer g.mu.Unlock()

if g.running != nil {
g.running[i] = false
g.wg.Done()
}
g.running[i] = false
g.mu.Unlock()
g.wg.Done()
}(i)
}

g.mu.Unlock()
}

// Stop all tasks in the group.
Expand Down Expand Up @@ -106,14 +106,13 @@ func (g *Group) Stop(timeout time.Duration) error {
select {
case <-ctx.Done():
g.mu.Lock()
defer g.mu.Unlock()

running := []string{}
for i, value := range g.running {
if value {
running = append(running, strconv.Itoa(i))
}
}
g.mu.Unlock()

return fmt.Errorf("Task(s) still running: IDs %v", running)
case <-graceful:
Expand Down
4 changes: 2 additions & 2 deletions lxd/task/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestGroup_Add(t *testing.T) {
group := &task.Group{}
group := task.NewGroup()
ok := make(chan struct{})
f := func(context.Context) { close(ok) }
group.Add(f, task.Every(time.Second))
Expand All @@ -23,7 +23,7 @@ func TestGroup_Add(t *testing.T) {
}

func TestGroup_StopUngracefully(t *testing.T) {
group := &task.Group{}
group := task.NewGroup()

// Create a task function that blocks.
ok := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion lxd/task/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// function to reset the task's state. See Group.Stop() and Group.Reset() for
// more details.
func Start(ctx context.Context, f Func, schedule Schedule) (func(time.Duration) error, func()) {
group := Group{}
group := NewGroup()
task := group.Add(f, schedule)
group.Start(ctx)

Expand Down

0 comments on commit 029b142

Please sign in to comment.