diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 48a1481d..9860b070 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -118,25 +118,12 @@ private async Task SendPublish(string cacheName, string to private async Task SendSubscribe(string cacheName, string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage) { - var request = new _SubscriptionRequest - { - CacheName = cacheName, - Topic = topicName - }; - if (resumeAtTopicSequenceNumber != null) - { - request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value; - } - if (resumeAtTopicSequencePage != null) - { - request.SequencePage = resumeAtTopicSequencePage.Value; - } - SubscriptionWrapper subscriptionWrapper; try { _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName); - subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger); + subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, + resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger); await subscriptionWrapper.Subscribe(); } catch (Exception e) @@ -161,16 +148,19 @@ private class SubscriptionWrapper : IDisposable private readonly ILogger _logger; private AsyncServerStreamingCall<_SubscriptionItem>? _subscription; - private ulong? _lastSequenceNumber; - private ulong? _lastSequencePage; + private ulong _lastSequenceNumber; + private ulong _lastSequencePage; private bool _subscribed; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, - string topicName, CacheExceptionMapper exceptionMapper, ILogger logger) + string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage, + CacheExceptionMapper exceptionMapper, ILogger logger) { _grpcManager = grpcManager; _cacheName = cacheName; _topicName = topicName; + _lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0; + _lastSequencePage = resumeAtTopicSequencePage ?? 0; _exceptionMapper = exceptionMapper; _logger = logger; } @@ -183,15 +173,8 @@ public async Task Subscribe() Topic = _topicName }; - if (_lastSequenceNumber != null) - { - request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value; - } - - if (_lastSequencePage != null) - { - request.SequencePage = _lastSequencePage.Value; - } + request.ResumeAtTopicSequenceNumber = _lastSequenceNumber; + request.SequencePage = _lastSequencePage; _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName); var subscription = _grpcManager.Client.subscribe(request, new CallOptions()); @@ -264,10 +247,10 @@ public async Task Subscribe() { case _TopicValue.KindOneofCase.Text: _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName); - return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.Binary: _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName); - return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.None: default: _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName); @@ -280,9 +263,9 @@ public async Task Subscribe() message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage); _lastSequenceNumber = message.Discontinuity.NewTopicSequence; _lastSequencePage = message.Discontinuity.NewSequencePage; - return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence), - checked((long)message.Discontinuity.NewTopicSequence), - checked((long)message.Discontinuity.NewSequencePage)); + return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence, + message.Discontinuity.NewTopicSequence, + message.Discontinuity.NewSequencePage); case _SubscriptionItem.KindOneofCase.Heartbeat: _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName); return new TopicSystemEvent.Heartbeat(); diff --git a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs index 13ad5148..da899dca 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs @@ -42,7 +42,7 @@ public class Text : TopicMessage /// /// A topic message containing a text value. /// - public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) + public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Text; TopicSequenceNumber = topicSequenceNumber; @@ -58,12 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequence /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } /// /// The sequence page of this message. /// - public long TopicSequencePage { get; } + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. @@ -86,7 +86,7 @@ public class Binary : TopicMessage /// /// A topic message containing a binary value. /// - public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) + public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Binary.ToByteArray(); TopicSequenceNumber = topicSequenceNumber; @@ -103,12 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequen /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } /// /// The sequence page of this message. /// - public long TopicSequencePage { get; } + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. diff --git a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs index 6fe00b8a..5e2c0fdc 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs @@ -33,7 +33,7 @@ public class Discontinuity : TopicSystemEvent /// The last known sequence number before the discontinuity. /// The sequence number of the discontinuity. /// The sequence page of the discontinuity. - public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage) + public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage) { LastKnownSequenceNumber = lastKnownSequenceNumber; SequenceNumber = sequenceNumber; @@ -43,17 +43,17 @@ public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long seq /// /// The last known sequence number before the discontinuity. /// - public long LastKnownSequenceNumber { get; } + public ulong LastKnownSequenceNumber { get; } /// /// The sequence number of the discontinuity. /// - public long SequenceNumber { get; } + public ulong SequenceNumber { get; } /// /// The sequence page of the discontinuity. /// - public long SequencePage { get; } + public ulong SequencePage { get; } /// public override string ToString() diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index 72578f18..ee8023a5 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds() { var textMessage = (TopicMessage.Text)consumedMessages[i]; Assert.Equal(textMessage.Value, valuesToSend[i]); - Assert.Equal(textMessage.TopicSequenceNumber, i + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1))); } } @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds() { case TopicMessage.Text textMessage: Assert.Equal(textMessage.Value, valuesToSend[messageCount]); - Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1))); messageCount++; break; case TopicSystemEvent.Heartbeat: