Skip to content

Commit

Permalink
Add option to specify protocol level (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 23, 2019
1 parent e97ce26 commit 2cda1b5
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 49 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ install:

before_script:
- docker-compose up -d
- echo 'replace github.com/at-wat/mqtt-go => ../' >> paho/go.mod

script:
- go vet -tags integration ./...
Expand Down
33 changes: 0 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,6 @@ func (c *BaseClient) Handle(handler Handler) {
c.handler = handler
}

// WithUserNamePassword sets plain text auth information used in Connect.
func WithUserNamePassword(userName, password string) ConnectOption {
return func(o *ConnectOptions) error {
o.UserName = userName
o.Password = password
return nil
}
}

// WithKeepAlive sets keep alive interval in seconds.
func WithKeepAlive(interval uint16) ConnectOption {
return func(o *ConnectOptions) error {
o.KeepAlive = interval
return nil
}
}

// WithCleanSession sets clean session flag.
func WithCleanSession(cleanSession bool) ConnectOption {
return func(o *ConnectOptions) error {
o.CleanSession = cleanSession
return nil
}
}

// WithWill sets will message.
func WithWill(will *Message) ConnectOption {
return func(o *ConnectOptions) error {
o.Will = will
return nil
}
}

func (c *BaseClient) write(b []byte) error {
l := len(b)
c.muWrite.Lock()
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestConnect(t *testing.T) {
WithUserNamePassword("user", "pass"),
WithKeepAlive(0x0123),
WithCleanSession(true),
WithProtocolLevel(ProtocolLevel4),
WithWill(&Message{QoS: QoS1, Topic: "topic", Payload: []byte{0x01}}),
)
}()
Expand Down
67 changes: 63 additions & 4 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"io"
)

type protocolLevel byte
// ProtocolLevel represents MQTT protocol level.
type ProtocolLevel byte

// ProtocolLevel values.
const (
protocol311 protocolLevel = 0x04
ProtocolLevel3 ProtocolLevel = 0x03 // MQTT 3.1
ProtocolLevel4 ProtocolLevel = 0x04 // MQTT 3.1.1 (default)
)

type connectFlag byte
Expand All @@ -27,7 +30,9 @@ const (

// Connect to the broker.
func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error) {
o := &ConnectOptions{}
o := &ConnectOptions{
ProtocolLevel: ProtocolLevel4,
}
for _, opt := range opts {
if err := opt(o); err != nil {
return false, err
Expand Down Expand Up @@ -86,7 +91,7 @@ func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...Conne
packetConnect.b(),
[]byte{
0x00, 0x04, 0x4D, 0x51, 0x54, 0x54,
byte(protocol311),
byte(o.ProtocolLevel),
flag,
},
packUint16(o.KeepAlive),
Expand All @@ -113,3 +118,57 @@ func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...Conne
return connAck.SessionPresent, nil
}
}

// ConnectOptions represents options for Connect.
type ConnectOptions struct {
UserName string
Password string
CleanSession bool
KeepAlive uint16
Will *Message
ProtocolLevel ProtocolLevel
}

// ConnectOption sets option for Connect.
type ConnectOption func(*ConnectOptions) error

// WithUserNamePassword sets plain text auth information used in Connect.
func WithUserNamePassword(userName, password string) ConnectOption {
return func(o *ConnectOptions) error {
o.UserName = userName
o.Password = password
return nil
}
}

// WithKeepAlive sets keep alive interval in seconds.
func WithKeepAlive(interval uint16) ConnectOption {
return func(o *ConnectOptions) error {
o.KeepAlive = interval
return nil
}
}

// WithCleanSession sets clean session flag.
func WithCleanSession(cleanSession bool) ConnectOption {
return func(o *ConnectOptions) error {
o.CleanSession = cleanSession
return nil
}
}

// WithWill sets will message.
func WithWill(will *Message) ConnectOption {
return func(o *ConnectOptions) error {
o.Will = will
return nil
}
}

// WithProtocolLevel sets protocol level.
func WithProtocolLevel(level ProtocolLevel) ConnectOption {
return func(o *ConnectOptions) error {
o.ProtocolLevel = level
return nil
}
}
12 changes: 0 additions & 12 deletions mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ type Subscription struct {
QoS QoS
}

// ConnectOptions represents options for Connect.
type ConnectOptions struct {
UserName string
Password string
CleanSession bool
KeepAlive uint16
Will *Message
}

// ConnectOption sets option for Connect.
type ConnectOption func(*ConnectOptions) error

// Client is the interface of MQTT client.
type Client interface {
Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error)
Expand Down
1 change: 1 addition & 0 deletions paho/paho.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (c *pahoWrapper) Connect() paho.Token {
mqtt.WithUserNamePassword(c.pahoConfig.Username, c.pahoConfig.Password),
mqtt.WithCleanSession(c.pahoConfig.CleanSession),
mqtt.WithKeepAlive(uint16(c.pahoConfig.KeepAlive)),
mqtt.WithProtocolLevel(mqtt.ProtocolLevel(c.pahoConfig.ProtocolVersion)),
}
if c.pahoConfig.WillEnabled {
opts = append(opts, mqtt.WithWill(&mqtt.Message{
Expand Down

0 comments on commit 2cda1b5

Please sign in to comment.