Skip to content

Commit

Permalink
Fix multi time publish/subscribe retry (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 25, 2019
1 parent 476a6d9 commit 1e4d6ed
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions retryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *RetryClient) Publish(ctx context.Context, message *Message) error {
c.mu.Lock()
cli := c.Client
c.mu.Unlock()
c.publish(ctx, cli, message)
c.publish(ctx, false, cli, message)
}()
return nil
}
Expand All @@ -60,39 +60,45 @@ func (c *RetryClient) Subscribe(ctx context.Context, subs ...Subscription) error
c.mu.Lock()
cli := c.Client
c.mu.Unlock()
c.subscribe(ctx, cli, subs...)
c.subscribe(ctx, false, cli, subs...)
}()
return nil
}

func (c *RetryClient) publish(ctx context.Context, cli Client, message *Message) {
func (c *RetryClient) publish(ctx context.Context, retry bool, cli Client, message *Message) {
if err := cli.Publish(ctx, message); err != nil {
select {
case <-ctx.Done():
// User cancelled; don't queue.
if !retry {
// User cancelled; don't queue.
return
}
default:
if message.QoS > QoS0 {
copyMsg := *message
}
if message.QoS > QoS0 {
copyMsg := *message

c.muQueue.Lock()
copyMsg.Dup = true
c.pubQueue = append(c.pubQueue, &copyMsg)
c.muQueue.Unlock()
}
c.muQueue.Lock()
copyMsg.Dup = true
c.pubQueue = append(c.pubQueue, &copyMsg)
c.muQueue.Unlock()
}
}
}

func (c *RetryClient) subscribe(ctx context.Context, cli Client, subs ...Subscription) {
func (c *RetryClient) subscribe(ctx context.Context, retry bool, cli Client, subs ...Subscription) {
if err := cli.Subscribe(ctx, subs...); err != nil {
select {
case <-ctx.Done():
// User cancelled; don't queue.
if !retry {
// User cancelled; don't queue.
return
}
default:
c.muQueue.Lock()
c.subQueue = append(c.subQueue, subs)
c.muQueue.Unlock()
}
c.muQueue.Lock()
c.subQueue = append(c.subQueue, subs)
c.muQueue.Unlock()
} else {
c.muQueue.Lock()
c.subEstablished = append(c.subEstablished, subs)
Expand Down Expand Up @@ -151,10 +157,10 @@ func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...Conn
// Retry publish.
go func() {
for _, sub := range oldSubQueue {
c.subscribe(ctx, cli, sub...)
c.subscribe(ctx, true, cli, sub...)
}
for _, msg := range oldPubQueue {
c.publish(ctx, cli, msg)
c.publish(ctx, true, cli, msg)
}
}()

Expand All @@ -173,7 +179,7 @@ func (c *RetryClient) Resubscribe(ctx context.Context) error {
cli := c.Client
c.mu.Unlock()
for _, sub := range oldSubEstablished {
c.subscribe(ctx, cli, sub...)
c.subscribe(ctx, true, cli, sub...)
}
}
return nil
Expand Down

0 comments on commit 1e4d6ed

Please sign in to comment.