Skip to content

Commit

Permalink
Merge pull request #38 from bacalhau-project/better-reconnecting-webs…
Browse files Browse the repository at this point in the history
…ocket

use recws library to do thread-safe websocket reconnection
  • Loading branch information
lukemarsden authored Nov 15, 2023
2 parents e1b2357 + 60def33 commit f70e4e1
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 42 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ require (
k8s.io/apimachinery v0.28.3
)

require github.com/jpillora/backoff v1.0.0 // indirect

require (
dario.cat/mergo v1.0.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down Expand Up @@ -85,6 +87,7 @@ require (
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/recws-org/recws v1.4.0
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down Expand Up @@ -289,6 +291,8 @@ github.com/prometheus/client_golang v1.12.0 h1:C+UIj/QWtmqY13Arb8kwMt5j34/0Z2iKa
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a h1:CmF68hwI0XsOQ5UwlBopMi2Ow4Pbg32akc4KIVCOm+Y=
github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/recws-org/recws v1.4.0 h1:y9LLddtAicjejikNZXiaY9DQjIwcAQ82acd1XU6n0lU=
github.com/recws-org/recws v1.4.0/go.mod h1:7+NQkTmBdU98VSzkzq9/P7+X0xExioUVBx9OeRKQIkk=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
61 changes: 19 additions & 42 deletions pkg/http/websocket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/recws-org/recws"
"github.com/rs/zerolog/log"
)

Expand All @@ -13,53 +14,30 @@ func ConnectWebSocket(
url string,
messageChan chan []byte,
ctx context.Context,
) *websocket.Conn {
closed := false
) *recws.RecConn {

var conn *websocket.Conn
ws := &recws.RecConn{
KeepAliveTimeout: 10 * time.Second,
}
ws.Dial(url, nil)

go func() {
for {
select {
case <-ctx.Done():
closed = true
if conn != nil {
conn.Close()
}

go ws.Close()
log.Printf("Websocket closed %s", ws.GetURL())
return
}
}
}()

for {
var err error
log.Debug().Msgf("WebSocket connection connecting: %s", url)
conn, _, err = websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Error().Msgf("WebSocket connection failed: %s\nReconnecting in 2 seconds...", err)
if closed {
break
}
time.Sleep(2 * time.Second)
continue
}
break
}

if !closed {
go func() {
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
if closed {
return
}
log.Error().Msgf("Read error: %s\nReconnecting in 2 seconds...", err)
time.Sleep(2 * time.Second)
conn = ConnectWebSocket(url, messageChan, ctx)
default:
if !ws.IsConnected() {
log.Printf("Websocket disconnected %s", ws.GetURL())
continue
}
messageType, p, err := ws.ReadMessage()
if err != nil {
log.Printf("Error: ReadMessage %s", ws.GetURL())
return
}
if messageType == websocket.TextMessage {
log.Debug().
Str("action", "ws READ").
Expand All @@ -68,8 +46,7 @@ func ConnectWebSocket(
messageChan <- p
}
}
}()
}

return conn
}
}()
return ws
}

0 comments on commit f70e4e1

Please sign in to comment.