Skip to content

Commit

Permalink
fix(agent,hass,device): better clean-up on agent quit/cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuar committed Oct 24, 2023
1 parent 1d312a0 commit ec7a7e0
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 58 deletions.
30 changes: 15 additions & 15 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func Run(options AgentOptions) {
var err error

agent := newAgent(&options)
defer close(agent.done)

// Pre-flight: check if agent is registered. If not, run registration flow.
var regWait sync.WaitGroup
Expand All @@ -99,7 +98,6 @@ func Run(options AgentOptions) {
log.Warn().Err(err).Msg("Unable to set config version to app version.")
}
ctx, cancelFunc = agent.setupContext()
agent.handleCancellation(ctx)
}()

// Start main work goroutines
Expand All @@ -125,13 +123,22 @@ func Run(options AgentOptions) {
}()
}()

agent.handleSignals()
agent.handleShutdown()
go func() {
<-agent.done
log.Debug().Msg("Agent done.")
cancelFunc()
}()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
defer close(agent.done)
<-c
log.Debug().Msg("Ctrl-C pressed.")
}()

agent.ui.DisplayTrayIcon(agent)
agent.ui.Run()
defer cancelFunc()

wg.Wait()
}

// Register runs a registration flow. It either prompts the user for needed
Expand Down Expand Up @@ -227,14 +234,6 @@ func (agent *Agent) handleShutdown() {
}()
}

func (agent *Agent) handleCancellation(ctx context.Context) {
go func() {
<-ctx.Done()
log.Debug().Msg("Context canceled.")
os.Exit(1)
}()
}

// Agent satisfies ui.Agent, tracker.Agent and api.Agent interfaces

func (agent *Agent) IsHeadless() bool {
Expand All @@ -254,6 +253,7 @@ func (agent *Agent) AppVersion() string {
}

func (agent *Agent) Stop() {
log.Debug().Msg("Stopping agent.")
close(agent.done)
}

Expand Down
10 changes: 7 additions & 3 deletions internal/agent/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ func (agent *Agent) runNotificationsWorker(ctx context.Context, options AgentOpt
go func() {
defer wg.Done()
for {
restartCh := make(chan struct{})
api.StartWebsocket(ctx, agent, notifyCh, restartCh)
<-restartCh
select {
case <-ctx.Done():
log.Debug().Msg("Stopping websocket.")
return
default:
api.StartWebsocket(ctx, agent, notifyCh)
}
}
}()

Expand Down
20 changes: 8 additions & 12 deletions internal/agent/ui/fyneUI/fyneUI.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ server (if not auto-detected) and long-lived access token.`
)

type fyneUI struct {
app fyne.App
mainWindow fyne.Window
text *translations.Translator
app fyne.App
text *translations.Translator
}

func (i *fyneUI) Run() {
Expand Down Expand Up @@ -79,10 +78,6 @@ func NewFyneUI(agent ui.Agent) *fyneUI {
text: translations.NewTranslator(),
}
i.app.SetIcon(&ui.TrayIcon{})
i.mainWindow = i.app.NewWindow(agent.AppName())
i.mainWindow.SetCloseIntercept(func() {
i.mainWindow.Hide()
})
return i
}
return &fyneUI{}
Expand All @@ -97,6 +92,7 @@ func (i *fyneUI) DisplayTrayIcon(agent ui.Agent) {
}
if desk, ok := i.app.(desktop.App); ok {
menuItemQuit := fyne.NewMenuItem(i.text.Translate("Quit"), func() {
i.app.Quit()
agent.Stop()
})
menuItemQuit.IsQuit = true
Expand Down Expand Up @@ -144,29 +140,29 @@ func (i *fyneUI) DisplayTrayIcon(agent ui.Agent) {
// complete registration. It will populate with any values that were already
// provided via the command-line.
func (i *fyneUI) DisplayRegistrationWindow(ctx context.Context, agent ui.Agent, done chan struct{}) {
i.mainWindow.SetTitle(i.text.Translate("App Registration"))
w := i.app.NewWindow(i.text.Translate("App Registration"))

var allFormItems []*widget.FormItem

allFormItems = append(allFormItems, i.serverConfigItems(ctx, agent, i.text)...)
registrationForm := widget.NewForm(allFormItems...)
registrationForm.OnSubmit = func() {
i.mainWindow.Hide()
w.Close()
close(done)
}
registrationForm.OnCancel = func() {
log.Warn().Msg("Canceling registration.")
close(done)
i.mainWindow.Close()
w.Close()
ctx.Done()
}

i.mainWindow.SetContent(container.New(layout.NewVBoxLayout(),
w.SetContent(container.New(layout.NewVBoxLayout(),
widget.NewLabel(i.text.Translate(explainRegistration)),
registrationForm,
))
log.Debug().Msg("Asking user for registration details.")
i.mainWindow.Show()
w.Show()
}

// aboutWindow creates a window that will show some interesting information
Expand Down
2 changes: 1 addition & 1 deletion internal/device/helpers/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func PollSensors(ctx context.Context, updater func(), interval, stdev time.Durat
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-ticker.C:
updater()
Expand Down
64 changes: 37 additions & 27 deletions internal/hass/api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type websocketResponse struct {
Success bool `json:"success,omitempty"`
}

func StartWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string, doneCh chan struct{}) {
func StartWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string) {
var websocketURL string
if err := settings.GetConfig(config.PrefWebsocketURL, &websocketURL); err != nil {
log.Warn().Err(err).Msg("Could not retrieve websocket URL from config.")
Expand All @@ -60,7 +60,7 @@ func StartWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string
retryFunc := func() error {
var resp *http.Response
socket, resp, err = gws.NewClient(
newWebsocket(ctx, settings, notifyCh, doneCh),
newWebsocket(ctx, settings, notifyCh),
&gws.ClientOption{Addr: websocketURL})
if err != nil {
log.Error().Err(err).
Expand All @@ -77,7 +77,11 @@ func StartWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string
return
}
log.Trace().Caller().Msg("Websocket connection established.")
go socket.ReadLoop()
go func() {
<-ctx.Done()
socket.WriteClose(1000, nil)
}()
socket.ReadLoop()
}

type webSocketData struct {
Expand All @@ -94,7 +98,7 @@ type WebSocket struct {
nextID uint64
}

func newWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string, doneCh chan struct{}) *WebSocket {
func newWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string) *WebSocket {
var token, webhookID string
if err := settings.GetConfig(config.PrefToken, &token); err != nil {
log.Warn().Err(err).Msg("Could not retrieve token from config.")
Expand All @@ -110,23 +114,24 @@ func newWebsocket(ctx context.Context, settings Agent, notifyCh chan [2]string,
WriteCh: make(chan *webSocketData),
token: token,
webhookID: webhookID,
doneCh: doneCh,
doneCh: make(chan struct{}),
}
go ws.responseHandler(ctx, notifyCh)
go ws.requestHandler(ctx)
go func() {
<-ctx.Done()
close(ws.doneCh)
}()
go ws.responseHandler(notifyCh)
go ws.requestHandler()
return ws
}

func (c *WebSocket) OnError(socket *gws.Conn, err error) {
log.Error().Err(err).
Msg("Error on websocket")
c.doneCh <- struct{}{}
}

func (c *WebSocket) OnClose(socket *gws.Conn, err error) {
log.Debug().Err(err).Msg("Websocket connection closed.")
c.doneCh <- struct{}{}
close(c.doneCh)
}

func (c *WebSocket) OnPong(socket *gws.Conn, payload []byte) {
Expand All @@ -137,20 +142,25 @@ func (c *WebSocket) OnOpen(socket *gws.Conn) {
log.Trace().Caller().Msg("Websocket opened.")
go func() {
ticker := time.NewTicker(PingInterval)
for range ticker.C {
log.Trace().Caller().
Msg("Sending ping on websocket")
if err := socket.SetDeadline(time.Now().Add(2 * PingInterval)); err != nil {
log.Error().Err(err).
Msg("Error setting deadline on websocket.")
for {
select {
case <-c.doneCh:
return
}
c.WriteCh <- &webSocketData{
conn: socket,
data: &websocketMsg{
Type: "ping",
ID: atomic.LoadUint64(&c.nextID),
},
case <-ticker.C:
log.Trace().Caller().
Msg("Sending ping on websocket")
if err := socket.SetDeadline(time.Now().Add(2 * PingInterval)); err != nil {
log.Error().Err(err).
Msg("Error setting deadline on websocket.")
return
}
c.WriteCh <- &webSocketData{
conn: socket,
data: &websocketMsg{
Type: "ping",
ID: atomic.LoadUint64(&c.nextID),
},
}
}
}
}()
Expand All @@ -175,10 +185,10 @@ func (c *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {
}
}

func (c *WebSocket) responseHandler(ctx context.Context, notifyCh chan [2]string) {
func (c *WebSocket) responseHandler(notifyCh chan [2]string) {
for {
select {
case <-ctx.Done():
case <-c.doneCh:
log.Trace().Caller().Msg("Stopping websocket response handler.")
return
case r := <-c.ReadCh:
Expand Down Expand Up @@ -237,10 +247,10 @@ func (c *WebSocket) responseHandler(ctx context.Context, notifyCh chan [2]string
}
}

func (c *WebSocket) requestHandler(ctx context.Context) {
func (c *WebSocket) requestHandler() {
for {
select {
case <-ctx.Done():
case <-c.doneCh:
log.Trace().Caller().Msg("Stopping websocket request handler.")
return
case m := <-c.WriteCh:
Expand Down

0 comments on commit ec7a7e0

Please sign in to comment.