Skip to content

Commit

Permalink
Centrifuge bidirectional emulation layer and client protocol v2 tweaks (
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Jun 28, 2022
1 parent 003a596 commit 01c7b2d
Show file tree
Hide file tree
Showing 67 changed files with 3,557 additions and 667 deletions.
3 changes: 3 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ coverage:
patch: off
ignore:
- "internal/controlpb/control*" # generated code
- "emulation.go" # experimental code
- "handler_http_stream.go" # experimental code
- "handler_sse.go" # experimental code
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
strategy:
matrix:
go-version: [1.16, 1.17]
go-version: [1.17, 1.18]
redis-version: [6]
steps:
- name: Install Go stable version
Expand All @@ -49,7 +49,7 @@ jobs:
run: go test -v -race -tags integration -coverprofile=coverage.out $(go list ./... | grep -v /_examples/)

- name: Upload code coverage to codecov
if: matrix.go-version == '1.17'
if: matrix.go-version == '1.18'
uses: codecov/codecov-action@v1
with:
file: ./coverage.out
18 changes: 9 additions & 9 deletions _examples/chat_json/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// const centrifuge = new Centrifuge('http://localhost:8000/connection/sockjs', {
// sockjsTransports: [
// "xhr-streaming",
// ]
// ],
// });

centrifuge.setConnectData({"user-agent": navigator.userAgent});
Expand Down Expand Up @@ -83,7 +83,7 @@
// subscribe on channel and bind various event listeners. Actual
// subscription request will be sent after client connects to
// a server.
const sub = centrifuge.subscribe(channel, handleMessage)
const sub = centrifuge.subscribe(channel, handlePublication)
.on("join", handleJoin)
.on("leave", handleLeave)
.on("unsubscribe", handleUnsubscribe)
Expand Down Expand Up @@ -117,7 +117,7 @@
drawText('Error subscribing on channel ' + err.channel + ': ' + err.message);
}

function handleMessage(message) {
function handlePublication(message) {
let clientID;
if (message.info){
clientID = message.info.client;
Expand All @@ -129,16 +129,16 @@
drawText(text);
}

function handleJoin(message) {
drawText('Client joined channel ' + this.channel + ' (uid ' + message.info["client"] + ', user '+ message.info["user"] +')');
function handleJoin(ctx) {
drawText('Client joined channel ' + this.channel + ' (uid ' + ctx.info["client"] + ', user '+ ctx.info["user"] +')');
}

function handleLeave(message) {
drawText('Client left channel ' + this.channel + ' (uid ' + message.info["client"] + ', user '+ message.info["user"] +')');
function handleLeave(ctx) {
drawText('Client left channel ' + this.channel + ' (uid ' + ctx.info["client"] + ', user '+ ctx.info["user"] +')');
}

function handleUnsubscribe(sub) {
drawText('Unsubscribed from channel ' + sub.channel);
function handleUnsubscribe(ctx) {
drawText('Unsubscribed from channel ' + ctx.channel + ', ' + JSON.stringify(ctx));
}

function drawText(text) {
Expand Down
40 changes: 23 additions & 17 deletions _examples/chat_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main() {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
Data: []byte(`{}`),
// Subscribe to personal several server-side channel.
// Subscribe to a personal server-side channel.
Subscriptions: map[string]centrifuge.SubscribeOptions{
"#" + cred.UserID: {Recover: true, Presence: true, JoinLeave: true},
},
Expand All @@ -85,7 +85,7 @@ func main() {

node.OnConnect(func(client *centrifuge.Client) {
transport := client.Transport()
log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())
log.Printf("[user %s] connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

// Event handler should not block, so start separate goroutine to
// periodically send messages to client.
Expand All @@ -106,23 +106,22 @@ func main() {
}
}()

client.OnAlive(func() {
log.Printf("user %s connection is still active", client.UserID())
})

client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) {
log.Printf("user %s connection is going to expire, refreshing", client.UserID())
log.Printf("[user %s] connection is going to expire, refreshing", client.UserID())

cb(centrifuge.RefreshReply{
ExpireAt: time.Now().Unix() + 60,
}, nil)
})

client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
log.Printf("[user %s] subscribes on %s", client.UserID(), e.Channel)

if !channelSubscribeAllowed(e.Channel) {
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied)
return
}

cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Recover: true,
Expand All @@ -133,12 +132,9 @@ func main() {
}, nil)
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
})

client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
log.Printf("[user %s[ publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))

if !client.IsSubscribed(e.Channel) {
cb(centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied)
return
Expand All @@ -163,14 +159,16 @@ func main() {
})

client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
log.Printf("RPC from user: %s, data: %s, method: %s", client.UserID(), string(e.Data), e.Method)
log.Printf("[user %s] sent RPC, data: %s, method: %s", client.UserID(), string(e.Data), e.Method)

cb(centrifuge.RPCReply{
Data: []byte(`{"year": "2020"}`),
}, nil)
})

client.OnPresence(func(e centrifuge.PresenceEvent, cb centrifuge.PresenceCallback) {
log.Printf("user %s calls presence on %s", client.UserID(), e.Channel)
log.Printf("[user %s] calls presence on %s", client.UserID(), e.Channel)

if !client.IsSubscribed(e.Channel) {
cb(centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied)
return
Expand All @@ -179,11 +177,19 @@ func main() {
})

client.OnMessage(func(e centrifuge.MessageEvent) {
log.Printf("message from user: %s, data: %s", client.UserID(), string(e.Data))
log.Printf("[user %s] sent message, data: %s", client.UserID(), string(e.Data))
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
log.Printf("[user %s] unsubscribed from %s: %s", client.UserID(), e.Channel, e.Reason)
})

client.OnAlive(func() {
log.Printf("[user %s] connection is still active", client.UserID())
})

client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
log.Printf("[user %s] disconnected: %s", client.UserID(), e.Reason)
})
})

Expand Down
4 changes: 4 additions & 0 deletions _examples/chat_oauth2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func callbackHandler(w http.ResponseWriter, r *http.Request) {
defer func() { _ = response.Body.Close() }()

contents, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Printf("Error reading body %s\n", err.Error())
return
}

var user *GoogleUser
err = json.Unmarshal(contents, &user)
Expand Down
9 changes: 7 additions & 2 deletions _examples/custom_ws_gobwas/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (t *customWebsocketTransport) Unidirectional() bool {
return false
}

// Emulation ...
func (t *customWebsocketTransport) Emulation() bool {
return false
}

// DisabledPushFlags ...
func (t *customWebsocketTransport) DisabledPushFlags() uint64 {
return centrifuge.PushFlagDisconnect
Expand Down Expand Up @@ -125,7 +130,7 @@ func (t *customWebsocketTransport) WriteMany(messages ...[]byte) error {
}

// Close implementation.
func (t *customWebsocketTransport) Close(disconnect *centrifuge.Disconnect) error {
func (t *customWebsocketTransport) Close(disconnect centrifuge.Disconnect) error {
t.mu.Lock()
if t.closed {
t.mu.Unlock()
Expand All @@ -135,7 +140,7 @@ func (t *customWebsocketTransport) Close(disconnect *centrifuge.Disconnect) erro
close(t.closeCh)
t.mu.Unlock()

if disconnect != nil {
if disconnect != centrifuge.DisconnectConnectionClosed {
data := ws.NewCloseFrameBody(ws.StatusCode(disconnect.Code), disconnect.CloseText(t.ProtocolVersion()))
_ = wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
return t.conn.Close()
Expand Down
10 changes: 7 additions & 3 deletions _examples/custom_ws_nhooyr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ type customWebsocketTransport struct {

conn *websocket.Conn
protoType centrifuge.ProtocolType
request *http.Request
}

func newWebsocketTransport(conn *websocket.Conn, protoType centrifuge.ProtocolType) *customWebsocketTransport {
Expand Down Expand Up @@ -211,6 +210,11 @@ func (t *customWebsocketTransport) Unidirectional() bool {
return false
}

// Emulation ...
func (t *customWebsocketTransport) Emulation() bool {
return false
}

// DisabledPushFlags ...
func (t *customWebsocketTransport) DisabledPushFlags() uint64 {
return centrifuge.PushFlagDisconnect
Expand Down Expand Up @@ -270,7 +274,7 @@ func (t *customWebsocketTransport) WriteMany(messages ...[]byte) error {
}

// Close ...
func (t *customWebsocketTransport) Close(disconnect *centrifuge.Disconnect) error {
func (t *customWebsocketTransport) Close(disconnect centrifuge.Disconnect) error {
t.mu.Lock()
if t.closed {
t.mu.Unlock()
Expand All @@ -280,7 +284,7 @@ func (t *customWebsocketTransport) Close(disconnect *centrifuge.Disconnect) erro
close(t.closeCh)
t.mu.Unlock()

if disconnect != nil {
if disconnect != centrifuge.DisconnectConnectionClosed {
return t.conn.Close(websocket.StatusCode(disconnect.Code), disconnect.CloseText(t.ProtocolVersion()))
}
return t.conn.Close(websocket.StatusNormalClosure, "")
Expand Down
1 change: 1 addition & 0 deletions _examples/experimental/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This folder contains examples which use experimental features of Centrifuge library. These examples may require additional local setup to make them work.
Loading

0 comments on commit 01c7b2d

Please sign in to comment.