diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b9866413..7dfd3369 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,8 @@ jobs: golangci: name: Lint runs-on: ubuntu-latest + # Prevent duplicate builds on internal PRs. + if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository steps: - uses: actions/checkout@v2 - name: golangci-lint @@ -13,6 +15,8 @@ jobs: build: name: Test with Go ${{ matrix.go-version }} runs-on: ubuntu-latest + # Prevent duplicate builds on internal PRs. + if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository strategy: matrix: go-version: [1.14, 1.15] diff --git a/_examples/single_connection/index.html b/_examples/single_connection/index.html new file mode 100644 index 00000000..5418083f --- /dev/null +++ b/_examples/single_connection/index.html @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + diff --git a/_examples/single_connection/main.go b/_examples/single_connection/main.go new file mode 100644 index 00000000..98328b49 --- /dev/null +++ b/_examples/single_connection/main.go @@ -0,0 +1,114 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + "os/signal" + "syscall" + + _ "net/http/pprof" + + "github.com/centrifugal/centrifuge" +) + +func handleLog(e centrifuge.LogEntry) { + log.Printf("%s: %v", e.Message, e.Fields) +} + +func authMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ + UserID: "42", + Info: []byte(`{"name": "Alexander"}`), + }) + r = r.WithContext(newCtx) + h.ServeHTTP(w, r) + }) +} + +func waitExitSignal(n *centrifuge.Node) { + sigCh := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + _ = n.Shutdown(context.Background()) + done <- true + }() + <-done +} + +func main() { + cfg := centrifuge.DefaultConfig + cfg.LogLevel = centrifuge.LogLevelInfo + cfg.LogHandler = handleLog + + node, _ := centrifuge.New(cfg) + + node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { + cred, _ := centrifuge.GetCredentials(ctx) + return centrifuge.ConnectReply{ + // Subscribe to personal several server-side channel. + Subscriptions: map[string]centrifuge.SubscribeOptions{ + "#" + cred.UserID: {Presence: true}, + }, + }, nil + }) + + node.OnConnect(func(client *centrifuge.Client) { + presenceStats, err := node.PresenceStats("#" + client.UserID()) + if err != nil { + client.Disconnect(centrifuge.DisconnectServerError) + return + } + if presenceStats.NumClients >= 2 { + err = node.Disconnect( + client.UserID(), + centrifuge.WithDisconnect(centrifuge.DisconnectConnectionLimit), + centrifuge.WithClientWhitelist([]string{client.ID()}), + ) + if err != nil { + client.Disconnect(centrifuge.DisconnectServerError) + return + } + } + + transport := client.Transport() + log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) + + client.OnDisconnect(func(e centrifuge.DisconnectEvent) { + log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) + }) + }) + + if err := node.Run(); err != nil { + log.Fatal(err) + } + + websocketHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{ + ReadBufferSize: 1024, + UseWriteBufferPool: true, + }) + http.Handle("/connection/websocket", authMiddleware(websocketHandler)) + + sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{ + URL: "https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js", + HandlerPrefix: "/connection/sockjs", + WebsocketReadBufferSize: 1024, + WebsocketWriteBufferSize: 1024, + }) + http.Handle("/connection/sockjs/", authMiddleware(sockjsHandler)) + http.Handle("/", http.FileServer(http.Dir("./"))) + + go func() { + if err := http.ListenAndServe(":8000", nil); err != nil { + log.Fatal(err) + } + }() + + waitExitSignal(node) + log.Println("bye!") +} diff --git a/_examples/single_connection/readme.md b/_examples/single_connection/readme.md new file mode 100644 index 00000000..f2585570 --- /dev/null +++ b/_examples/single_connection/readme.md @@ -0,0 +1,13 @@ +This example demonstrates how to keep single connection from the same user globally over all Centrifuge nodes. + +As soon as user connects we subscribe it to a personal channel with presence enabled. Then inside `OnConnect` handler we check whether user has more than 1 connection inside personal channel at the moment. If yes – we disconnect other user connections (except current one) from a server. + +We also could disconnect all other user connections without using channel presence at all, but this results in more unnecessary disconnect messages travelling around Centrifuge nodes. + +To start example run the following command from example directory: + +``` +go run main.go +``` + +Then go to http://localhost:8000 to see it in action. Then open another browser tab – as soon as the new connection establishes the previous one will be closed. diff --git a/changelog.md b/changelog.md index 79aa9e36..09c59564 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,30 @@ +v0.14.0 +======= + +* Add possibility to disconnect user with custom `Disconnect` object, and with client ID whitelist. +* Thus fixing non-working `WithReconnect` option when calling `node.Disconnect` method. +* No error returned from `client.Disconnect` method anymore. It was always `nil` before. + +Here is what changed since v0.13.0: + +``` +gorelease -base v0.13.0 -version v0.14.0 +github.com/centrifugal/centrifuge +--------------------------------- +Incompatible changes: +- (*Client).Disconnect: changed from func(*Disconnect) error to func(*Disconnect) +- DisconnectOptions.Reconnect: removed +- DisconnectOptions: old is comparable, new is not +- WithReconnect: removed +Compatible changes: +- DisconnectOptions.ClientWhitelist: added +- DisconnectOptions.Disconnect: added +- WithClientWhitelist: added +- WithDisconnect: added + +v0.14.0 is a valid semantic version for this release. +``` + v0.13.0 ======= diff --git a/client.go b/client.go index d26452c8..036a256a 100644 --- a/client.go +++ b/client.go @@ -581,11 +581,10 @@ func (c *Client) sendUnsub(ch string, resubscribe bool) error { // and alive callback ordering/sync problems. Will be a noop if client // already closed. As this method runs a separate goroutine client // connection will be closed eventually (i.e. not immediately). -func (c *Client) Disconnect(disconnect *Disconnect) error { +func (c *Client) Disconnect(disconnect *Disconnect) { go func() { _ = c.close(disconnect) }() - return nil } func (c *Client) close(disconnect *Disconnect) error { @@ -1000,7 +999,7 @@ func (c *Client) handleRefresh(params protocol.Raw, rw *replyWriter) error { } if reply.Expired { - _ = c.Disconnect(DisconnectExpired) + c.Disconnect(DisconnectExpired) return } @@ -1086,7 +1085,7 @@ func (c *Client) handleSubscribe(params protocol.Raw, rw *replyWriter) error { ctx := c.subscribeCmd(cmd, reply, rw, false) if ctx.disconnect != nil { - _ = c.Disconnect(ctx.disconnect) + c.Disconnect(ctx.disconnect) return } if ctx.err != nil { diff --git a/client_test.go b/client_test.go index 6ae9e871..60c7c2f6 100644 --- a/client_test.go +++ b/client_test.go @@ -842,7 +842,7 @@ func TestClientAliveHandler(t *testing.T) { if numCalls >= 50 && !closed { close(done) closed = true - require.NoError(t, client.Disconnect(DisconnectForceNoReconnect)) + client.Disconnect(DisconnectForceNoReconnect) } }) @@ -2168,7 +2168,7 @@ func TestCloseNoRace(t *testing.T) { done := make(chan struct{}) node.OnConnect(func(client *Client) { - require.NoError(t, client.Disconnect(DisconnectForceNoReconnect)) + client.Disconnect(DisconnectForceNoReconnect) time.Sleep(time.Second) client.OnDisconnect(func(_ DisconnectEvent) { close(done) diff --git a/handler_sockjs_test.go b/handler_sockjs_test.go index 6b55fddb..830e79e8 100644 --- a/handler_sockjs_test.go +++ b/handler_sockjs_test.go @@ -41,7 +41,7 @@ func TestSockjsHandler(t *testing.T) { n.OnConnect(func(client *Client) { err := client.Send([]byte(`{"SockJS write": 1}`)) require.NoError(t, err) - _ = client.Disconnect(DisconnectForceReconnect) + client.Disconnect(DisconnectForceReconnect) }) mux.Handle("/connection/sockjs/", NewSockjsHandler(n, SockjsConfig{ diff --git a/hub.go b/hub.go index 49a8055e..5837f38a 100644 --- a/hub.go +++ b/hub.go @@ -86,15 +86,23 @@ func (h *Hub) shutdown(ctx context.Context) error { } } -func (h *Hub) disconnect(user string, reconnect bool) error { - userConnections := h.userConnections(user) - advice := DisconnectForceNoReconnect - if reconnect { - advice = DisconnectForceReconnect +func stringInSlice(str string, slice []string) bool { + for _, s := range slice { + if s == str { + return true + } } + return false +} + +func (h *Hub) disconnect(user string, disconnect *Disconnect, whitelist []string) error { + userConnections := h.userConnections(user) for _, c := range userConnections { + if stringInSlice(c.ID(), whitelist) { + continue + } go func(cc *Client) { - _ = cc.close(advice) + _ = cc.close(disconnect) }(c) } return nil diff --git a/hub_test.go b/hub_test.go index 47baf6da..281fb472 100644 --- a/hub_test.go +++ b/hub_test.go @@ -163,11 +163,11 @@ func TestHubDisconnect(t *testing.T) { } // Disconnect not existed user. - err := n.hub.disconnect("1", false) + err := n.hub.disconnect("1", DisconnectForceNoReconnect, nil) require.NoError(t, err) // Disconnect subscribed user. - err = n.hub.disconnect("42", false) + err = n.hub.disconnect("42", DisconnectForceNoReconnect, nil) require.NoError(t, err) select { case <-client.transport.(*testTransport).closeCh: @@ -179,7 +179,7 @@ func TestHubDisconnect(t *testing.T) { require.NotContains(t, n.hub.subs, "test_channel") // Disconnect subscribed user with reconnect. - err = n.hub.disconnect("24", true) + err = n.hub.disconnect("24", DisconnectForceReconnect, nil) require.NoError(t, err) select { case <-clientWithReconnect.transport.(*testTransport).closeCh: @@ -197,6 +197,56 @@ func TestHubDisconnect(t *testing.T) { require.Len(t, n.hub.subs, 0) } +func TestHubDisconnect_ClientWhitelist(t *testing.T) { + n := nodeWithMemoryEngineNoHandlers() + defer func() { _ = n.Shutdown(context.Background()) }() + + n.OnConnect(func(client *Client) { + client.OnSubscribe(func(event SubscribeEvent, cb SubscribeCallback) { + cb(SubscribeReply{}, nil) + }) + }) + + client := newTestSubscribedClient(t, n, "12", "test_channel") + clientToKeep := newTestSubscribedClient(t, n, "12", "test_channel") + + require.Len(t, n.hub.conns, 2) + require.Len(t, n.hub.users, 1) + require.Len(t, n.hub.subs, 1) + require.Len(t, n.hub.subs["test_channel"], 2) + + shouldBeClosed := make(chan struct{}) + shouldNotBeClosed := make(chan struct{}) + + client.eventHub.disconnectHandler = func(e DisconnectEvent) { + close(shouldBeClosed) + } + + clientToKeep.eventHub.disconnectHandler = func(e DisconnectEvent) { + close(shouldNotBeClosed) + } + + whitelist := []string{clientToKeep.ID()} + + // Disconnect not existed user. + err := n.hub.disconnect("12", DisconnectConnectionLimit, whitelist) + require.NoError(t, err) + + select { + case <-shouldBeClosed: + select { + case <-shouldNotBeClosed: + require.Fail(t, "client should not be disconnected") + case <-time.After(time.Second): + require.Len(t, n.hub.conns, 1) + require.Len(t, n.hub.users, 1) + require.Len(t, n.hub.subs["test_channel"], 1) + } + case <-time.After(time.Second): + require.Fail(t, "timeout waiting for channel close") + } +} + func TestHubBroadcastPublication(t *testing.T) { tcs := []struct { name string diff --git a/internal/controlpb/control.pb.go b/internal/controlpb/control.pb.go index f6fc80f4..06fcaf5c 100644 --- a/internal/controlpb/control.pb.go +++ b/internal/controlpb/control.pb.go @@ -206,7 +206,11 @@ func (m *Unsubscribe) GetUser() string { } type Disconnect struct { - User string `protobuf:"bytes,1,opt,name=user,proto3" json:"user"` + User string `protobuf:"bytes,1,opt,name=user,proto3" json:"user"` + Whitelist []string `protobuf:"bytes,2,rep,name=whitelist" json:"whitelist"` + Code uint32 `protobuf:"varint,3,opt,name=code,proto3" json:"code"` + Reason string `protobuf:"bytes,4,opt,name=reason,proto3" json:"reason"` + Reconnect bool `protobuf:"varint,5,opt,name=reconnect,proto3" json:"reconnect"` } func (m *Disconnect) Reset() { *m = Disconnect{} } @@ -221,6 +225,34 @@ func (m *Disconnect) GetUser() string { return "" } +func (m *Disconnect) GetWhitelist() []string { + if m != nil { + return m.Whitelist + } + return nil +} + +func (m *Disconnect) GetCode() uint32 { + if m != nil { + return m.Code + } + return 0 +} + +func (m *Disconnect) GetReason() string { + if m != nil { + return m.Reason + } + return "" +} + +func (m *Disconnect) GetReconnect() bool { + if m != nil { + return m.Reconnect + } + return false +} + func init() { proto.RegisterType((*Command)(nil), "controlpb.Command") proto.RegisterType((*Node)(nil), "controlpb.Node") @@ -385,6 +417,23 @@ func (this *Disconnect) Equal(that interface{}) bool { if this.User != that1.User { return false } + if len(this.Whitelist) != len(that1.Whitelist) { + return false + } + for i := range this.Whitelist { + if this.Whitelist[i] != that1.Whitelist[i] { + return false + } + } + if this.Code != that1.Code { + return false + } + if this.Reason != that1.Reason { + return false + } + if this.Reconnect != that1.Reconnect { + return false + } return true } func (m *Command) Marshal() (dAtA []byte, err error) { @@ -582,6 +631,42 @@ func (m *Disconnect) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintControl(dAtA, i, uint64(len(m.User))) i += copy(dAtA[i:], m.User) } + if len(m.Whitelist) > 0 { + for _, s := range m.Whitelist { + dAtA[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + if m.Code != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintControl(dAtA, i, uint64(m.Code)) + } + if len(m.Reason) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintControl(dAtA, i, uint64(len(m.Reason))) + i += copy(dAtA[i:], m.Reason) + } + if m.Reconnect { + dAtA[i] = 0x28 + i++ + if m.Reconnect { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } return i, nil } @@ -656,6 +741,14 @@ func NewPopulatedUnsubscribe(r randyControl, easy bool) *Unsubscribe { func NewPopulatedDisconnect(r randyControl, easy bool) *Disconnect { this := &Disconnect{} this.User = string(randStringControl(r)) + v4 := r.Intn(10) + this.Whitelist = make([]string, v4) + for i := 0; i < v4; i++ { + this.Whitelist[i] = string(randStringControl(r)) + } + this.Code = uint32(r.Uint32()) + this.Reason = string(randStringControl(r)) + this.Reconnect = bool(bool(r.Intn(2) == 0)) if !easy && r.Intn(10) != 0 { } return this @@ -680,9 +773,9 @@ func randUTF8RuneControl(r randyControl) rune { return rune(ru + 61) } func randStringControl(r randyControl) string { - v4 := r.Intn(100) - tmps := make([]rune, v4) - for i := 0; i < v4; i++ { + v5 := r.Intn(100) + tmps := make([]rune, v5) + for i := 0; i < v5; i++ { tmps[i] = randUTF8RuneControl(r) } return string(tmps) @@ -704,11 +797,11 @@ func randFieldControl(dAtA []byte, r randyControl, fieldNumber int, wire int) [] switch wire { case 0: dAtA = encodeVarintPopulateControl(dAtA, uint64(key)) - v5 := r.Int63() + v6 := r.Int63() if r.Intn(2) == 0 { - v5 *= -1 + v6 *= -1 } - dAtA = encodeVarintPopulateControl(dAtA, uint64(v5)) + dAtA = encodeVarintPopulateControl(dAtA, uint64(v6)) case 1: dAtA = encodeVarintPopulateControl(dAtA, uint64(key)) dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) @@ -820,6 +913,22 @@ func (m *Disconnect) Size() (n int) { if l > 0 { n += 1 + l + sovControl(uint64(l)) } + if len(m.Whitelist) > 0 { + for _, s := range m.Whitelist { + l = len(s) + n += 1 + l + sovControl(uint64(l)) + } + } + if m.Code != 0 { + n += 1 + sovControl(uint64(m.Code)) + } + l = len(m.Reason) + if l > 0 { + n += 1 + l + sovControl(uint64(l)) + } + if m.Reconnect { + n += 2 + } return n } @@ -1537,6 +1646,103 @@ func (m *Disconnect) Unmarshal(dAtA []byte) error { } m.User = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Whitelist", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowControl + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthControl + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Whitelist = append(m.Whitelist, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowControl + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Reason", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowControl + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthControl + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Reason = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Reconnect", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowControl + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Reconnect = bool(v != 0) default: iNdEx = preIndex skippy, err := skipControl(dAtA[iNdEx:]) @@ -1666,45 +1872,49 @@ var ( func init() { proto.RegisterFile("control.proto", fileDescriptorControl) } var fileDescriptorControl = []byte{ - // 634 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x3d, 0x6f, 0xdb, 0x3c, - 0x10, 0xc7, 0x4d, 0xdb, 0xf1, 0xcb, 0x39, 0xc9, 0x63, 0x10, 0x09, 0xa0, 0xc7, 0x48, 0x25, 0xc1, - 0x40, 0x01, 0xc1, 0x40, 0x9c, 0x22, 0x59, 0x92, 0x0e, 0x1d, 0x64, 0x7b, 0xf0, 0x50, 0x07, 0x60, - 0xe2, 0xb9, 0x90, 0x65, 0x36, 0x11, 0x6a, 0x91, 0x86, 0x5e, 0x52, 0xe4, 0x0b, 0x14, 0x45, 0xa6, - 0x2e, 0x1d, 0x33, 0x75, 0xe9, 0xd2, 0xbd, 0x1f, 0x21, 0x63, 0xe7, 0x0e, 0x6c, 0xab, 0x6e, 0xfa, - 0x04, 0x1d, 0x0b, 0x52, 0xb2, 0x95, 0xa0, 0x1d, 0xba, 0x90, 0xff, 0xfb, 0xdf, 0xcf, 0x3e, 0x92, - 0x77, 0x82, 0x2d, 0x97, 0xb3, 0x28, 0xe0, 0x8b, 0xfe, 0x32, 0xe0, 0x11, 0xc7, 0xcd, 0x3c, 0x5c, - 0xce, 0x3a, 0xfb, 0x17, 0x5e, 0x74, 0x19, 0xcf, 0xfa, 0x2e, 0xf7, 0x0f, 0x2e, 0xf8, 0x05, 0x3f, - 0x50, 0xc4, 0x2c, 0x7e, 0xa9, 0x22, 0x15, 0x28, 0x95, 0xfd, 0xb2, 0xfb, 0x1e, 0x41, 0x7d, 0xc0, - 0x7d, 0xdf, 0x61, 0x73, 0x6c, 0x42, 0x25, 0xf6, 0xe6, 0x1a, 0x32, 0x91, 0xd5, 0xb4, 0xb7, 0x13, - 0x61, 0x54, 0xa6, 0xe3, 0x61, 0x2a, 0x0c, 0xe9, 0x12, 0xb9, 0xe0, 0x13, 0xa8, 0xf9, 0x34, 0xba, - 0xe4, 0x73, 0xad, 0x6c, 0x22, 0x6b, 0xfb, 0x70, 0xb7, 0xbf, 0x2e, 0xdc, 0x7f, 0xae, 0x12, 0xe7, - 0xd7, 0x4b, 0x6a, 0x43, 0x2a, 0x8c, 0x1c, 0x24, 0xf9, 0x8e, 0xf7, 0xa1, 0xb6, 0x74, 0x02, 0xc7, - 0x0f, 0xb5, 0x8a, 0x89, 0xac, 0x4d, 0x7b, 0xf7, 0x4e, 0x18, 0xa5, 0xaf, 0xc2, 0xa8, 0x10, 0xe7, - 0xb5, 0xc4, 0xb3, 0x24, 0xc9, 0xf7, 0xee, 0xb7, 0x32, 0x54, 0x27, 0x7c, 0x4e, 0xff, 0xe1, 0x50, - 0x7b, 0x50, 0x65, 0x8e, 0x4f, 0xd5, 0x91, 0x9a, 0x76, 0x23, 0x15, 0x86, 0x8a, 0x89, 0x5a, 0xf1, - 0x63, 0xa8, 0x5f, 0xd1, 0x20, 0xf4, 0x38, 0x53, 0x85, 0x9b, 0x76, 0x2b, 0x15, 0xc6, 0xca, 0x22, - 0x2b, 0x81, 0x9f, 0x40, 0x8b, 0xc5, 0xfe, 0x0b, 0x77, 0xe1, 0x51, 0x16, 0x85, 0x5a, 0xd5, 0x44, - 0xd6, 0x96, 0xfd, 0x5f, 0x2a, 0x8c, 0xfb, 0x36, 0x01, 0x16, 0xfb, 0x83, 0x4c, 0xe3, 0x1e, 0x34, - 0x65, 0x2a, 0x0e, 0x69, 0x10, 0x6a, 0x1b, 0x8a, 0xdf, 0x4a, 0x85, 0x51, 0x98, 0xa4, 0xc1, 0x62, - 0x7f, 0x2a, 0x15, 0x3e, 0x82, 0x4d, 0xf5, 0x37, 0x97, 0x0e, 0x63, 0x74, 0x11, 0x6a, 0x35, 0x85, - 0xb7, 0x53, 0x61, 0x3c, 0xf0, 0x89, 0x2c, 0x36, 0xc8, 0x03, 0xdc, 0x85, 0x5a, 0xbc, 0x8c, 0x3c, - 0x9f, 0x6a, 0x75, 0x85, 0xab, 0x57, 0xcd, 0x1c, 0x92, 0xef, 0xf8, 0x04, 0xea, 0x3e, 0x8d, 0x02, - 0xcf, 0x0d, 0xb5, 0x86, 0x89, 0xac, 0xd6, 0x21, 0x7e, 0xd8, 0x11, 0x99, 0xc9, 0x6e, 0x9c, 0x63, - 0x64, 0x25, 0xba, 0x9f, 0x10, 0xd4, 0x73, 0x02, 0x5b, 0xd0, 0xf0, 0x58, 0x44, 0x83, 0x2b, 0x67, - 0xa1, 0x5e, 0x1a, 0xd9, 0x9b, 0xa9, 0x30, 0xd6, 0x1e, 0x59, 0x2b, 0xfc, 0x0c, 0x36, 0xbc, 0x88, - 0xfa, 0xa1, 0x56, 0x36, 0x2b, 0x56, 0xeb, 0xf0, 0xd1, 0x9f, 0xe5, 0xfa, 0x63, 0x99, 0x1f, 0xb1, - 0x28, 0xb8, 0xb6, 0x9b, 0xa9, 0x30, 0x32, 0x9e, 0x64, 0x5b, 0xe7, 0x18, 0xa0, 0xc8, 0xe3, 0x36, - 0x54, 0x5e, 0xd1, 0xeb, 0xac, 0xb9, 0x44, 0x4a, 0xbc, 0x03, 0x1b, 0x57, 0xce, 0x22, 0xce, 0xba, - 0x89, 0x48, 0x16, 0x3c, 0x2d, 0x1f, 0xa3, 0x2e, 0x81, 0xd6, 0x94, 0x85, 0xf1, 0x2c, 0x74, 0x03, - 0x6f, 0xa6, 0xfa, 0x9a, 0x3f, 0x5b, 0x3e, 0x1b, 0xea, 0x96, 0xb9, 0x45, 0x56, 0x42, 0x0e, 0x87, - 0x6c, 0xc6, 0xfd, 0xe1, 0x90, 0x31, 0x51, 0x6b, 0xb7, 0x07, 0x30, 0xf4, 0x42, 0x97, 0x33, 0x46, - 0xdd, 0x68, 0xcd, 0xa2, 0xbf, 0xb1, 0xbd, 0x37, 0x08, 0xa0, 0x98, 0x71, 0x09, 0x4f, 0x4e, 0x87, - 0xa3, 0x76, 0xa9, 0x83, 0x6f, 0x6e, 0xcd, 0xed, 0x22, 0xa3, 0xa6, 0xb6, 0x07, 0xad, 0xe9, 0xe4, - 0x6c, 0x6a, 0x9f, 0x0d, 0xc8, 0xd8, 0x1e, 0xb5, 0x51, 0xe7, 0xff, 0x9b, 0x5b, 0x73, 0xb7, 0x80, - 0xee, 0xdf, 0xc4, 0x02, 0x18, 0x8e, 0xcf, 0x06, 0xa7, 0x93, 0xc9, 0x68, 0x70, 0xde, 0x2e, 0x77, - 0xb4, 0x9b, 0x5b, 0x73, 0xa7, 0x40, 0x8b, 0x03, 0x76, 0xaa, 0x6f, 0x3f, 0xe8, 0x25, 0x7b, 0xef, - 0xd7, 0x0f, 0x1d, 0x7d, 0x4c, 0x74, 0xf4, 0x39, 0xd1, 0xd1, 0x5d, 0xa2, 0xa3, 0x2f, 0x89, 0x8e, - 0xbe, 0x27, 0x3a, 0x7a, 0xf7, 0x53, 0x2f, 0xcd, 0x6a, 0xea, 0xbb, 0x3e, 0xfa, 0x1d, 0x00, 0x00, - 0xff, 0xff, 0x98, 0x38, 0xec, 0xa4, 0x22, 0x04, 0x00, 0x00, + // 695 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x3f, 0x6f, 0xd3, 0x40, + 0x14, 0xcf, 0x25, 0x69, 0x12, 0xbf, 0x34, 0x25, 0x3a, 0xb5, 0x92, 0x89, 0x8a, 0x6d, 0x45, 0x42, + 0xb2, 0x8a, 0x9a, 0xa2, 0x76, 0x69, 0x19, 0x18, 0x9c, 0x64, 0xc8, 0x40, 0x2a, 0x5d, 0x9b, 0x19, + 0x39, 0xce, 0xd1, 0x58, 0xc4, 0xe7, 0xc8, 0x7f, 0x5a, 0xf5, 0x0b, 0x20, 0xd4, 0x89, 0x85, 0xb1, + 0x13, 0x0b, 0x0b, 0x3b, 0x9f, 0x00, 0x75, 0x64, 0x66, 0x30, 0x10, 0x36, 0x7f, 0x02, 0x46, 0x74, + 0x77, 0x4e, 0x9c, 0x0a, 0x06, 0x96, 0xf7, 0x7e, 0xef, 0xf7, 0x7e, 0xb9, 0x77, 0xf7, 0xde, 0x73, + 0xa0, 0xe1, 0xf8, 0x2c, 0x0a, 0xfc, 0x59, 0x67, 0x1e, 0xf8, 0x91, 0x8f, 0x95, 0x2c, 0x9c, 0x8f, + 0x5b, 0xfb, 0x17, 0x6e, 0x34, 0x8d, 0xc7, 0x1d, 0xc7, 0xf7, 0x0e, 0x2e, 0xfc, 0x0b, 0xff, 0x40, + 0x28, 0xc6, 0xf1, 0x2b, 0x11, 0x89, 0x40, 0x20, 0xf9, 0xcb, 0xf6, 0x7b, 0x04, 0xd5, 0xae, 0xef, + 0x79, 0x36, 0x9b, 0x60, 0x03, 0x4a, 0xb1, 0x3b, 0x51, 0x91, 0x81, 0x4c, 0xc5, 0xda, 0x5a, 0x24, + 0x7a, 0x69, 0x34, 0xe8, 0xa5, 0x89, 0xce, 0x59, 0xc2, 0x0d, 0x3e, 0x81, 0x8a, 0x47, 0xa3, 0xa9, + 0x3f, 0x51, 0x8b, 0x06, 0x32, 0xb7, 0x0e, 0x77, 0x3a, 0xab, 0xc2, 0x9d, 0x17, 0x22, 0x71, 0x7e, + 0x3d, 0xa7, 0x16, 0xa4, 0x89, 0x9e, 0x09, 0x49, 0xe6, 0xf1, 0x3e, 0x54, 0xe6, 0x76, 0x60, 0x7b, + 0xa1, 0x5a, 0x32, 0x90, 0xb9, 0x69, 0xed, 0xdc, 0x25, 0x7a, 0xe1, 0x5b, 0xa2, 0x97, 0x88, 0x7d, + 0xc5, 0xe5, 0x32, 0x49, 0x32, 0xdf, 0xfe, 0x5e, 0x84, 0xf2, 0xd0, 0x9f, 0xd0, 0xff, 0xb8, 0xd4, + 0x2e, 0x94, 0x99, 0xed, 0x51, 0x71, 0x25, 0xc5, 0xaa, 0xa5, 0x89, 0x2e, 0x62, 0x22, 0x2c, 0x7e, + 0x0c, 0xd5, 0x4b, 0x1a, 0x84, 0xae, 0xcf, 0x44, 0x61, 0xc5, 0xaa, 0xa7, 0x89, 0xbe, 0xa4, 0xc8, + 0x12, 0xe0, 0xa7, 0x50, 0x67, 0xb1, 0xf7, 0xd2, 0x99, 0xb9, 0x94, 0x45, 0xa1, 0x5a, 0x36, 0x90, + 0xd9, 0xb0, 0x1e, 0xa4, 0x89, 0xbe, 0x4e, 0x13, 0x60, 0xb1, 0xd7, 0x95, 0x18, 0xef, 0x81, 0xc2, + 0x53, 0x71, 0x48, 0x83, 0x50, 0xdd, 0x10, 0xfa, 0x46, 0x9a, 0xe8, 0x39, 0x49, 0x6a, 0x2c, 0xf6, + 0x46, 0x1c, 0xe1, 0x23, 0xd8, 0x14, 0xc7, 0x4c, 0x6d, 0xc6, 0xe8, 0x2c, 0x54, 0x2b, 0x42, 0xde, + 0x4c, 0x13, 0xfd, 0x1e, 0x4f, 0x78, 0xb1, 0x6e, 0x16, 0xe0, 0x36, 0x54, 0xe2, 0x79, 0xe4, 0x7a, + 0x54, 0xad, 0x0a, 0xb9, 0xe8, 0xaa, 0x64, 0x48, 0xe6, 0xf1, 0x09, 0x54, 0x3d, 0x1a, 0x05, 0xae, + 0x13, 0xaa, 0x35, 0x03, 0x99, 0xf5, 0x43, 0x7c, 0x7f, 0x22, 0x3c, 0x23, 0x5f, 0x9c, 0xc9, 0xc8, + 0x12, 0xb4, 0x3f, 0x21, 0xa8, 0x66, 0x0a, 0x6c, 0x42, 0xcd, 0x65, 0x11, 0x0d, 0x2e, 0xed, 0x99, + 0xe8, 0x34, 0xb2, 0x36, 0xd3, 0x44, 0x5f, 0x71, 0x64, 0x85, 0xf0, 0x73, 0xd8, 0x70, 0x23, 0xea, + 0x85, 0x6a, 0xd1, 0x28, 0x99, 0xf5, 0xc3, 0x47, 0x7f, 0x97, 0xeb, 0x0c, 0x78, 0xbe, 0xcf, 0xa2, + 0xe0, 0xda, 0x52, 0xd2, 0x44, 0x97, 0x7a, 0x22, 0x5d, 0xeb, 0x18, 0x20, 0xcf, 0xe3, 0x26, 0x94, + 0x5e, 0xd3, 0x6b, 0x39, 0x5c, 0xc2, 0x21, 0xde, 0x86, 0x8d, 0x4b, 0x7b, 0x16, 0xcb, 0x69, 0x22, + 0x22, 0x83, 0x67, 0xc5, 0x63, 0xd4, 0x26, 0x50, 0x1f, 0xb1, 0x30, 0x1e, 0x87, 0x4e, 0xe0, 0x8e, + 0xc5, 0x5c, 0xb3, 0xb6, 0x65, 0xbb, 0x21, 0x5e, 0x99, 0x51, 0x64, 0x09, 0xf8, 0x72, 0xf0, 0x61, + 0xac, 0x2f, 0x07, 0x8f, 0x89, 0xb0, 0xed, 0x2f, 0x08, 0xa0, 0xe7, 0x86, 0x8e, 0xcf, 0x18, 0x75, + 0xa2, 0x95, 0x18, 0xfd, 0x4b, 0x8c, 0x9f, 0x80, 0x72, 0x35, 0x75, 0x23, 0x3a, 0x73, 0xc3, 0x48, + 0x3c, 0x5f, 0x91, 0x03, 0x5f, 0x91, 0x24, 0x87, 0xfc, 0x28, 0xc7, 0x9f, 0x50, 0xb1, 0x73, 0x0d, + 0x79, 0x14, 0x8f, 0x89, 0xb0, 0x7c, 0xb4, 0x01, 0xb5, 0x43, 0x9f, 0x89, 0x45, 0x53, 0xe4, 0x68, + 0x25, 0x43, 0x32, 0xcf, 0xcb, 0x05, 0x34, 0xbb, 0x99, 0xd8, 0xaf, 0x9a, 0x2c, 0xb7, 0x22, 0x49, + 0x0e, 0xf7, 0xde, 0x20, 0x80, 0xfc, 0x03, 0xe4, 0xd5, 0x87, 0xa7, 0xbd, 0x7e, 0xb3, 0xd0, 0xc2, + 0x37, 0xb7, 0xc6, 0x56, 0x9e, 0x11, 0x9f, 0xd4, 0x1e, 0xd4, 0x47, 0xc3, 0xb3, 0x91, 0x75, 0xd6, + 0x25, 0x03, 0xab, 0xdf, 0x44, 0xad, 0x87, 0x37, 0xb7, 0xc6, 0x4e, 0x2e, 0x5a, 0x6f, 0xb3, 0x09, + 0xd0, 0x1b, 0x9c, 0x75, 0x4f, 0x87, 0xc3, 0x7e, 0xf7, 0xbc, 0x59, 0x6c, 0xa9, 0x37, 0xb7, 0xc6, + 0x76, 0x2e, 0xcd, 0x9b, 0xd7, 0x2a, 0xbf, 0xfd, 0xa0, 0x15, 0xac, 0xdd, 0xdf, 0x3f, 0x35, 0xf4, + 0x71, 0xa1, 0xa1, 0xcf, 0x0b, 0x0d, 0xdd, 0x2d, 0x34, 0xf4, 0x75, 0xa1, 0xa1, 0x1f, 0x0b, 0x0d, + 0xbd, 0xfb, 0xa5, 0x15, 0xc6, 0x15, 0xf1, 0xa7, 0x73, 0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x5c, + 0xa8, 0x7b, 0x13, 0xbf, 0x04, 0x00, 0x00, } diff --git a/internal/controlpb/control.proto b/internal/controlpb/control.proto index 1c5b6fe5..839623ab 100644 --- a/internal/controlpb/control.proto +++ b/internal/controlpb/control.proto @@ -50,4 +50,8 @@ message Unsubscribe { message Disconnect { string user = 1 [(gogoproto.jsontag) = "user"]; + repeated string whitelist = 2 [(gogoproto.jsontag) = "whitelist"]; + uint32 code = 3 [(gogoproto.jsontag) = "code"]; + string reason = 4 [(gogoproto.jsontag) = "reason"]; + bool reconnect = 5 [(gogoproto.jsontag) = "reconnect"]; } diff --git a/node.go b/node.go index 549e3d2d..43220168 100644 --- a/node.go +++ b/node.go @@ -416,7 +416,7 @@ func (n *Node) handleControl(data []byte) error { n.logger.log(newLogEntry(LogLevelError, "error decoding disconnect control params", map[string]interface{}{"error": err.Error()})) return err } - return n.hub.disconnect(cmd.User, false) + return n.hub.disconnect(cmd.User, &Disconnect{Code: cmd.Code, Reason: cmd.Reason, Reconnect: cmd.Reconnect}, cmd.Whitelist) default: n.logger.log(newLogEntry(LogLevelError, "unknown control message method", map[string]interface{}{"method": method})) return fmt.Errorf("control method not found: %d", method) @@ -589,12 +589,15 @@ func (n *Node) pubUnsubscribe(user string, ch string) error { // pubDisconnect publishes disconnect control message to all nodes – so all // nodes could disconnect user from server. -func (n *Node) pubDisconnect(user string, reconnect bool) error { - // TODO: handle reconnect flag. - disconnect := &controlpb.Disconnect{ - User: user, - } - params, _ := n.controlEncoder.EncodeDisconnect(disconnect) +func (n *Node) pubDisconnect(user string, disconnect *Disconnect, whitelist []string) error { + protoDisconnect := &controlpb.Disconnect{ + User: user, + Whitelist: whitelist, + Code: disconnect.Code, + Reason: disconnect.Reason, + Reconnect: disconnect.Reconnect, + } + params, _ := n.controlEncoder.EncodeDisconnect(protoDisconnect) cmd := &controlpb.Command{ UID: n.uid, Method: controlpb.MethodTypeDisconnect, @@ -697,12 +700,17 @@ func (n *Node) Disconnect(user string, opts ...DisconnectOption) error { opt(disconnectOpts) } // first disconnect user from this node - err := n.hub.disconnect(user, disconnectOpts.Reconnect) + customDisconnect := disconnectOpts.Disconnect + if customDisconnect == nil { + customDisconnect = DisconnectForceNoReconnect + } + + err := n.hub.disconnect(user, customDisconnect, disconnectOpts.ClientWhitelist) if err != nil { return err } // second send disconnect control message to other nodes - return n.pubDisconnect(user, disconnectOpts.Reconnect) + return n.pubDisconnect(user, customDisconnect, disconnectOpts.ClientWhitelist) } // addPresence proxies presence adding to engine. diff --git a/node_test.go b/node_test.go index 6cc2b5a8..265c3dba 100644 --- a/node_test.go +++ b/node_test.go @@ -399,7 +399,7 @@ func TestNode_pubDisconnect(t *testing.T) { testEngine, _ := node.broker.(*TestEngine) require.EqualValues(t, 1, testEngine.publishControlCount) - err := node.pubDisconnect("42", false) + err := node.pubDisconnect("42", DisconnectForceNoReconnect, nil) require.NoError(t, err) require.EqualValues(t, 2, testEngine.publishControlCount) } diff --git a/options.go b/options.go index 444490ca..c5a7dba2 100644 --- a/options.go +++ b/options.go @@ -54,17 +54,27 @@ func WithResubscribe(resubscribe bool) UnsubscribeOption { // DisconnectOptions define some fields to alter behaviour of Disconnect operation. type DisconnectOptions struct { - // Reconnect allows to set reconnect flag. - Reconnect bool + // Disconnect represents custom disconnect to use. + // By default DisconnectForceNoReconnect will be used. + Disconnect *Disconnect + // ClientWhitelist contains client IDs to keep. + ClientWhitelist []string } // DisconnectOption is a type to represent various Disconnect options. type DisconnectOption func(options *DisconnectOptions) -// WithReconnect allows to set Reconnect flag to true. -func WithReconnect(reconnect bool) DisconnectOption { +// WithDisconnect allows to set custom Disconnect. +func WithDisconnect(disconnect *Disconnect) DisconnectOption { return func(opts *DisconnectOptions) { - opts.Reconnect = reconnect + opts.Disconnect = disconnect + } +} + +// WithClientWhitelist allows to set ClientWhitelist. +func WithClientWhitelist(whitelist []string) DisconnectOption { + return func(opts *DisconnectOptions) { + opts.ClientWhitelist = whitelist } } diff --git a/options_test.go b/options_test.go index 94d8a64e..ec2e8065 100644 --- a/options_test.go +++ b/options_test.go @@ -22,11 +22,18 @@ func TestWithResubscribe(t *testing.T) { require.Equal(t, true, opts.Resubscribe) } -func TestWithReconnect(t *testing.T) { - opt := WithReconnect(true) +func TestWithDisconnect(t *testing.T) { + opt := WithDisconnect(DisconnectConnectionLimit) opts := &DisconnectOptions{} opt(opts) - require.Equal(t, true, opts.Reconnect) + require.Equal(t, DisconnectConnectionLimit, opts.Disconnect) +} + +func TestWithClientWhitelist(t *testing.T) { + opt := WithClientWhitelist([]string{"client"}) + opts := &DisconnectOptions{} + opt(opts) + require.Equal(t, []string{"client"}, opts.ClientWhitelist) } func TestWithLimit(t *testing.T) {