From 2cda1b544396beba16c8bee06e6159460a8b676a Mon Sep 17 00:00:00 2001 From: Atsushi Watanabe Date: Mon, 23 Dec 2019 22:21:31 +0900 Subject: [PATCH] Add option to specify protocol level (#26) --- .travis.yml | 1 + client.go | 33 ------------------------- client_test.go | 1 + connect.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++--- mqtt.go | 12 --------- paho/paho.go | 1 + 6 files changed, 66 insertions(+), 49 deletions(-) diff --git a/.travis.yml b/.travis.yml index b2a5629..a2db8b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,7 @@ install: before_script: - docker-compose up -d + - echo 'replace github.com/at-wat/mqtt-go => ../' >> paho/go.mod script: - go vet -tags integration ./... diff --git a/client.go b/client.go index 712cabc..f47d8c3 100644 --- a/client.go +++ b/client.go @@ -30,39 +30,6 @@ func (c *BaseClient) Handle(handler Handler) { c.handler = handler } -// WithUserNamePassword sets plain text auth information used in Connect. -func WithUserNamePassword(userName, password string) ConnectOption { - return func(o *ConnectOptions) error { - o.UserName = userName - o.Password = password - return nil - } -} - -// WithKeepAlive sets keep alive interval in seconds. -func WithKeepAlive(interval uint16) ConnectOption { - return func(o *ConnectOptions) error { - o.KeepAlive = interval - return nil - } -} - -// WithCleanSession sets clean session flag. -func WithCleanSession(cleanSession bool) ConnectOption { - return func(o *ConnectOptions) error { - o.CleanSession = cleanSession - return nil - } -} - -// WithWill sets will message. -func WithWill(will *Message) ConnectOption { - return func(o *ConnectOptions) error { - o.Will = will - return nil - } -} - func (c *BaseClient) write(b []byte) error { l := len(b) c.muWrite.Lock() diff --git a/client_test.go b/client_test.go index 56d6a0e..a3745fe 100644 --- a/client_test.go +++ b/client_test.go @@ -20,6 +20,7 @@ func TestConnect(t *testing.T) { WithUserNamePassword("user", "pass"), WithKeepAlive(0x0123), WithCleanSession(true), + WithProtocolLevel(ProtocolLevel4), WithWill(&Message{QoS: QoS1, Topic: "topic", Payload: []byte{0x01}}), ) }() diff --git a/connect.go b/connect.go index c3888dc..ff86951 100644 --- a/connect.go +++ b/connect.go @@ -6,10 +6,13 @@ import ( "io" ) -type protocolLevel byte +// ProtocolLevel represents MQTT protocol level. +type ProtocolLevel byte +// ProtocolLevel values. const ( - protocol311 protocolLevel = 0x04 + ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1 + ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default) ) type connectFlag byte @@ -27,7 +30,9 @@ const ( // Connect to the broker. func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error) { - o := &ConnectOptions{} + o := &ConnectOptions{ + ProtocolLevel: ProtocolLevel4, + } for _, opt := range opts { if err := opt(o); err != nil { return false, err @@ -86,7 +91,7 @@ func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...Conne packetConnect.b(), []byte{ 0x00, 0x04, 0x4D, 0x51, 0x54, 0x54, - byte(protocol311), + byte(o.ProtocolLevel), flag, }, packUint16(o.KeepAlive), @@ -113,3 +118,57 @@ func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...Conne return connAck.SessionPresent, nil } } + +// ConnectOptions represents options for Connect. +type ConnectOptions struct { + UserName string + Password string + CleanSession bool + KeepAlive uint16 + Will *Message + ProtocolLevel ProtocolLevel +} + +// ConnectOption sets option for Connect. +type ConnectOption func(*ConnectOptions) error + +// WithUserNamePassword sets plain text auth information used in Connect. +func WithUserNamePassword(userName, password string) ConnectOption { + return func(o *ConnectOptions) error { + o.UserName = userName + o.Password = password + return nil + } +} + +// WithKeepAlive sets keep alive interval in seconds. +func WithKeepAlive(interval uint16) ConnectOption { + return func(o *ConnectOptions) error { + o.KeepAlive = interval + return nil + } +} + +// WithCleanSession sets clean session flag. +func WithCleanSession(cleanSession bool) ConnectOption { + return func(o *ConnectOptions) error { + o.CleanSession = cleanSession + return nil + } +} + +// WithWill sets will message. +func WithWill(will *Message) ConnectOption { + return func(o *ConnectOptions) error { + o.Will = will + return nil + } +} + +// WithProtocolLevel sets protocol level. +func WithProtocolLevel(level ProtocolLevel) ConnectOption { + return func(o *ConnectOptions) error { + o.ProtocolLevel = level + return nil + } +} diff --git a/mqtt.go b/mqtt.go index 80f2b09..3ee390a 100644 --- a/mqtt.go +++ b/mqtt.go @@ -31,18 +31,6 @@ type Subscription struct { QoS QoS } -// ConnectOptions represents options for Connect. -type ConnectOptions struct { - UserName string - Password string - CleanSession bool - KeepAlive uint16 - Will *Message -} - -// ConnectOption sets option for Connect. -type ConnectOption func(*ConnectOptions) error - // Client is the interface of MQTT client. type Client interface { Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error) diff --git a/paho/paho.go b/paho/paho.go index d204d52..8eafe14 100644 --- a/paho/paho.go +++ b/paho/paho.go @@ -159,6 +159,7 @@ func (c *pahoWrapper) Connect() paho.Token { mqtt.WithUserNamePassword(c.pahoConfig.Username, c.pahoConfig.Password), mqtt.WithCleanSession(c.pahoConfig.CleanSession), mqtt.WithKeepAlive(uint16(c.pahoConfig.KeepAlive)), + mqtt.WithProtocolLevel(mqtt.ProtocolLevel(c.pahoConfig.ProtocolVersion)), } if c.pahoConfig.WillEnabled { opts = append(opts, mqtt.WithWill(&mqtt.Message{