diff --git a/examples/mqtts-client-cert/main.go b/examples/mqtts-client-cert/main.go index 3fbcf5b..3398ec3 100644 --- a/examples/mqtts-client-cert/main.go +++ b/examples/mqtts-client-cert/main.go @@ -59,15 +59,20 @@ func main() { }, }, "sample", // Client ID - mqtt.WithKeepAlive(30), - mqtt.WithCleanSession(true), - mqtt.WithWill( - &mqtt.Message{ - Topic: "test", - QoS: mqtt.QoS1, - Payload: []byte("{\"message\": \"Bye\"}"), - }, + mqtt.WithConnectOption( + mqtt.WithKeepAlive(30), + mqtt.WithCleanSession(true), + mqtt.WithWill( + &mqtt.Message{ + Topic: "test", + QoS: mqtt.QoS1, + Payload: []byte("{\"message\": \"Bye\"}"), + }, + ), ), + mqtt.WithPingInterval(10*time.Second), + mqtt.WithTimeout(5*time.Second), + mqtt.WithReconnectWait(1*time.Second, 15*time.Second), ) if err != nil { fmt.Printf("Error: %v\n", err) diff --git a/reconnclient.go b/reconnclient.go index 091794b..58fa165 100644 --- a/reconnclient.go +++ b/reconnclient.go @@ -20,52 +20,56 @@ import ( "time" ) -type reconnectClient struct { - Client -} - // NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features. -func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opts ...ConnectOption) (Client, error) { +func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opts ...ReconnectOption) (Client, error) { rc := &RetryClient{} - options := &ConnectOptions{CleanSession: true} + options := &ReconnectOptions{} for _, opt := range opts { if err := opt(options); err != nil { return nil, err } } + connOptions := &ConnectOptions{ + CleanSession: true, + } + for _, opt := range options.ConnectOptions { + if err := opt(connOptions); err != nil { + return nil, err + } + } + if options.PingInterval == time.Duration(0) { + options.PingInterval = time.Duration(connOptions.KeepAlive) * time.Second + } + if options.Timeout == time.Duration(0) { + options.Timeout = options.PingInterval + } done := make(chan struct{}) var doneOnce sync.Once go func() { - clean := options.CleanSession - reconnWaitBase := time.Second - reconnWaitMax := 10 * time.Second - reconnWait := reconnWaitBase + clean := connOptions.CleanSession + reconnWait := options.ReconnectWaitBase for { if c, err := dialer.Dial(); err == nil { - optsCurr := append([]ConnectOption{}, opts...) + optsCurr := append([]ConnectOption{}, options.ConnectOptions...) optsCurr = append(optsCurr, WithCleanSession(clean)) clean = false // Clean only first time. rc.SetClient(ctx, c) if present, err := rc.Connect(ctx, clientID, optsCurr...); err == nil { - reconnWait = reconnWaitBase // Reset reconnect wait. + reconnWait = options.ReconnectWaitBase // Reset reconnect wait. doneOnce.Do(func() { close(done) }) if present { rc.Resubscribe(ctx) } - if options.KeepAlive > 0 { + if options.PingInterval > time.Duration(0) { // Start keep alive. go func() { - timeout := time.Duration(options.KeepAlive) * time.Second / 2 - if timeout < time.Second { - timeout = time.Second - } _ = KeepAlive( ctx, c, - time.Duration(options.KeepAlive)*time.Second, - timeout, + options.PingInterval, + options.Timeout, ) }() } @@ -88,8 +92,8 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt return } reconnWait *= 2 - if reconnWait > reconnWaitMax { - reconnWait = reconnWaitMax + if reconnWait > options.ReconnectWaitMax { + reconnWait = options.ReconnectWaitMax } } }() @@ -100,3 +104,50 @@ func NewReconnectClient(ctx context.Context, dialer Dialer, clientID string, opt } return rc, nil } + +// ReconnectOptions represents options for Connect. +type ReconnectOptions struct { + ConnectOptions []ConnectOption + Timeout time.Duration + ReconnectWaitBase time.Duration + ReconnectWaitMax time.Duration + PingInterval time.Duration +} + +// ReconnectOption sets option for Connect. +type ReconnectOption func(*ReconnectOptions) error + +// WithConnectOption sets ConnectOption to ReconnectClient. +func WithConnectOption(connOpts ...ConnectOption) ReconnectOption { + return func(o *ReconnectOptions) error { + o.ConnectOptions = connOpts + return nil + } +} + +// WithTimeout sets timeout duration of server response. +// Default value is PingInterval. +func WithTimeout(timeout time.Duration) ReconnectOption { + return func(o *ReconnectOptions) error { + o.Timeout = timeout + return nil + } +} + +// WithReconnectWait sets parameters of incremental reconnect wait. +func WithReconnectWait(base, max time.Duration) ReconnectOption { + return func(o *ReconnectOptions) error { + o.ReconnectWaitBase = base + o.ReconnectWaitMax = max + return nil + } +} + +// WithPingInterval sets ping request interval. +// Default value is KeepAlive value set by ConnectOption. +func WithPingInterval(interval time.Duration) ReconnectOption { + return func(o *ReconnectOptions) error { + o.PingInterval = interval + return nil + } +} diff --git a/reconnclient_integration_test.go b/reconnclient_integration_test.go index 2394a3e..5356ebf 100644 --- a/reconnclient_integration_test.go +++ b/reconnclient_integration_test.go @@ -39,8 +39,13 @@ func TestIntegration_ReconnectClient(t *testing.T) { }, }, "ReconnectClient"+name, - WithKeepAlive(10), - WithCleanSession(true), + WithConnectOption( + WithKeepAlive(10), + WithCleanSession(true), + ), + WithPingInterval(time.Second), + WithTimeout(time.Second), + WithReconnectWait(time.Second, 10*time.Second), ) if err != nil { t.Fatalf("Unexpected error: '%v'", err)