From 1a987d26022ffc6b4657a830c560e813ebe630d0 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 11 Nov 2024 16:46:27 -0800 Subject: [PATCH 1/5] feat: use ulong for topic sequence page and number Prefer using ulong for topic sequence number and page. Even though this is not CLS compliant, both of these are represented as protobuf `uint64`s. The topic sequence page can grow unboundedly and we should retain precision. --- src/Momento.Sdk/Internal/ScsTopicClient.cs | 28 +++++++++---------- .../Responses/Topic/TopicMessage.cs | 12 ++++---- .../Responses/Topic/TopicSystemEvent.cs | 8 +++--- .../Momento.Sdk.Tests/Topics/TopicTest.cs | 2 +- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 48a1481d..60728cec 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -161,9 +161,10 @@ private class SubscriptionWrapper : IDisposable private readonly ILogger _logger; private AsyncServerStreamingCall<_SubscriptionItem>? _subscription; - private ulong? _lastSequenceNumber; - private ulong? _lastSequencePage; + private ulong _lastSequenceNumber; + private ulong _lastSequencePage; private bool _subscribed; + private bool _firstSubscribeCall = true; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, string topicName, CacheExceptionMapper exceptionMapper, ILogger logger) @@ -183,14 +184,12 @@ public async Task Subscribe() Topic = _topicName }; - if (_lastSequenceNumber != null) + // Use the caller supplied sequence number and page on the initial subscribe. + // Otherwise we will resume from the last known sequence number and page. + if (!_firstSubscribeCall) { - request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value; - } - - if (_lastSequencePage != null) - { - request.SequencePage = _lastSequencePage.Value; + request.ResumeAtTopicSequenceNumber = _lastSequenceNumber; + request.SequencePage = _lastSequencePage; } _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName); @@ -207,6 +206,7 @@ public async Task Subscribe() _subscription = subscription; _subscribed = true; + _firstSubscribeCall = false; } public async ValueTask GetNextEventFromGrpcStreamAsync( @@ -264,10 +264,10 @@ public async Task Subscribe() { case _TopicValue.KindOneofCase.Text: _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName); - return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.Binary: _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName); - return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.None: default: _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName); @@ -280,9 +280,9 @@ public async Task Subscribe() message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage); _lastSequenceNumber = message.Discontinuity.NewTopicSequence; _lastSequencePage = message.Discontinuity.NewSequencePage; - return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence), - checked((long)message.Discontinuity.NewTopicSequence), - checked((long)message.Discontinuity.NewSequencePage)); + return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence, + message.Discontinuity.NewTopicSequence, + message.Discontinuity.NewSequencePage); case _SubscriptionItem.KindOneofCase.Heartbeat: _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName); return new TopicSystemEvent.Heartbeat(); diff --git a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs index 13ad5148..da899dca 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs @@ -42,7 +42,7 @@ public class Text : TopicMessage /// /// A topic message containing a text value. /// - public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) + public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Text; TopicSequenceNumber = topicSequenceNumber; @@ -58,12 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequence /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } /// /// The sequence page of this message. /// - public long TopicSequencePage { get; } + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. @@ -86,7 +86,7 @@ public class Binary : TopicMessage /// /// A topic message containing a binary value. /// - public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) + public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Binary.ToByteArray(); TopicSequenceNumber = topicSequenceNumber; @@ -103,12 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequen /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } /// /// The sequence page of this message. /// - public long TopicSequencePage { get; } + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. diff --git a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs index 6fe00b8a..5e2c0fdc 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs @@ -33,7 +33,7 @@ public class Discontinuity : TopicSystemEvent /// The last known sequence number before the discontinuity. /// The sequence number of the discontinuity. /// The sequence page of the discontinuity. - public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage) + public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage) { LastKnownSequenceNumber = lastKnownSequenceNumber; SequenceNumber = sequenceNumber; @@ -43,17 +43,17 @@ public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long seq /// /// The last known sequence number before the discontinuity. /// - public long LastKnownSequenceNumber { get; } + public ulong LastKnownSequenceNumber { get; } /// /// The sequence number of the discontinuity. /// - public long SequenceNumber { get; } + public ulong SequenceNumber { get; } /// /// The sequence page of the discontinuity. /// - public long SequencePage { get; } + public ulong SequencePage { get; } /// public override string ToString() diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index 72578f18..928582cb 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds() { var textMessage = (TopicMessage.Text)consumedMessages[i]; Assert.Equal(textMessage.Value, valuesToSend[i]); - Assert.Equal(textMessage.TopicSequenceNumber, i + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1)); } } From 69f4e290c3c212fbdd90c3a434303be38559f536 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 11 Nov 2024 17:02:34 -0800 Subject: [PATCH 2/5] fix: pass resumeAtSequenceNumber/Page to subscription wrapper Previously we dropped the `resumeAtSequence...` values from the public API. This commit plumbs through the values to the subscription wrapper. --- src/Momento.Sdk/Internal/ScsTopicClient.cs | 33 ++++++---------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 60728cec..0b21f8d2 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -118,25 +118,12 @@ private async Task SendPublish(string cacheName, string to private async Task SendSubscribe(string cacheName, string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage) { - var request = new _SubscriptionRequest - { - CacheName = cacheName, - Topic = topicName - }; - if (resumeAtTopicSequenceNumber != null) - { - request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value; - } - if (resumeAtTopicSequencePage != null) - { - request.SequencePage = resumeAtTopicSequencePage.Value; - } - SubscriptionWrapper subscriptionWrapper; try { _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName); - subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger); + subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, + resumeAtTopicSequenceNumber ?? 0, resumeAtTopicSequencePage ?? 0, _exceptionMapper, _logger); await subscriptionWrapper.Subscribe(); } catch (Exception e) @@ -164,14 +151,16 @@ private class SubscriptionWrapper : IDisposable private ulong _lastSequenceNumber; private ulong _lastSequencePage; private bool _subscribed; - private bool _firstSubscribeCall = true; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, - string topicName, CacheExceptionMapper exceptionMapper, ILogger logger) + string topicName, ulong resumeAtTopicSequenceNumber, ulong resumeAtTopicSequencePage, + CacheExceptionMapper exceptionMapper, ILogger logger) { _grpcManager = grpcManager; _cacheName = cacheName; _topicName = topicName; + _lastSequenceNumber = resumeAtTopicSequenceNumber; + _lastSequencePage = resumeAtTopicSequencePage; _exceptionMapper = exceptionMapper; _logger = logger; } @@ -184,13 +173,8 @@ public async Task Subscribe() Topic = _topicName }; - // Use the caller supplied sequence number and page on the initial subscribe. - // Otherwise we will resume from the last known sequence number and page. - if (!_firstSubscribeCall) - { - request.ResumeAtTopicSequenceNumber = _lastSequenceNumber; - request.SequencePage = _lastSequencePage; - } + request.ResumeAtTopicSequenceNumber = _lastSequenceNumber; + request.SequencePage = _lastSequencePage; _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName); var subscription = _grpcManager.Client.subscribe(request, new CallOptions()); @@ -206,7 +190,6 @@ public async Task Subscribe() _subscription = subscription; _subscribed = true; - _firstSubscribeCall = false; } public async ValueTask GetNextEventFromGrpcStreamAsync( From 74a3fddade50e6c98c454b71c5f04279135eeec3 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 11 Nov 2024 17:06:43 -0800 Subject: [PATCH 3/5] fix: missing paren --- tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index 928582cb..c255b328 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds() { var textMessage = (TopicMessage.Text)consumedMessages[i]; Assert.Equal(textMessage.Value, valuesToSend[i]); - Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1)); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1))); } } From 1ba706a17ef3cfef5dc87f6313174356b13d113e Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 11 Nov 2024 17:08:33 -0800 Subject: [PATCH 4/5] fix: another test case to assertion to cast --- tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index c255b328..ee8023a5 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds() { case TopicMessage.Text textMessage: Assert.Equal(textMessage.Value, valuesToSend[messageCount]); - Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1))); messageCount++; break; case TopicSystemEvent.Heartbeat: From f34b24ab50577d8d8bf0fdc855030986dc3d16d1 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Mon, 11 Nov 2024 17:19:01 -0800 Subject: [PATCH 5/5] refactor: push resumeAt defaults to subscription wrapper constructor --- src/Momento.Sdk/Internal/ScsTopicClient.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 0b21f8d2..9860b070 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -123,7 +123,7 @@ private async Task SendSubscribe(string cacheName, strin { _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName); subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, - resumeAtTopicSequenceNumber ?? 0, resumeAtTopicSequencePage ?? 0, _exceptionMapper, _logger); + resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger); await subscriptionWrapper.Subscribe(); } catch (Exception e) @@ -153,14 +153,14 @@ private class SubscriptionWrapper : IDisposable private bool _subscribed; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, - string topicName, ulong resumeAtTopicSequenceNumber, ulong resumeAtTopicSequencePage, + string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage, CacheExceptionMapper exceptionMapper, ILogger logger) { _grpcManager = grpcManager; _cacheName = cacheName; _topicName = topicName; - _lastSequenceNumber = resumeAtTopicSequenceNumber; - _lastSequencePage = resumeAtTopicSequencePage; + _lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0; + _lastSequencePage = resumeAtTopicSequencePage ?? 0; _exceptionMapper = exceptionMapper; _logger = logger; }