Skip to content

Commit

Permalink
Add SetClient/Connect count to RetryClient stats (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Jan 13, 2023
1 parent 7211191 commit 78fc549
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
16 changes: 16 additions & 0 deletions retryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type RetryStats struct {
TotalTasks int
// Total number of retries.
TotalRetries int
// Count of SetClient.
CountSetClient int
// Count of Connect.
CountConnect int
// Count of error on Connect.
CountConnectError int
}

// Handle registers the message handler.
Expand Down Expand Up @@ -276,6 +282,9 @@ func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient) {
}
c.chConnSwitch = make(chan struct{})
c.mu.Unlock()
c.muStats.Lock()
c.stats.CountSetClient++
c.muStats.Unlock()

if c.chTask != nil {
return
Expand Down Expand Up @@ -395,8 +404,15 @@ func (c *RetryClient) Connect(ctx context.Context, clientID string, opts ...Conn
chConnectErr := c.chConnectErr
c.mu.Unlock()

c.muStats.Lock()
c.stats.CountConnect++
c.muStats.Unlock()

present, err := cli.Connect(ctx, clientID, opts...)
if err != nil {
c.muStats.Lock()
c.stats.CountConnectError++
c.muStats.Unlock()
chConnectErr <- err
}
close(chConnectErr)
Expand Down
52 changes: 39 additions & 13 deletions retryclient_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func expectRetryStats(t *testing.T, expected, actual RetryStats) {
if expected.TotalRetries != actual.TotalRetries {
t.Errorf("Expected total retries: %d, actual: %d", expected.TotalRetries, actual.TotalRetries)
}
if expected.CountSetClient != actual.CountSetClient {
t.Errorf("Expected count of SetClient: %d, actual: %d", expected.CountSetClient, actual.CountSetClient)
}
if expected.CountConnect != actual.CountConnect {
t.Errorf("Expected count of Connect: %d, actual: %d", expected.CountConnect, actual.CountConnect)
}
if expected.CountConnectError != actual.CountConnectError {
t.Errorf("Expected count of Connect error: %d, actual: %d", expected.CountConnectError, actual.CountConnectError)
}
}

func TestIntegration_RetryClient(t *testing.T) {
Expand Down Expand Up @@ -73,7 +82,9 @@ func TestIntegration_RetryClient(t *testing.T) {

time.Sleep(50 * time.Millisecond)
expectRetryStats(t, RetryStats{
TotalTasks: 1,
TotalTasks: 1,
CountSetClient: 1,
CountConnect: 1,
}, cli.Stats())

if err := cli.Disconnect(ctx); err != nil {
Expand Down Expand Up @@ -238,7 +249,9 @@ func TestIntegration_RetryClient_TaskQueue(t *testing.T) {

if pubAt == pubBeforeSetClient || pubAt == pubBeforeConnect {
expectRetryStats(t, RetryStats{
QueuedTasks: 100,
QueuedTasks: 100,
CountSetClient: 2,
CountConnect: 0,
}, cli.Stats())
}
}
Expand All @@ -258,7 +271,9 @@ func TestIntegration_RetryClient_TaskQueue(t *testing.T) {
}

expectRetryStats(t, RetryStats{
TotalTasks: 100,
TotalTasks: 100,
CountSetClient: 2,
CountConnect: 1,
}, cli.Stats())

if err := cli.Disconnect(ctx); err != nil {
Expand Down Expand Up @@ -321,8 +336,11 @@ func TestIntegration_RetryClient_RetryInitialRequest(t *testing.T) {
// Mosquitto WebSocket sometimes requires extra time to connect
// and retry number may be increased.
expectRetryStats(t, RetryStats{
TotalTasks: 1, // first try to subscribe (failed)
QueuedRetries: 1,
TotalTasks: 1, // first try to subscribe (failed)
QueuedRetries: 1,
CountSetClient: 3,
CountConnect: 3,
CountConnectError: 3,
}, cli.Stats())
}

Expand Down Expand Up @@ -421,8 +439,10 @@ func TestIntegration_RetryClient_ResponseTimeout(t *testing.T) {
// Client must be closed due to response timeout.
}
expectRetryStats(t, RetryStats{
QueuedRetries: 1,
TotalTasks: 1,
QueuedRetries: 1,
TotalTasks: 1,
CountSetClient: 1,
CountConnect: 1,
}, cli.Stats())

cli.SetClient(ctx, baseCliRemovePacket(ctx, t, func([]byte) bool {
Expand All @@ -435,8 +455,10 @@ func TestIntegration_RetryClient_ResponseTimeout(t *testing.T) {

time.Sleep(150 * time.Millisecond)
expectRetryStats(t, RetryStats{
TotalRetries: 1,
TotalTasks: 2,
TotalRetries: 1,
TotalTasks: 2,
CountSetClient: 2,
CountConnect: 2,
}, cli.Stats())

if err := cli.Disconnect(ctx); err != nil {
Expand Down Expand Up @@ -474,8 +496,10 @@ func TestIntegration_RetryClient_DirectlyPublishQoS0(t *testing.T) {

time.Sleep(150 * time.Millisecond)
expectRetryStats(t, RetryStats{
TotalRetries: 0,
TotalTasks: 1,
TotalRetries: 0,
TotalTasks: 1,
CountSetClient: 1,
CountConnect: 1,
}, cli.Stats())

if err := cli.Publish(ctx, &Message{
Expand All @@ -492,7 +516,9 @@ func TestIntegration_RetryClient_DirectlyPublishQoS0(t *testing.T) {
t.Fatal("Timeout")
}
expectRetryStats(t, RetryStats{
TotalRetries: 0,
TotalTasks: 1,
TotalRetries: 0,
TotalTasks: 1,
CountSetClient: 1,
CountConnect: 1,
}, cli.Stats())
}

0 comments on commit 78fc549

Please sign in to comment.