Skip to content

Commit

Permalink
Work on Centrifuge v0.29.0 (#284)
Browse files Browse the repository at this point in the history
Co-authored-by: frakl <[email protected]>
  • Loading branch information
FZambia and bfwbbrj authored Mar 26, 2023
1 parent 8db9218 commit 94c5b86
Show file tree
Hide file tree
Showing 76 changed files with 2,291 additions and 3,701 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
strategy:
matrix:
go-version: [1.18, 1.19]
go-version: ["1.19", "1.20"]
redis-version: [5, 6, 7]
steps:
- name: Install Go stable version
Expand All @@ -43,7 +43,7 @@ jobs:
run: go test -v -race -tags integration -coverprofile=coverage.out $(go list ./... | grep -v /_examples/)

- name: Upload code coverage to codecov
if: matrix.go-version == '1.18'
if: matrix.go-version == '1.20'
uses: codecov/codecov-action@v3
with:
file: ./coverage.out
11 changes: 3 additions & 8 deletions _examples/chat_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,11 @@ func channelSubscribeAllowed(channel string) bool {

func main() {
node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
HistoryMetaTTL: 24 * time.Hour,
})

// Override default broker which does not use HistoryMetaTTL.
broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{
HistoryMetaTTL: 120 * time.Second,
})
node.SetBroker(broker)

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
Expand Down
8 changes: 4 additions & 4 deletions _examples/chat_oauth2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -224,7 +224,7 @@ func callbackHandler(w http.ResponseWriter, r *http.Request) {
}
defer func() { _ = response.Body.Close() }()

contents, err := ioutil.ReadAll(response.Body)
contents, err := io.ReadAll(response.Body)
if err != nil {
log.Printf("Error reading body %s\n", err.Error())
return
Expand Down Expand Up @@ -277,7 +277,7 @@ func getSessionValue(session *sessions.Session, key string) (string, error) {
if err != nil {
return "", err
}
s, err := ioutil.ReadAll(r)
s, err := io.ReadAll(r)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func Logout(res http.ResponseWriter, req *http.Request) error {
return err
}
session.Options.MaxAge = -1
session.Values = make(map[interface{}]interface{})
session.Values = make(map[any]any)
err = session.Save(req, res)
if err != nil {
return errors.New("could not delete user session")
Expand Down
4 changes: 1 addition & 3 deletions _examples/concurrency/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ func main() {
LogHandler: handleLog,
})

broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{
HistoryMetaTTL: 120 * time.Second,
})
broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{})
node.SetBroker(broker)

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
Expand Down
2 changes: 1 addition & 1 deletion _examples/custom_broker_nats/natsbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type NatsBroker struct {
}

// History ...
func (b *NatsBroker) History(_ string, _ centrifuge.HistoryFilter) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) {
func (b *NatsBroker) History(_ string, _ centrifuge.HistoryOptions) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) {
return nil, centrifuge.StreamPosition{}, centrifuge.ErrorNotAvailable
}

Expand Down
23 changes: 12 additions & 11 deletions _examples/custom_engine_tarantool/tntengine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (b *Broker) Subscribe(ch string) error {
return centrifuge.ErrorBadRequest
}
if b.node.LogEnabled(centrifuge.LogLevelDebug) {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]interface{}{"channel": ch}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]any{"channel": ch}))
}
r := newSubRequest([]string{ch}, true)
s := b.shards[consistentIndex(ch, len(b.shards))]
Expand All @@ -239,7 +239,7 @@ func (b *Broker) Subscribe(ch string) error {
// Unsubscribe - see centrifuge.Broker interface description.
func (b *Broker) Unsubscribe(ch string) error {
if b.node.LogEnabled(centrifuge.LogLevelDebug) {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]interface{}{"channel": ch}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]any{"channel": ch}))
}
r := newSubRequest([]string{ch}, false)
s := b.shards[consistentIndex(ch, len(b.shards))]
Expand Down Expand Up @@ -348,7 +348,8 @@ func (m *historyResponse) DecodeMsgpack(d *msgpack.Decoder) error {
}

// History - see centrifuge.Broker interface description.
func (b *Broker) History(ch string, filter centrifuge.HistoryFilter) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) {
func (b *Broker) History(ch string, opts centrifuge.HistoryOptions) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) {
filter := opts.Filter
var includePubs = true
var offset uint64
if filter.Since != nil {
Expand Down Expand Up @@ -467,7 +468,7 @@ func (m *pubSubMessage) DecodeMsgpack(d *msgpack.Decoder) error {

func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) {
logError := func(errString string) {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]interface{}{"error": errString}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]any{"error": errString}))
}

u, err := uuid.NewRandom()
Expand Down Expand Up @@ -607,7 +608,7 @@ func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler)
case n := <-ch:
err := b.handleMessage(eventHandler, n)
if err != nil {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]interface{}{"error": err.Error()}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]any{"error": err.Error()}))
continue
}
}
Expand All @@ -632,7 +633,7 @@ func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler)
r := newSubRequest(batch, true)
err := b.sendSubscribe(s, r)
if err != nil {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()}))
closeDoneOnce()
return
}
Expand All @@ -644,7 +645,7 @@ func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler)
r := newSubRequest(batch, true)
err := b.sendSubscribe(s, r)
if err != nil {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()}))
closeDoneOnce()
return
}
Expand Down Expand Up @@ -675,10 +676,10 @@ func (b *Broker) waitPubSubMessages(conn *tarantool.Connection, connID string, c
_, err := conn.ExecContext(ctx, tarantool.Call(
"centrifuge.get_messages",
pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25},
).WithPushTyped(func(decode func(interface{}) error) {
).WithPushTyped(func(decode func(any) error) {
var m [][]pubSubMessage
if err := decode(&m); err != nil {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]interface{}{"error": err.Error()}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]any{"error": err.Error()}))
return
}
if len(m) == 1 {
Expand Down Expand Up @@ -738,7 +739,7 @@ func (b *Broker) handleMessage(eventHandler centrifuge.BrokerEventHandler, msg p

func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) {
logError := func(errString string) {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]interface{}{"error": errString}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]any{"error": errString}))
}

u, err := uuid.NewRandom()
Expand Down Expand Up @@ -795,7 +796,7 @@ func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventH
case n := <-workCh:
err := eventHandler.HandleControl(n.Data)
if err != nil {
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling control message", map[string]interface{}{"error": err.Error()}))
b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling control message", map[string]any{"error": err.Error()}))
continue
}
}
Expand Down
10 changes: 6 additions & 4 deletions _examples/custom_engine_tarantool/tntengine/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,17 @@ func BenchmarkTarantoolRecover_OneChannel_Parallel(b *testing.B) {
_, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{HistorySize: numMessages, HistoryTTL: 300 * time.Second})
require.NoError(b, err)
}
_, sp, err := broker.History("channel", centrifuge.HistoryFilter{})
_, sp, err := broker.History("channel", centrifuge.HistoryOptions{})
require.NoError(b, err)
b.ResetTimer()
b.SetParallelism(128)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
pubs, _, err := broker.History("channel", centrifuge.HistoryFilter{
Limit: -1,
Since: &centrifuge.StreamPosition{Offset: sp.Offset - uint64(numMissing), Epoch: ""},
pubs, _, err := broker.History("channel", centrifuge.HistoryOptions{
Filter: centrifuge.HistoryFilter{
Limit: -1,
Since: &centrifuge.StreamPosition{Offset: sp.Offset - uint64(numMissing), Epoch: ""},
},
})
if err != nil {
b.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion _examples/custom_engine_tarantool/tntengine/multi_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (c *MultiConnection) IsLeader(conn *tarantool.Connection) (bool, error) {
if c.opts.ConnectionMode == ConnectionModeLeaderFollowerRaft {
leaderCheck = "return box.info.election.state == 'leader'"
}
resp, err := conn.ExecContext(ctx, tarantool.Eval(leaderCheck, []interface{}{}))
resp, err := conn.ExecContext(ctx, tarantool.Eval(leaderCheck, []any{}))
if err != nil {
return false, err
}
Expand Down
4 changes: 2 additions & 2 deletions _examples/custom_engine_tarantool/tntengine/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (m PresenceManager) Presence(ch string) (map[string]*centrifuge.ClientInfo,
if len(res.Data) == 0 {
return nil, errors.New("malformed presence result")
}
presenceInterfaceSlice, ok := res.Data[0].([]interface{})
presenceInterfaceSlice, ok := res.Data[0].([]any)
if !ok {
return nil, errors.New("malformed presence format: map expected")
}
presence := make(map[string]*centrifuge.ClientInfo, len(presenceInterfaceSlice))
for _, v := range presenceInterfaceSlice {
presenceRow, ok := v.([]interface{})
presenceRow, ok := v.([]any)
if !ok {
return nil, errors.New("malformed presence format: tuple expected")
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/custom_engine_tarantool/tntengine/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Shard) Exec(request *tarantool.Request) (*tarantool.Response, error) {
return conn.Exec(request)
}

func (s *Shard) ExecTyped(request *tarantool.Request, result interface{}) error {
func (s *Shard) ExecTyped(request *tarantool.Request, result any) error {
conn, err := s.mc.LeaderConn()
if err != nil {
return err
Expand Down
4 changes: 1 addition & 3 deletions _examples/custom_token/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ func main() {
ClientExpiredCloseDelay: 5 * time.Second,
})

broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{
HistoryMetaTTL: 120 * time.Second,
})
broker, _ := centrifuge.NewMemoryBroker(node, centrifuge.MemoryBrokerConfig{})
node.SetBroker(broker)

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
Expand Down
12 changes: 8 additions & 4 deletions _examples/custom_ws_gobwas/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io/ioutil"
"net"
"sync"
"time"

"github.com/centrifugal/centrifuge"
"github.com/centrifugal/protocol"
Expand Down Expand Up @@ -60,9 +61,12 @@ func (t *customWebsocketTransport) DisabledPushFlags() uint64 {
return centrifuge.PushFlagDisconnect
}

// AppLevelPing not implemented here, example only works over ProtocolVersion1.
func (t *customWebsocketTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{}
// PingPongConfig ...
func (t *customWebsocketTransport) PingPongConfig() centrifuge.PingPongConfig {
return centrifuge.PingPongConfig{
PingInterval: 25 * time.Second,
PongTimeout: 10 * time.Second,
}
}

func (t *customWebsocketTransport) read() ([]byte, bool, error) {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (t *customWebsocketTransport) Close(disconnect centrifuge.Disconnect) error
t.mu.Unlock()

if disconnect != centrifuge.DisconnectConnectionClosed {
data := ws.NewCloseFrameBody(ws.StatusCode(disconnect.Code), disconnect.CloseText(t.ProtocolVersion()))
data := ws.NewCloseFrameBody(ws.StatusCode(disconnect.Code), disconnect.Reason)
_ = wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
return t.conn.Close()
}
Expand Down
19 changes: 11 additions & 8 deletions _examples/custom_ws_nhooyr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,12 @@ func (t *customWebsocketTransport) DisabledPushFlags() uint64 {
return centrifuge.PushFlagDisconnect
}

// AppLevelPing not implemented here, example only works over ProtocolVersion1.
func (t *customWebsocketTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{}
// PingPongConfig ...
func (t *customWebsocketTransport) PingPongConfig() centrifuge.PingPongConfig {
return centrifuge.PingPongConfig{
PingInterval: 25 * time.Second,
PongTimeout: 10 * time.Second,
}
}

// Write ...
Expand Down Expand Up @@ -285,7 +288,7 @@ func (t *customWebsocketTransport) Close(disconnect centrifuge.Disconnect) error
t.mu.Unlock()

if disconnect != centrifuge.DisconnectConnectionClosed {
return t.conn.Close(websocket.StatusCode(disconnect.Code), disconnect.CloseText(t.ProtocolVersion()))
return t.conn.Close(websocket.StatusCode(disconnect.Code), disconnect.Reason)
}
return t.conn.Close(websocket.StatusNormalClosure, "")
}
Expand All @@ -294,7 +297,7 @@ func (s *customWebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Reque

conn, err := websocket.Accept(rw, r, &websocket.AcceptOptions{})
if err != nil {
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "websocket upgrade error", map[string]interface{}{"error": err.Error()}))
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "websocket upgrade error", map[string]any{"error": err.Error()}))
return
}

Expand All @@ -314,13 +317,13 @@ func (s *customWebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Reque

c, closeFn, err := centrifuge.NewClient(r.Context(), s.node, transport)
if err != nil {
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]interface{}{"transport": websocketTransportName}))
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]any{"transport": websocketTransportName}))
return
}
defer func() { _ = closeFn() }()
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]interface{}{"client": c.ID(), "transport": websocketTransportName}))
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"client": c.ID(), "transport": websocketTransportName}))
defer func(started time.Time) {
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]interface{}{"client": c.ID(), "transport": websocketTransportName, "duration": time.Since(started)}))
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"client": c.ID(), "transport": websocketTransportName, "duration": time.Since(started)}))
}(time.Now())

for {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ func main() {
}

broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
// Use reasonably large expiration interval for stream meta key,
// much bigger than maximum HistoryLifetime value in Node config.
// This way stream metadata will expire, in some cases you may want
// to prevent its expiration setting this to zero value.
HistoryMetaTTL: 7 * 24 * time.Hour,

// And configure a couple of shards to use.
Shards: redisShards,
})
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 1 addition & 11 deletions _examples/experimental/http3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ func main() {
}

broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
// Use reasonably large expiration interval for stream meta key,
// much bigger than maximum HistoryLifetime value in Node config.
// This way stream metadata will expire, in some cases you may want
// to prevent its expiration setting this to zero value.
HistoryMetaTTL: 7 * 24 * time.Hour,

// And configure a couple of shards to use.
Shards: redisShards,
})
Expand Down Expand Up @@ -254,11 +248,7 @@ func main() {

mux := http.NewServeMux()

mux.Handle("/connection/websocket", authMiddleware(
centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ProtocolVersion: centrifuge.ProtocolVersion2,
}),
))
mux.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
mux.Handle("/connection/http_stream", authMiddleware(centrifuge.NewHTTPStreamHandler(node, centrifuge.HTTPStreamConfig{})))
mux.Handle("/connection/sse", authMiddleware(centrifuge.NewSSEHandler(node, centrifuge.SSEConfig{})))
mux.Handle("/emulation", centrifuge.NewEmulationHandler(node, centrifuge.EmulationConfig{}))
Expand Down
Loading

0 comments on commit 94c5b86

Please sign in to comment.