Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Daemon subdaemon refactor #414

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5df95bc
[daemon] Cleanup routinehelper code
cgalibern Sep 18, 2023
d6f7acb
[scheduler] Refactor to prepare daemon/subdaemon removal
cgalibern Sep 22, 2023
7e2739d
[hb] Refactor to prepare daemon/subdaemon removal
cgalibern Sep 22, 2023
ef8b5b7
[lsnrhttp{inet,ux}] Refactor to prepare daemon/subdaemon removal
cgalibern Sep 22, 2023
de545a5
[listener] Refactor to prepare daemon/subdaemon removal
cgalibern Sep 22, 2023
7db0312
[discover] Refactor to prepare daemon/subdaemon removal
cgalibern Sep 22, 2023
563c80c
[nmon] nmon implements StartStopper interface
cgalibern Sep 22, 2023
21056b3
[dns] dns implements StartStopper interface
cgalibern Sep 22, 2023
7faf951
POST /daemon/stop publish msgbus.DaemonCtl to stop daemon
cgalibern Sep 22, 2023
cc999aa
[daemon] Don't anymore use daemon/subdaemon
cgalibern Sep 22, 2023
77b3104
drop package daemon/subdaemon
cgalibern Sep 22, 2023
b786ab6
[daemon] Don't anymore use daemon/enable
cgalibern Sep 25, 2023
fc42dad
[core/collector] Fix NewPinger endless on stop
cgalibern Sep 25, 2023
ba1deb8
[daemon/nmon] Skip getStats on non Linux
cgalibern Sep 25, 2023
cd83f20
[daemonapi,routehttp] Refactor labelnode -> DaemonApi.LabelNode for test
cgalibern Sep 26, 2023
1a8b42e
[daemon/ccfg] Implement StartStopper interface for ccfg
cgalibern Sep 26, 2023
9d0b8dc
[daemon/ccfg] Delete unused ccfg.databus member
cgalibern Sep 26, 2023
44caffe
[daemon/cstat] Delete unused cstat.databus member
cgalibern Sep 26, 2023
19ca044
[daemon/ctstat] Implement StartStopper interface for cstat
cgalibern Sep 26, 2023
2e41f42
[daemon/dns] Stop() waits for wait group
cgalibern Sep 26, 2023
85e4415
[daemon/dns] Stop remaining listener routines during Stop()
cgalibern Sep 26, 2023
5803471
[daemon/hbcache] Implement StartStopper interface for hbcache.T
cgalibern Sep 26, 2023
a6aeacb
[daemon/istat] Implement StartStopper interface for istat.T
cgalibern Sep 26, 2023
4bcd05d
[daemon/hb] Improves start & stop
cgalibern Sep 26, 2023
50dc1ce
[hb/hbctrl] Implement StartStopper interface for hbctrl.C
cgalibern Sep 26, 2023
2a01cdf
[daemoncli] Close daemonsys when not anymore used
cgalibern Sep 26, 2023
83c4fe0
[daemon] Ensure no more routines when daemon is stopped
cgalibern Sep 26, 2023
b05b682
Rename package daemoncli -> daemoncmd
cgalibern Sep 26, 2023
fddfc4b
[test] Make test pass
cgalibern Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/collector/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c Client) NewPinger(d time.Duration) func() {
for {
select {
case <-stop:
break
return
case <-ticker.C:
c.Ping()
}
Expand Down
4 changes: 2 additions & 2 deletions core/commands/daemon_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/opensvc/om3/core/client"
"github.com/opensvc/om3/core/nodeselector"
"github.com/opensvc/om3/daemon/daemoncli"
"github.com/opensvc/om3/daemon/daemoncmd"
)

type (
Expand Down Expand Up @@ -43,7 +43,7 @@ func (t *CmdDaemonRestart) restartLocal() error {
return err
}
ctx := context.Background()
return daemoncli.NewContext(ctx, cli).RestartFromCmd(ctx)
return daemoncmd.NewContext(ctx, cli).RestartFromCmd(ctx)
}

func (t *CmdDaemonRestart) restartRemotes(nodes []string) error {
Expand Down
4 changes: 2 additions & 2 deletions core/commands/daemon_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package commands
import (
"os"

"github.com/opensvc/om3/daemon/daemoncli"
"github.com/opensvc/om3/daemon/daemoncmd"
)

type (
Expand All @@ -17,7 +17,7 @@ func (t *CmdDaemonRunning) Run() error {
if err != nil {
return err
}
dCli := daemoncli.New(cli)
dCli := daemoncmd.New(cli)
dCli.SetNode(t.NodeSelector)
if !dCli.Running() {
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions core/commands/daemon_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package commands
import (
"context"

"github.com/opensvc/om3/daemon/daemoncli"
"github.com/opensvc/om3/daemon/daemoncmd"
)

type (
Expand Down Expand Up @@ -32,5 +32,5 @@ func (t *CmdDaemonStart) Run() error {
return err
}
ctx := context.Background()
return daemoncli.NewContext(ctx, cli).StartFromCmd(ctx, t.Foreground, t.CpuProfile)
return daemoncmd.NewContext(ctx, cli).StartFromCmd(ctx, t.Foreground, t.CpuProfile)
}
4 changes: 2 additions & 2 deletions core/commands/daemon_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package commands
import (
"context"

"github.com/opensvc/om3/daemon/daemoncli"
"github.com/opensvc/om3/daemon/daemoncmd"
)

type (
Expand All @@ -18,5 +18,5 @@ func (t *CmdDaemonStop) Run() error {
return err
}
ctx := context.Background()
return daemoncli.NewContext(ctx, cli).StopFromCmd(ctx)
return daemoncmd.NewContext(ctx, cli).StopFromCmd(ctx)
}
40 changes: 25 additions & 15 deletions daemon/ccfg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/opensvc/om3/core/node"
"github.com/opensvc/om3/core/object"
"github.com/opensvc/om3/core/xconfig"
"github.com/opensvc/om3/daemon/daemondata"
"github.com/opensvc/om3/daemon/draincommand"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/hostname"
Expand All @@ -37,7 +37,7 @@ type (
ctx context.Context
cancel context.CancelFunc
cmdC chan any
databus *daemondata.T
drainDuration time.Duration
bus *pubsub.Bus
log zerolog.Logger
startedAt time.Time
Expand All @@ -53,6 +53,7 @@ type (
change bool

sub *pubsub.Subscription
wg sync.WaitGroup
}

cmdGet struct {
Expand All @@ -68,20 +69,21 @@ var (
cmdC chan any
)

// Start launches the ccfg worker goroutine
func Start(parent context.Context, drainDuration time.Duration) error {
ctx, cancel := context.WithCancel(parent)

func New(drainDuration time.Duration) *ccfg {
o := &ccfg{
networkSigs: make(map[string]string),
ctx: ctx,
cancel: cancel,
cmdC: make(chan any),
databus: daemondata.FromContext(ctx),
bus: pubsub.BusFromContext(ctx),
log: log.Logger.With().Str("func", "ccfg").Logger(),
localhost: hostname.Hostname(),
cmdC: make(chan any),
drainDuration: drainDuration,
localhost: hostname.Hostname(),
log: log.Logger.With().Str("func", "ccfg").Logger(),
networkSigs: make(map[string]string),
}
return o
}

// Start launches the ccfg worker goroutine
func (o *ccfg) Start(parent context.Context) error {
o.ctx, o.cancel = context.WithCancel(parent)
o.bus = pubsub.BusFromContext(o.ctx)
cmdC = o.cmdC

if n, err := object.NewCluster(object.WithVolatile(true)); err != nil {
Expand All @@ -93,12 +95,14 @@ func Start(parent context.Context, drainDuration time.Duration) error {
o.pubClusterConfig()

o.startSubscriptions()
o.wg.Add(1)
go func() {
defer func() {
draincommand.Do(o.cmdC, drainDuration)
draincommand.Do(o.cmdC, o.drainDuration)
if err := o.sub.Stop(); err != nil && !errors.Is(err, context.Canceled) {
o.log.Warn().Err(err).Msg("subscription stop")
}
o.wg.Done()
}()
o.worker()
}()
Expand All @@ -109,6 +113,12 @@ func Start(parent context.Context, drainDuration time.Duration) error {
return nil
}

func (o *ccfg) Stop() error {
o.cancel()
o.wg.Wait()
return nil
}

func (o *ccfg) startSubscriptions() {
sub := o.bus.Sub("ccfg")
sub.AddFilter(&msgbus.ConfigFileUpdated{}, pubsub.Label{"path", "cluster"})
Expand Down
31 changes: 19 additions & 12 deletions daemon/cstat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ package cstat
import (
"context"
"errors"
"sync"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/opensvc/om3/core/cluster"
"github.com/opensvc/om3/core/node"
"github.com/opensvc/om3/daemon/daemondata"
"github.com/opensvc/om3/daemon/msgbus"
"github.com/opensvc/om3/util/pubsub"
)
Expand All @@ -27,7 +27,6 @@ type (
ctx context.Context
cancel context.CancelFunc
cmdC chan any
databus *daemondata.T
bus *pubsub.Bus
log zerolog.Logger
startedAt time.Time
Expand All @@ -41,26 +40,28 @@ type (
change bool

sub *pubsub.Subscription
wg sync.WaitGroup
}
)

// Start launches the cstat worker goroutine
func Start(parent context.Context) error {
ctx, cancel := context.WithCancel(parent)

o := &cstat{
ctx: ctx,
cancel: cancel,
databus: daemondata.FromContext(ctx),
bus: pubsub.BusFromContext(ctx),
func New() *cstat {
return &cstat{
log: log.Logger.With().Str("func", "cstat").Logger(),
nodeStatus: make(map[string]node.Status),
}
}

// Start launches the cstat worker goroutine
func (o *cstat) Start(parent context.Context) error {
o.ctx, o.cancel = context.WithCancel(parent)
o.bus = pubsub.BusFromContext(o.ctx)

o.startSubscriptions()
o.wg.Add(1)
go func() {
defer o.wg.Done()
defer func() {
if err := o.sub.Stop(); err != nil && !errors.Is(err, context.Canceled){
if err := o.sub.Stop(); err != nil && !errors.Is(err, context.Canceled) {
o.log.Error().Err(err).Msg("subscription stop")
}
}()
Expand All @@ -69,6 +70,12 @@ func Start(parent context.Context) error {
return nil
}

func (o *cstat) Stop() error {
o.cancel()
o.wg.Wait()
return nil
}

func (o *cstat) startSubscriptions() {
sub := o.bus.Sub("cstat")
sub.AddFilter(&msgbus.NodeStatusUpdated{})
Expand Down
14 changes: 0 additions & 14 deletions daemon/daemon/funcopts.go

This file was deleted.

Loading