Skip to content

Commit

Permalink
disconnect improvements (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Nov 4, 2020
1 parent 47dfbae commit bc633f1
Show file tree
Hide file tree
Showing 16 changed files with 579 additions and 82 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ jobs:
golangci:
name: Lint
runs-on: ubuntu-latest
# Prevent duplicate builds on internal PRs.
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
steps:
- uses: actions/checkout@v2
- name: golangci-lint
Expand All @@ -13,6 +15,8 @@ jobs:
build:
name: Test with Go ${{ matrix.go-version }}
runs-on: ubuntu-latest
# Prevent duplicate builds on internal PRs.
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
strategy:
matrix:
go-version: [1.14, 1.15]
Expand Down
43 changes: 43 additions & 0 deletions _examples/single_connection/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title></title>
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
<script type="text/javascript" src="//cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script type="text/javascript" src="//cdn.jsdelivr.net/gh/centrifugal/centrifuge-js@latest/dist/centrifuge.min.js"></script>
<script type="text/javascript">
$(function () {
const container = $('#messages');

const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket');
// var centrifuge = new Centrifuge('http://localhost:8000/connection/sockjs', {
// sockjsTransports: [
// "xhr-streaming",
// ]
// });

// bind listeners on centrifuge object instance events.
centrifuge.on('connect', function(ctx){
drawText('Connected with client ID ' + ctx.client + ' over ' + ctx.transport);
});

centrifuge.on('disconnect', function(ctx){
drawText('Disconnected: ' + ctx.reason + (ctx.reconnect?", will try to reconnect":", won't try to reconnect"));
});

// Trigger actual connection establishing with a server.
// At this moment actual client work starts - i.e. subscriptions
// defined start subscribing etc.
centrifuge.connect();

function drawText(text) {
container.prepend($('<li/>').html([(new Date()).toString(), ' ' + text].join(':')));
}
});
</script>
</head>
<body>
<ul id="messages"></ul>
</body>
</html>
114 changes: 114 additions & 0 deletions _examples/single_connection/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"

_ "net/http/pprof"

"github.com/centrifugal/centrifuge"
)

func handleLog(e centrifuge.LogEntry) {
log.Printf("%s: %v", e.Message, e.Fields)
}

func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
newCtx := centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
UserID: "42",
Info: []byte(`{"name": "Alexander"}`),
})
r = r.WithContext(newCtx)
h.ServeHTTP(w, r)
})
}

func waitExitSignal(n *centrifuge.Node) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
_ = n.Shutdown(context.Background())
done <- true
}()
<-done
}

func main() {
cfg := centrifuge.DefaultConfig
cfg.LogLevel = centrifuge.LogLevelInfo
cfg.LogHandler = handleLog

node, _ := centrifuge.New(cfg)

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
cred, _ := centrifuge.GetCredentials(ctx)
return centrifuge.ConnectReply{
// Subscribe to personal several server-side channel.
Subscriptions: map[string]centrifuge.SubscribeOptions{
"#" + cred.UserID: {Presence: true},
},
}, nil
})

node.OnConnect(func(client *centrifuge.Client) {
presenceStats, err := node.PresenceStats("#" + client.UserID())
if err != nil {
client.Disconnect(centrifuge.DisconnectServerError)
return
}
if presenceStats.NumClients >= 2 {
err = node.Disconnect(
client.UserID(),
centrifuge.WithDisconnect(centrifuge.DisconnectConnectionLimit),
centrifuge.WithClientWhitelist([]string{client.ID()}),
)
if err != nil {
client.Disconnect(centrifuge.DisconnectServerError)
return
}
}

transport := client.Transport()
log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())

client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
})
})

if err := node.Run(); err != nil {
log.Fatal(err)
}

websocketHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ReadBufferSize: 1024,
UseWriteBufferPool: true,
})
http.Handle("/connection/websocket", authMiddleware(websocketHandler))

sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{
URL: "https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js",
HandlerPrefix: "/connection/sockjs",
WebsocketReadBufferSize: 1024,
WebsocketWriteBufferSize: 1024,
})
http.Handle("/connection/sockjs/", authMiddleware(sockjsHandler))
http.Handle("/", http.FileServer(http.Dir("./")))

go func() {
if err := http.ListenAndServe(":8000", nil); err != nil {
log.Fatal(err)
}
}()

waitExitSignal(node)
log.Println("bye!")
}
13 changes: 13 additions & 0 deletions _examples/single_connection/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
This example demonstrates how to keep single connection from the same user globally over all Centrifuge nodes.

As soon as user connects we subscribe it to a personal channel with presence enabled. Then inside `OnConnect` handler we check whether user has more than 1 connection inside personal channel at the moment. If yes – we disconnect other user connections (except current one) from a server.

We also could disconnect all other user connections without using channel presence at all, but this results in more unnecessary disconnect messages travelling around Centrifuge nodes.

To start example run the following command from example directory:

```
go run main.go
```

Then go to http://localhost:8000 to see it in action. Then open another browser tab – as soon as the new connection establishes the previous one will be closed.
27 changes: 27 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
v0.14.0
=======

* Add possibility to disconnect user with custom `Disconnect` object, and with client ID whitelist.
* Thus fixing non-working `WithReconnect` option when calling `node.Disconnect` method.
* No error returned from `client.Disconnect` method anymore. It was always `nil` before.

Here is what changed since v0.13.0:

```
gorelease -base v0.13.0 -version v0.14.0
github.com/centrifugal/centrifuge
---------------------------------
Incompatible changes:
- (*Client).Disconnect: changed from func(*Disconnect) error to func(*Disconnect)
- DisconnectOptions.Reconnect: removed
- DisconnectOptions: old is comparable, new is not
- WithReconnect: removed
Compatible changes:
- DisconnectOptions.ClientWhitelist: added
- DisconnectOptions.Disconnect: added
- WithClientWhitelist: added
- WithDisconnect: added
v0.14.0 is a valid semantic version for this release.
```

v0.13.0
=======

Expand Down
7 changes: 3 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,10 @@ func (c *Client) sendUnsub(ch string, resubscribe bool) error {
// and alive callback ordering/sync problems. Will be a noop if client
// already closed. As this method runs a separate goroutine client
// connection will be closed eventually (i.e. not immediately).
func (c *Client) Disconnect(disconnect *Disconnect) error {
func (c *Client) Disconnect(disconnect *Disconnect) {
go func() {
_ = c.close(disconnect)
}()
return nil
}

func (c *Client) close(disconnect *Disconnect) error {
Expand Down Expand Up @@ -1000,7 +999,7 @@ func (c *Client) handleRefresh(params protocol.Raw, rw *replyWriter) error {
}

if reply.Expired {
_ = c.Disconnect(DisconnectExpired)
c.Disconnect(DisconnectExpired)
return
}

Expand Down Expand Up @@ -1086,7 +1085,7 @@ func (c *Client) handleSubscribe(params protocol.Raw, rw *replyWriter) error {
ctx := c.subscribeCmd(cmd, reply, rw, false)

if ctx.disconnect != nil {
_ = c.Disconnect(ctx.disconnect)
c.Disconnect(ctx.disconnect)
return
}
if ctx.err != nil {
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func TestClientAliveHandler(t *testing.T) {
if numCalls >= 50 && !closed {
close(done)
closed = true
require.NoError(t, client.Disconnect(DisconnectForceNoReconnect))
client.Disconnect(DisconnectForceNoReconnect)
}
})

Expand Down Expand Up @@ -2168,7 +2168,7 @@ func TestCloseNoRace(t *testing.T) {
done := make(chan struct{})

node.OnConnect(func(client *Client) {
require.NoError(t, client.Disconnect(DisconnectForceNoReconnect))
client.Disconnect(DisconnectForceNoReconnect)
time.Sleep(time.Second)
client.OnDisconnect(func(_ DisconnectEvent) {
close(done)
Expand Down
2 changes: 1 addition & 1 deletion handler_sockjs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSockjsHandler(t *testing.T) {
n.OnConnect(func(client *Client) {
err := client.Send([]byte(`{"SockJS write": 1}`))
require.NoError(t, err)
_ = client.Disconnect(DisconnectForceReconnect)
client.Disconnect(DisconnectForceReconnect)
})

mux.Handle("/connection/sockjs/", NewSockjsHandler(n, SockjsConfig{
Expand Down
20 changes: 14 additions & 6 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,23 @@ func (h *Hub) shutdown(ctx context.Context) error {
}
}

func (h *Hub) disconnect(user string, reconnect bool) error {
userConnections := h.userConnections(user)
advice := DisconnectForceNoReconnect
if reconnect {
advice = DisconnectForceReconnect
func stringInSlice(str string, slice []string) bool {
for _, s := range slice {
if s == str {
return true
}
}
return false
}

func (h *Hub) disconnect(user string, disconnect *Disconnect, whitelist []string) error {
userConnections := h.userConnections(user)
for _, c := range userConnections {
if stringInSlice(c.ID(), whitelist) {
continue
}
go func(cc *Client) {
_ = cc.close(advice)
_ = cc.close(disconnect)
}(c)
}
return nil
Expand Down
56 changes: 53 additions & 3 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func TestHubDisconnect(t *testing.T) {
}

// Disconnect not existed user.
err := n.hub.disconnect("1", false)
err := n.hub.disconnect("1", DisconnectForceNoReconnect, nil)
require.NoError(t, err)

// Disconnect subscribed user.
err = n.hub.disconnect("42", false)
err = n.hub.disconnect("42", DisconnectForceNoReconnect, nil)
require.NoError(t, err)
select {
case <-client.transport.(*testTransport).closeCh:
Expand All @@ -179,7 +179,7 @@ func TestHubDisconnect(t *testing.T) {
require.NotContains(t, n.hub.subs, "test_channel")

// Disconnect subscribed user with reconnect.
err = n.hub.disconnect("24", true)
err = n.hub.disconnect("24", DisconnectForceReconnect, nil)
require.NoError(t, err)
select {
case <-clientWithReconnect.transport.(*testTransport).closeCh:
Expand All @@ -197,6 +197,56 @@ func TestHubDisconnect(t *testing.T) {
require.Len(t, n.hub.subs, 0)
}

func TestHubDisconnect_ClientWhitelist(t *testing.T) {
n := nodeWithMemoryEngineNoHandlers()
defer func() { _ = n.Shutdown(context.Background()) }()

n.OnConnect(func(client *Client) {
client.OnSubscribe(func(event SubscribeEvent, cb SubscribeCallback) {
cb(SubscribeReply{}, nil)
})
})

client := newTestSubscribedClient(t, n, "12", "test_channel")
clientToKeep := newTestSubscribedClient(t, n, "12", "test_channel")

require.Len(t, n.hub.conns, 2)
require.Len(t, n.hub.users, 1)
require.Len(t, n.hub.subs, 1)
require.Len(t, n.hub.subs["test_channel"], 2)

shouldBeClosed := make(chan struct{})
shouldNotBeClosed := make(chan struct{})

client.eventHub.disconnectHandler = func(e DisconnectEvent) {
close(shouldBeClosed)
}

clientToKeep.eventHub.disconnectHandler = func(e DisconnectEvent) {
close(shouldNotBeClosed)
}

whitelist := []string{clientToKeep.ID()}

// Disconnect not existed user.
err := n.hub.disconnect("12", DisconnectConnectionLimit, whitelist)
require.NoError(t, err)

select {
case <-shouldBeClosed:
select {
case <-shouldNotBeClosed:
require.Fail(t, "client should not be disconnected")
case <-time.After(time.Second):
require.Len(t, n.hub.conns, 1)
require.Len(t, n.hub.users, 1)
require.Len(t, n.hub.subs["test_channel"], 1)
}
case <-time.After(time.Second):
require.Fail(t, "timeout waiting for channel close")
}
}

func TestHubBroadcastPublication(t *testing.T) {
tcs := []struct {
name string
Expand Down
Loading

0 comments on commit bc633f1

Please sign in to comment.