Skip to content

Commit

Permalink
Add error handling for closed connection in sdk update listener
Browse files Browse the repository at this point in the history
  • Loading branch information
tbazen committed Jan 26, 2024
1 parent 18105cc commit 7aedd85
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/Memphis.Client.IntegrationTests/FullFlowTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Specialized;
using System.Diagnostics;
using System.Text;
using Memphis.Client.Consumer;
using Memphis.Client.Producer;
Expand Down Expand Up @@ -201,9 +202,11 @@ public static async Task<int> CountMessagesConsumedByGroup(MemphisConsumer consu
args.MessageList.ForEach(msg => msg.Ack());
};

_ = consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey });
_ = consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey });
await Task.Delay(TimeSpan.FromSeconds(10));
await Task.WhenAny(
consumer1.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }),
consumer2.ConsumeAsync(new ConsumeOptions { PartitionKey = partitionKey }),
Task.Delay(TimeSpan.FromSeconds(30)));

return count;
}
}
4 changes: 4 additions & 0 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,10 @@ void SyncSdkClientUpdate()
}
}
}
catch when (!IsConnected())
{
// Connection is closed
}
catch (System.Exception exception)
{
throw new MemphisException(exception.Message, exception);
Expand Down

0 comments on commit 7aedd85

Please sign in to comment.