Skip to content

Commit

Permalink
Run publish/subscribe of RetryClient in single routine (#93)
Browse files Browse the repository at this point in the history
* Run publish/subscribe of RetryClient in single routine
* Unexpose RetryClient.Client
  • Loading branch information
at-wat authored Feb 8, 2020
1 parent 38b1c6f commit 39b4b5c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 28 deletions.
2 changes: 1 addition & 1 deletion reconnclient_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestIntegration_ReconnectClient(t *testing.T) {

// Close underlying client.
time.Sleep(time.Millisecond)
cli.(*reconnectClient).Client.(ClientCloser).Close()
cli.(*reconnectClient).cli.(ClientCloser).Close()

if err := cli.Subscribe(ctx, Subscription{Topic: "test", QoS: QoS1}); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
Expand Down
79 changes: 52 additions & 27 deletions retryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// RetryClient queues unacknowledged messages and retry on reconnect.
type RetryClient struct {
Client
cli Client

pubQueue []*Message // unacknoledged messages
subQueue [][]Subscription // unacknoledged subscriptions
Expand All @@ -30,52 +30,42 @@ type RetryClient struct {
mu sync.Mutex
muQueue sync.Mutex
handler Handler
cancelTaskLoop func()
chTask chan func(cli Client)
}

// Handle registers the message handler.
func (c *RetryClient) Handle(handler Handler) {
c.mu.Lock()
defer c.mu.Unlock()
c.handler = handler
if c.Client != nil {
c.Client.Handle(handler)
if c.cli != nil {
c.cli.Handle(handler)
}
}

// Publish tries to publish the message and immediately return nil.
// If it is not acknowledged to be published, the message will be queued.
func (c *RetryClient) Publish(ctx context.Context, message *Message) error {
go func() {
c.mu.Lock()
cli := c.Client
c.mu.Unlock()
return c.pushTask(ctx, func(cli Client) {
c.publish(ctx, false, cli, message)
}()
return nil
})
}

// Subscribe tries to subscribe the topic and immediately return nil.
// If it is not acknowledged to be subscribed, the request will be queued.
func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) error {
go func() {
c.mu.Lock()
cli := c.Client
c.mu.Unlock()
return c.pushTask(ctx, func(cli Client) {
c.subscribe(ctx, false, cli, subs...)
}()
return nil
})
}

// Unsubscribe tries to unsubscribe the topic and immediately return nil.
// If it is not acknowledged to be unsubscribed, the request will be queued.
func (c *RetryClient) Unsubscribe(ctx context.Context, topics ...string) error {
go func() {
c.mu.Lock()
cli := c.Client
c.mu.Unlock()
return c.pushTask(ctx, func(cli Client) {
c.unsubscribe(ctx, false, cli, topics...)
}()
return nil
})
}

func (c *RetryClient) publish(ctx context.Context, retry bool, cli Client, message *Message) {
Expand Down Expand Up @@ -160,15 +150,18 @@ func (c *RetryClient) removeEstablished(topics ...string) {
// Disconnect from the broker.
func (c *RetryClient) Disconnect(ctx context.Context) error {
c.mu.Lock()
cli := c.Client
cli := c.cli
if c.cancelTaskLoop != nil {
c.cancelTaskLoop()
}
c.mu.Unlock()
return cli.Disconnect(ctx)
}

// Ping to the broker.
func (c *RetryClient) Ping(ctx context.Context) error {
c.mu.Lock()
cli := c.Client
cli := c.cli
c.mu.Unlock()
return cli.Ping(ctx)
}
Expand All @@ -178,13 +171,45 @@ func (c *RetryClient) Ping(ctx context.Context) error {
func (c *RetryClient) SetClient(ctx context.Context, cli Client) {
c.mu.Lock()
defer c.mu.Unlock()
c.Client = cli
c.cli = cli

if c.chTask == nil {
c.chTask = make(chan func(cli Client))
}

ctx, cancel := context.WithCancel(ctx)
if c.cancelTaskLoop != nil {
c.cancelTaskLoop()
}
c.cancelTaskLoop = cancel
go func() {
for {
select {
case <-ctx.Done():
return
case task := <-c.chTask:
task(cli)
}
}
}()
}

func (c *RetryClient) pushTask(ctx context.Context, task func(cli Client)) error {
c.mu.Lock()
chTask := c.chTask
c.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case chTask <- task:
}
return nil
}

// Connect to the broker.
func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...ConnectOption) (sessionPresent bool, err error) {
c.mu.Lock()
cli := c.Client
cli := c.cli
cli.Handle(c.handler)
c.mu.Unlock()

Expand All @@ -202,7 +227,7 @@ func (c *RetryClient) Resubscribe(ctx context.Context) {

if len(oldSubEstablished) > 0 {
c.mu.Lock()
cli := c.Client
cli := c.cli
c.mu.Unlock()
for _, sub := range oldSubEstablished {
c.subscribe(ctx, true, cli, sub)
Expand All @@ -213,7 +238,7 @@ func (c *RetryClient) Resubscribe(ctx context.Context) {
// Retry all queued publish/subscribe requests.
func (c *RetryClient) Retry(ctx context.Context) {
c.mu.Lock()
cli := c.Client
cli := c.cli
c.mu.Unlock()

c.muQueue.Lock()
Expand Down

0 comments on commit 39b4b5c

Please sign in to comment.