From 60def33a9c409587b52d2d85e3c727ca6549cf3d Mon Sep 17 00:00:00 2001 From: Luke Marsden Date: Tue, 14 Nov 2023 15:58:52 +0000 Subject: [PATCH] use recws library to do thread-safe websocket reconnection --- go.mod | 3 ++ go.sum | 4 +++ pkg/http/websocket_client.go | 61 +++++++++++------------------------- 3 files changed, 26 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index 6795aacc..5fe9829b 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,8 @@ require ( k8s.io/apimachinery v0.28.3 ) +require github.com/jpillora/backoff v1.0.0 // indirect + require ( dario.cat/mergo v1.0.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect @@ -85,6 +87,7 @@ require ( github.com/pjbgf/sha1cd v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect + github.com/recws-org/recws v1.4.0 github.com/rivo/uniseg v0.2.0 // indirect github.com/sergi/go-diff v1.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect diff --git a/go.sum b/go.sum index da1d0817..edcb2fd9 100644 --- a/go.sum +++ b/go.sum @@ -190,6 +190,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= @@ -289,6 +291,8 @@ github.com/prometheus/client_golang v1.12.0 h1:C+UIj/QWtmqY13Arb8kwMt5j34/0Z2iKa github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a h1:CmF68hwI0XsOQ5UwlBopMi2Ow4Pbg32akc4KIVCOm+Y= github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= +github.com/recws-org/recws v1.4.0 h1:y9LLddtAicjejikNZXiaY9DQjIwcAQ82acd1XU6n0lU= +github.com/recws-org/recws v1.4.0/go.mod h1:7+NQkTmBdU98VSzkzq9/P7+X0xExioUVBx9OeRKQIkk= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/http/websocket_client.go b/pkg/http/websocket_client.go index f8126b61..c44b2c05 100644 --- a/pkg/http/websocket_client.go +++ b/pkg/http/websocket_client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/recws-org/recws" "github.com/rs/zerolog/log" ) @@ -13,53 +14,30 @@ func ConnectWebSocket( url string, messageChan chan []byte, ctx context.Context, -) *websocket.Conn { - closed := false +) *recws.RecConn { - var conn *websocket.Conn + ws := &recws.RecConn{ + KeepAliveTimeout: 10 * time.Second, + } + ws.Dial(url, nil) go func() { for { select { case <-ctx.Done(): - closed = true - if conn != nil { - conn.Close() - } - + go ws.Close() + log.Printf("Websocket closed %s", ws.GetURL()) return - } - } - }() - - for { - var err error - log.Debug().Msgf("WebSocket connection connecting: %s", url) - conn, _, err = websocket.DefaultDialer.Dial(url, nil) - if err != nil { - log.Error().Msgf("WebSocket connection failed: %s\nReconnecting in 2 seconds...", err) - if closed { - break - } - time.Sleep(2 * time.Second) - continue - } - break - } - - if !closed { - go func() { - for { - messageType, p, err := conn.ReadMessage() - if err != nil { - if closed { - return - } - log.Error().Msgf("Read error: %s\nReconnecting in 2 seconds...", err) - time.Sleep(2 * time.Second) - conn = ConnectWebSocket(url, messageChan, ctx) + default: + if !ws.IsConnected() { + log.Printf("Websocket disconnected %s", ws.GetURL()) continue } + messageType, p, err := ws.ReadMessage() + if err != nil { + log.Printf("Error: ReadMessage %s", ws.GetURL()) + return + } if messageType == websocket.TextMessage { log.Debug(). Str("action", "ws READ"). @@ -68,8 +46,7 @@ func ConnectWebSocket( messageChan <- p } } - }() - } - - return conn + } + }() + return ws }