Skip to content

Commit

Permalink
Add ReconnectOptions (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 23, 2019
1 parent b597ede commit 7171ca6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
21 changes: 13 additions & 8 deletions examples/mqtts-client-cert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,20 @@ func main() {
},
},
"sample", // Client ID
mqtt.WithKeepAlive(30),
mqtt.WithCleanSession(true),
mqtt.WithWill(
&mqtt.Message{
Topic: "test",
QoS: mqtt.QoS1,
Payload: []byte("{\"message\": \"Bye\"}"),
},
mqtt.WithConnectOption(
mqtt.WithKeepAlive(30),
mqtt.WithCleanSession(true),
mqtt.WithWill(
&mqtt.Message{
Topic: "test",
QoS: mqtt.QoS1,
Payload: []byte("{\"message\": \"Bye\"}"),
},
),
),
mqtt.WithPingInterval(10*time.Second),
mqtt.WithTimeout(5*time.Second),
mqtt.WithReconnectWait(1*time.Second, 15*time.Second),
)
if err != nil {
fmt.Printf("Error: %v\n", err)
Expand Down
93 changes: 72 additions & 21 deletions reconnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,56 @@ import (
"time"
)

type reconnectClient struct {
Client
}

// NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features.
func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opts ...ConnectOption) (Client, error) {
func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opts ...ReconnectOption) (Client, error) {
rc := &RetryClient{}

options := &ConnectOptions{CleanSession: true}
options := &ReconnectOptions{}
for _, opt := range opts {
if err := opt(options); err != nil {
return nil, err
}
}
connOptions := &ConnectOptions{
CleanSession: true,
}
for _, opt := range options.ConnectOptions {
if err := opt(connOptions); err != nil {
return nil, err
}
}
if options.PingInterval == time.Duration(0) {
options.PingInterval = time.Duration(connOptions.KeepAlive) * time.Second
}
if options.Timeout == time.Duration(0) {
options.Timeout = options.PingInterval
}

done := make(chan struct{})
var doneOnce sync.Once
go func() {
clean := options.CleanSession
reconnWaitBase := time.Second
reconnWaitMax := 10 * time.Second
reconnWait := reconnWaitBase
clean := connOptions.CleanSession
reconnWait := options.ReconnectWaitBase
for {
if c, err := dialer.Dial(); err == nil {
optsCurr := append([]ConnectOption{}, opts...)
optsCurr := append([]ConnectOption{}, options.ConnectOptions...)
optsCurr = append(optsCurr, WithCleanSession(clean))
clean = false // Clean only first time.
rc.SetClient(ctx, c)

if present, err := rc.Connect(ctx, clientID, optsCurr...); err == nil {
reconnWait = reconnWaitBase // Reset reconnect wait.
reconnWait = options.ReconnectWaitBase // Reset reconnect wait.
doneOnce.Do(func() { close(done) })
if present {
rc.Resubscribe(ctx)
}
if options.KeepAlive > 0 {
if options.PingInterval > time.Duration(0) {
// Start keep alive.
go func() {
timeout := time.Duration(options.KeepAlive) * time.Second / 2
if timeout < time.Second {
timeout = time.Second
}
_ = KeepAlive(
ctx, c,
time.Duration(options.KeepAlive)*time.Second,
timeout,
options.PingInterval,
options.Timeout,
)
}()
}
Expand All @@ -88,8 +92,8 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt
return
}
reconnWait *= 2
if reconnWait > reconnWaitMax {
reconnWait = reconnWaitMax
if reconnWait > options.ReconnectWaitMax {
reconnWait = options.ReconnectWaitMax
}
}
}()
Expand All @@ -100,3 +104,50 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt
}
return rc, nil
}

// ReconnectOptions represents options for Connect.
type ReconnectOptions struct {
ConnectOptions []ConnectOption
Timeout time.Duration
ReconnectWaitBase time.Duration
ReconnectWaitMax time.Duration
PingInterval time.Duration
}

// ReconnectOption sets option for Connect.
type ReconnectOption func(*ReconnectOptions) error

// WithConnectOption sets ConnectOption to ReconnectClient.
func WithConnectOption(connOpts ...ConnectOption) ReconnectOption {
return func(o *ReconnectOptions) error {
o.ConnectOptions = connOpts
return nil
}
}

// WithTimeout sets timeout duration of server response.
// Default value is PingInterval.
func WithTimeout(timeout time.Duration) ReconnectOption {
return func(o *ReconnectOptions) error {
o.Timeout = timeout
return nil
}
}

// WithReconnectWait sets parameters of incremental reconnect wait.
func WithReconnectWait(base, max time.Duration) ReconnectOption {
return func(o *ReconnectOptions) error {
o.ReconnectWaitBase = base
o.ReconnectWaitMax = max
return nil
}
}

// WithPingInterval sets ping request interval.
// Default value is KeepAlive value set by ConnectOption.
func WithPingInterval(interval time.Duration) ReconnectOption {
return func(o *ReconnectOptions) error {
o.PingInterval = interval
return nil
}
}
9 changes: 7 additions & 2 deletions reconnclient_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ func TestIntegration_ReconnectClient(t *testing.T) {
},
},
"ReconnectClient"+name,
WithKeepAlive(10),
WithCleanSession(true),
WithConnectOption(
WithKeepAlive(10),
WithCleanSession(true),
),
WithPingInterval(time.Second),
WithTimeout(time.Second),
WithReconnectWait(time.Second, 10*time.Second),
)
if err != nil {
t.Fatalf("Unexpected error: '%v'", err)
Expand Down

0 comments on commit 7171ca6

Please sign in to comment.