Skip to content

Commit

Permalink
[daemon] Daemon BigBang event retrieval
Browse files Browse the repository at this point in the history
During daemon startup we now buffer publications to allow event clients
to retrieve all published event. After 200ms the buffering is disabled.

Before this commit, during daemon startup we couldn't get the initial
events until the listener is started.

om node event re-connect time is reduced to 100ms to allow reconnect
before daemon event buffering is disabled.
  • Loading branch information
cgalibern committed Sep 27, 2024
1 parent 286dd93 commit 547f643
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
4 changes: 3 additions & 1 deletion core/omcmd/node_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ func (t *CmdNodeEvents) nodeEventLoop(ctx context.Context, nodename string) {
_, _ = fmt.Fprintf(os.Stderr, "event read failed for node %s: '%s'\n", nodename, err)
_, _ = fmt.Fprintln(os.Stderr, "press ctrl+c to interrupt retries")
}
time.Sleep(1 * time.Second)
// wait time before reconnect, it is short to avoid loosing daemon
// restart events.
time.Sleep(100 * time.Millisecond)
select {
case <-ctx.Done():
t.errC <- ctx.Err()
Expand Down
15 changes: 15 additions & 0 deletions daemon/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,27 @@ func (t *T) Start(ctx context.Context) error {
t.ctx = pubsub.ContextWithBus(t.ctx, bus)
t.wg.Add(1)
bus.Start(t.ctx)
bus.EnableBufferPublication(2000)
bus.Pub(&msgbus.DaemonStatusUpdated{Node: localhost, Version: version.Version(), Status: "starting"}, labelLocalhost)

t.bus = bus
t.stopFuncs = append(t.stopFuncs, func() error {
bus.Pub(&msgbus.DaemonStatusUpdated{Node: localhost, Version: version.Version(), Status: "stopped"}, labelLocalhost)
// give chance for DaemonStatusUpdated message to reach peers
time.Sleep(300 * time.Millisecond)
defer t.wg.Done()
t.log.Infof("stop pubsub bus")
t.bus.Stop()
t.log.Infof("stopped pubsub bus")
return nil
})

defer func() {
// give a chance for event client to reconnect to the daemon
time.Sleep(200 * time.Millisecond)
bus.DisableBufferPublication()
}()

defer t.stopWatcher()

go t.notifyWatchDogSys(t.ctx)
Expand Down Expand Up @@ -187,6 +199,9 @@ func (t *T) Stop() error {
var errs error
// stop goroutines without cancel context
t.logTransition("stopping 🟡")
localhost := hostname.Hostname()
t.bus.Pub(&msgbus.DaemonStatusUpdated{Node: localhost, Version: version.Version(), Status: "stopping"}, pubsub.Label{"node", localhost})
time.Sleep(300 * time.Millisecond)
defer t.logTransition("stopped 🟡")
for i := len(t.stopFuncs) - 1; i >= 0; i-- {
if err := t.stopFuncs[i](); err != nil {
Expand Down

0 comments on commit 547f643

Please sign in to comment.