Skip to content

Commit

Permalink
feat: ✨ add a framework for allowing sensor workers to be start/stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuar committed Jun 14, 2024
1 parent aae9e81 commit 988626a
Show file tree
Hide file tree
Showing 27 changed files with 428 additions and 141 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ issues:
source: "^//go:generate "
- path: '(.+)_test\.go'
text: "copies lock value"
- linters:
- govet
text: "lostcancel"

severity:
default-severity: "@linter"
Expand Down
130 changes: 120 additions & 10 deletions internal/agent/device_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package agent

import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog/log"

"github.com/joshuar/go-hass-agent/internal/hass/sensor"
"github.com/joshuar/go-hass-agent/internal/linux"
"github.com/joshuar/go-hass-agent/internal/linux/apps"
"github.com/joshuar/go-hass-agent/internal/linux/battery"
Expand All @@ -27,8 +30,8 @@ import (
"github.com/joshuar/go-hass-agent/pkg/linux/dbusx"
)

// workers is the list of sensor workers supported on Linux.
var workers = []func(ctx context.Context) (*linux.SensorWorker, error){
// allworkers is the list of sensor allworkers supported on Linux.
var allworkers = []func() (*linux.SensorWorker, error){
apps.NewAppWorker,
battery.NewBatteryWorker,
cpu.NewLoadAvgWorker,
Expand All @@ -52,27 +55,134 @@ var workers = []func(ctx context.Context) (*linux.SensorWorker, error){
user.NewUserWorker,
}

var (
ErrWorkerAlreadyStarted = errors.New("worker already started")
ErrUnknownWorker = errors.New("unknown worker")
)

type workerControl struct {
object Worker
control context.CancelFunc
}

type linuxWorkers map[string]*workerControl

func (w linuxWorkers) ActiveWorkers() []string {
workers := make([]string, 0, len(w))

for _, worker := range w {
if worker.control != nil {
workers = append(workers, worker.object.Name())
}
}

return workers
}

func (w linuxWorkers) InactiveWorkers() []string {
workers := make([]string, 0, len(w))

for _, worker := range w {
if worker.control == nil {
workers = append(workers, worker.object.Name())
}
}

return workers
}

func (w linuxWorkers) Start(ctx context.Context, name string) (<-chan sensor.Details, error) {
if worker, ok := w[name]; ok {
if worker.control != nil {
return nil, ErrWorkerAlreadyStarted
}

workerCtx, workerCancelFunc := context.WithCancel(ctx)

workerCh, err := w[name].object.Updates(workerCtx)
if err != nil {
return nil, fmt.Errorf("could not start worker: %w", err)
}

w[name].control = workerCancelFunc

return workerCh, nil
}

return nil, ErrUnknownWorker
}

func (w linuxWorkers) Stop(name string) error {
var worker *workerControl

var exists bool

if worker, exists = w[name]; !exists {
return ErrUnknownWorker
}

worker.control()

return nil
}

func (w linuxWorkers) StartAll(ctx context.Context) (<-chan sensor.Details, error) {
outCh := make([]<-chan sensor.Details, 0, len(allworkers))

var allerr error

log.Debug().Msg("Starting all Linux workers.")

for name, worker := range w {
workerCtx, cancelFunc := context.WithCancel(ctx)

log.Debug().Str("name", name).Str("description", worker.object.Description()).Msg("Starting sensor worker.")

workerCh, err := worker.object.Updates(workerCtx)
if err != nil {
allerr = errors.Join(allerr, err)

continue
}

outCh = append(outCh, workerCh)
worker.control = cancelFunc
}

return sensor.MergeSensorCh(ctx, outCh...), allerr
}

func (w linuxWorkers) StopAll() error {
for _, worker := range w {
worker.control()
}

return nil
}

func newDevice(_ context.Context) *linux.Device {
return linux.NewDevice(preferences.AppName, preferences.AppVersion)
}

// sensorWorkers initialises the list of workers for sensors and returns those
// createSensorWorkers initialises the list of workers for sensors and returns those
// that are supported on this device.
func sensorWorkers(ctx context.Context) []Worker {
activeWorkers := make([]Worker, 0, len(workers))
//
//nolint:exhaustruct
func createSensorWorkers() WorkerController {
workers := make(linuxWorkers)

for _, w := range workers {
worker, err := w(ctx)
for _, w := range allworkers {
worker, err := w()
if err != nil {
log.Debug().Err(err).Msg("Could not activate a worker.")
log.Debug().Err(err).Msg("Could not initialise worker.")

continue
}

activeWorkers = append(activeWorkers, worker)
workers[worker.Name()] = &workerControl{object: worker}
}

return activeWorkers
return workers
}

// setupDeviceContext returns a new Context that contains the D-Bus API.
Expand Down
65 changes: 38 additions & 27 deletions internal/agent/runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ import (
"github.com/joshuar/go-hass-agent/internal/scripts"
)

// WorkerController represents an object that manages one or more Workers.
type WorkerController interface {
// ActiveWorkers is a list of the names of all currently active Workers.
ActiveWorkers() []string
// InactiveWorkers is a list of the names of all currently inactive Workers.
InactiveWorkers() []string
// Start provides a way to start the named Worker.
Start(ctx context.Context, name string) (<-chan sensor.Details, error)
// Stop provides a way to stop the named Worker.
Stop(name string) error
// StartAll will start all Workers that this controller manages.
StartAll(ctx context.Context) (<-chan sensor.Details, error)
// StopAll will stop all Workers that this controller manages.
StopAll() error
}

// Worker represents an object that is responsible for controlling the
// publishing of one or more sensors.
type Worker interface {
Expand Down Expand Up @@ -59,36 +75,26 @@ type MQTTWorker interface {
// runWorkers will call all the sensor worker functions that have been defined
// for this device.
func runWorkers(ctx context.Context, trk SensorTracker, reg sensor.Registry) {
workers := sensorWorkers(ctx)
workers = append(workers, device.NewExternalIPUpdaterWorker(), device.NewVersionWorker())

outCh := make([]<-chan sensor.Details, 0, len(workers))

cancelFuncs := make([]context.CancelFunc, 0, len(workers))

log.Debug().Msg("Starting worker funcs.")

for worker := range len(workers) {
workerCtx, cancelFunc := context.WithCancel(ctx)
cancelFuncs = append(cancelFuncs, cancelFunc)

log.Debug().Str("name", workers[worker].Name()).Str("description", workers[worker].Description()).Msg("Starting sensor worker.")

workerCh, err := workers[worker].Updates(workerCtx)
if err != nil {
log.Warn().Err(err).Str("name", workers[worker].Name()).Msg("Could not start worker.")

continue
}

outCh = append(outCh, workerCh)
// Create sensor workers for OS.
osWorkers := createSensorWorkers()
// Start sensor workers for OS.
sensorUpdates, err := osWorkers.StartAll(ctx)
if err != nil {
log.Debug().Err(err).Msg("Some OS workers could not be started.")
}
// Create sensor workers for device.
deviceWorkers := device.CreateSensorWorkers()
// Start sensor workers for device.
deviceUpdates, err := deviceWorkers.StartAll(ctx)
if err != nil {
log.Debug().Err(err).Msg("Some device workers could not be started.")
}

sensorUpdates := sensor.MergeSensorCh(ctx, outCh...)
// Listen for sensor updates from all workers.
go func() {
log.Debug().Msg("Listening for sensor updates.")

for update := range sensorUpdates {
for update := range sensor.MergeSensorCh(ctx, sensorUpdates, deviceUpdates) {
go func(update sensor.Details) {
if err := trk.UpdateSensor(ctx, reg, update); err != nil {
log.Warn().Err(err).Str("id", update.ID()).Msg("Update failed.")
Expand All @@ -108,12 +114,17 @@ func runWorkers(ctx context.Context, trk SensorTracker, reg sensor.Registry) {

wg.Add(1)

// When context is cancelled, stop all sensor workers.
go func() {
defer wg.Done()
<-ctx.Done()

for _, c := range cancelFuncs {
c()
if err := osWorkers.StopAll(); err != nil {
log.Debug().Err(err).Msg("Stopping OS workers reported an error.")
}

if err := deviceWorkers.StopAll(); err != nil {
log.Debug().Err(err).Msg("Stopping device workers reported an error.")
}
}()
wg.Wait()
Expand Down
Loading

0 comments on commit 988626a

Please sign in to comment.