diff --git a/src/Momento.Sdk/ITopicClient.cs b/src/Momento.Sdk/ITopicClient.cs index f00f0b1d..0030b61e 100644 --- a/src/Momento.Sdk/ITopicClient.cs +++ b/src/Momento.Sdk/ITopicClient.cs @@ -43,7 +43,8 @@ public interface ITopicClient : IDisposable /// /// Name of the cache containing the topic. /// Name of the topic. - /// The sequence number of the last message. + /// The sequence number of the last message. + /// The sequence page of the last message. /// If provided, the client will attempt to start the stream from that sequence number. /// /// Task object representing the result of the subscribe operation. The @@ -62,5 +63,5 @@ public interface ITopicClient : IDisposable /// } /// /// - public Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null); + public Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null); } diff --git a/src/Momento.Sdk/Internal/LoggingUtils.cs b/src/Momento.Sdk/Internal/LoggingUtils.cs index e6f84c13..62b1732b 100644 --- a/src/Momento.Sdk/Internal/LoggingUtils.cs +++ b/src/Momento.Sdk/Internal/LoggingUtils.cs @@ -457,11 +457,12 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess /// /// /// - public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber) + /// + public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber, ulong newSequencePage) { if (logger.IsEnabled(LogLevel.Trace)) { - logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber); + logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}, newSequencePage: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber, newSequencePage); } } diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 7517c108..48a1481d 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -83,9 +83,9 @@ public async Task Publish(string cacheName, string topicNa } public async Task Subscribe(string cacheName, string topicName, - ulong? resumeAtTopicSequenceNumber = null) + ulong? resumeAtTopicSequenceNumber = null, ulong? resumeAtTopicSequencePage = null) { - return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber); + return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage); } private const string RequestTypeTopicPublish = "TOPIC_PUBLISH"; @@ -116,7 +116,7 @@ private async Task SendPublish(string cacheName, string to } private async Task SendSubscribe(string cacheName, string topicName, - ulong? resumeAtTopicSequenceNumber) + ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage) { var request = new _SubscriptionRequest { @@ -127,6 +127,10 @@ private async Task SendSubscribe(string cacheName, strin { request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value; } + if (resumeAtTopicSequencePage != null) + { + request.SequencePage = resumeAtTopicSequencePage.Value; + } SubscriptionWrapper subscriptionWrapper; try @@ -158,6 +162,7 @@ private class SubscriptionWrapper : IDisposable private AsyncServerStreamingCall<_SubscriptionItem>? _subscription; private ulong? _lastSequenceNumber; + private ulong? _lastSequencePage; private bool _subscribed; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, @@ -177,11 +182,17 @@ public async Task Subscribe() CacheName = _cacheName, Topic = _topicName }; + if (_lastSequenceNumber != null) { request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value; } + if (_lastSequencePage != null) + { + request.SequencePage = _lastSequencePage.Value; + } + _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName); var subscription = _grpcManager.Client.subscribe(request, new CallOptions()); @@ -247,14 +258,16 @@ public async Task Subscribe() { case _SubscriptionItem.KindOneofCase.Item: _lastSequenceNumber = message.Item.TopicSequenceNumber; + _lastSequencePage = message.Item.SequencePage; + switch (message.Item.Value.KindCase) { case _TopicValue.KindOneofCase.Text: _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName); - return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.Binary: _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName); - return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.None: default: _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName); @@ -264,10 +277,12 @@ public async Task Subscribe() break; case _SubscriptionItem.KindOneofCase.Discontinuity: _logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName, - message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence); + message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage); _lastSequenceNumber = message.Discontinuity.NewTopicSequence; + _lastSequencePage = message.Discontinuity.NewSequencePage; return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence), - checked((long)message.Discontinuity.NewTopicSequence)); + checked((long)message.Discontinuity.NewTopicSequence), + checked((long)message.Discontinuity.NewSequencePage)); case _SubscriptionItem.KindOneofCase.Heartbeat: _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName); return new TopicSystemEvent.Heartbeat(); diff --git a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs index 0b132cb3..13ad5148 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs @@ -42,10 +42,11 @@ public class Text : TopicMessage /// /// A topic message containing a text value. /// - public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null) + public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) { Value = topicValue.Text; TopicSequenceNumber = topicSequenceNumber; + TopicSequencePage = topicSequencePage; TokenId = tokenId; } @@ -59,6 +60,11 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = /// public long TopicSequenceNumber { get; } + /// + /// The sequence page of this message. + /// + public long TopicSequencePage { get; } + /// /// The TokenId that was used to publish the message, or null if the token did not have an id. /// This can be used to securely identify the sender of a message. @@ -68,7 +74,7 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = /// public override string ToString() { - return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\""; } } @@ -80,13 +86,15 @@ public class Binary : TopicMessage /// /// A topic message containing a binary value. /// - public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null) + public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null) { Value = topicValue.Binary.ToByteArray(); TopicSequenceNumber = topicSequenceNumber; + TopicSequencePage = topicSequencePage; TokenId = tokenId; } + /// /// The binary value of this message. /// @@ -97,6 +105,11 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId /// public long TopicSequenceNumber { get; } + /// + /// The sequence page of this message. + /// + public long TopicSequencePage { get; } + /// /// The TokenId that was used to publish the message, or null if the token did not have an id. /// This can be used to securely identify the sender of a message. @@ -106,7 +119,7 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId /// public override string ToString() { - return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\""; } } diff --git a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs index e281382d..6fe00b8a 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs @@ -32,25 +32,33 @@ public class Discontinuity : TopicSystemEvent /// /// The last known sequence number before the discontinuity. /// The sequence number of the discontinuity. - public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber) + /// The sequence page of the discontinuity. + public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage) { LastKnownSequenceNumber = lastKnownSequenceNumber; SequenceNumber = sequenceNumber; + SequencePage = sequencePage; } /// /// The last known sequence number before the discontinuity. /// public long LastKnownSequenceNumber { get; } + /// /// The sequence number of the discontinuity. /// public long SequenceNumber { get; } + /// + /// The sequence page of the discontinuity. + /// + public long SequencePage { get; } + /// public override string ToString() { - return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}"; + return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber} SequencePage: {SequencePage}"; } } } diff --git a/src/Momento.Sdk/TopicClient.cs b/src/Momento.Sdk/TopicClient.cs index fb70cb4f..c5c542f1 100644 --- a/src/Momento.Sdk/TopicClient.cs +++ b/src/Momento.Sdk/TopicClient.cs @@ -59,7 +59,7 @@ public async Task PublishAsync(string cacheName, string to } /// - public async Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null) + public async Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null) { try { @@ -70,7 +70,7 @@ public async Task SubscribeAsync(string cacheName, strin { return new TopicSubscribeResponse.Error(new InvalidArgumentException(e.Message)); } - return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber); + return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber, resumeAtSequencePage); } ///