diff --git a/LICENSE b/LICENSE index 261eeb9e..38ffdcdc 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2024 Momento Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/src/Momento.Sdk/ITopicClient.cs b/src/Momento.Sdk/ITopicClient.cs index f00f0b1d..0030b61e 100644 --- a/src/Momento.Sdk/ITopicClient.cs +++ b/src/Momento.Sdk/ITopicClient.cs @@ -43,7 +43,8 @@ public interface ITopicClient : IDisposable /// /// Name of the cache containing the topic. /// Name of the topic. - /// The sequence number of the last message. + /// The sequence number of the last message. + /// The sequence page of the last message. /// If provided, the client will attempt to start the stream from that sequence number. /// /// Task object representing the result of the subscribe operation. The @@ -62,5 +63,5 @@ public interface ITopicClient : IDisposable /// } /// /// - public Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null); + public Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null); } diff --git a/src/Momento.Sdk/Internal/LoggingUtils.cs b/src/Momento.Sdk/Internal/LoggingUtils.cs index e6f84c13..62b1732b 100644 --- a/src/Momento.Sdk/Internal/LoggingUtils.cs +++ b/src/Momento.Sdk/Internal/LoggingUtils.cs @@ -457,11 +457,12 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess /// /// /// - public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber) + /// + public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber, ulong newSequencePage) { if (logger.IsEnabled(LogLevel.Trace)) { - logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber); + logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}, newSequencePage: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber, newSequencePage); } } diff --git a/src/Momento.Sdk/Internal/ScsDataClient.cs b/src/Momento.Sdk/Internal/ScsDataClient.cs index 59e64383..091e2c2f 100644 --- a/src/Momento.Sdk/Internal/ScsDataClient.cs +++ b/src/Momento.Sdk/Internal/ScsDataClient.cs @@ -9,6 +9,7 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using Momento.Protos.CacheClient; +using Momento.Protos.Common; using Momento.Sdk.Config; using Momento.Sdk.Exceptions; using Momento.Sdk.Internal.ExtensionMethods; diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs index 7517c108..9860b070 100644 --- a/src/Momento.Sdk/Internal/ScsTopicClient.cs +++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs @@ -83,9 +83,9 @@ public async Task Publish(string cacheName, string topicNa } public async Task Subscribe(string cacheName, string topicName, - ulong? resumeAtTopicSequenceNumber = null) + ulong? resumeAtTopicSequenceNumber = null, ulong? resumeAtTopicSequencePage = null) { - return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber); + return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage); } private const string RequestTypeTopicPublish = "TOPIC_PUBLISH"; @@ -116,23 +116,14 @@ private async Task SendPublish(string cacheName, string to } private async Task SendSubscribe(string cacheName, string topicName, - ulong? resumeAtTopicSequenceNumber) + ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage) { - var request = new _SubscriptionRequest - { - CacheName = cacheName, - Topic = topicName - }; - if (resumeAtTopicSequenceNumber != null) - { - request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value; - } - SubscriptionWrapper subscriptionWrapper; try { _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName); - subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger); + subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, + resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger); await subscriptionWrapper.Subscribe(); } catch (Exception e) @@ -157,15 +148,19 @@ private class SubscriptionWrapper : IDisposable private readonly ILogger _logger; private AsyncServerStreamingCall<_SubscriptionItem>? _subscription; - private ulong? _lastSequenceNumber; + private ulong _lastSequenceNumber; + private ulong _lastSequencePage; private bool _subscribed; public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName, - string topicName, CacheExceptionMapper exceptionMapper, ILogger logger) + string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage, + CacheExceptionMapper exceptionMapper, ILogger logger) { _grpcManager = grpcManager; _cacheName = cacheName; _topicName = topicName; + _lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0; + _lastSequencePage = resumeAtTopicSequencePage ?? 0; _exceptionMapper = exceptionMapper; _logger = logger; } @@ -177,10 +172,9 @@ public async Task Subscribe() CacheName = _cacheName, Topic = _topicName }; - if (_lastSequenceNumber != null) - { - request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value; - } + + request.ResumeAtTopicSequenceNumber = _lastSequenceNumber; + request.SequencePage = _lastSequencePage; _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName); var subscription = _grpcManager.Client.subscribe(request, new CallOptions()); @@ -247,14 +241,16 @@ public async Task Subscribe() { case _SubscriptionItem.KindOneofCase.Item: _lastSequenceNumber = message.Item.TopicSequenceNumber; + _lastSequencePage = message.Item.SequencePage; + switch (message.Item.Value.KindCase) { case _TopicValue.KindOneofCase.Text: _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName); - return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.Binary: _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName); - return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId); + return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId); case _TopicValue.KindOneofCase.None: default: _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName); @@ -264,10 +260,12 @@ public async Task Subscribe() break; case _SubscriptionItem.KindOneofCase.Discontinuity: _logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName, - message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence); + message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage); _lastSequenceNumber = message.Discontinuity.NewTopicSequence; - return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence), - checked((long)message.Discontinuity.NewTopicSequence)); + _lastSequencePage = message.Discontinuity.NewSequencePage; + return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence, + message.Discontinuity.NewTopicSequence, + message.Discontinuity.NewSequencePage); case _SubscriptionItem.KindOneofCase.Heartbeat: _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName); return new TopicSystemEvent.Heartbeat(); diff --git a/src/Momento.Sdk/Internal/TopicGrpcManager.cs b/src/Momento.Sdk/Internal/TopicGrpcManager.cs index c3e4f8ad..1dc4d368 100644 --- a/src/Momento.Sdk/Internal/TopicGrpcManager.cs +++ b/src/Momento.Sdk/Internal/TopicGrpcManager.cs @@ -12,6 +12,7 @@ #endif using Microsoft.Extensions.Logging; using Momento.Protos.CacheClient.Pubsub; +using Momento.Protos.Common; using Momento.Sdk.Config; using Momento.Sdk.Config.Middleware; using Momento.Sdk.Config.Retry; @@ -69,7 +70,7 @@ public class TopicGrpcManager : GrpcManager { public readonly IPubsubClient Client; - internal TopicGrpcManager(ITopicConfiguration config, string authToken, string endpoint): base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "TopicGrpcManager") + internal TopicGrpcManager(ITopicConfiguration config, string authToken, string endpoint) : base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "TopicGrpcManager") { var middlewares = new List { diff --git a/src/Momento.Sdk/Momento.Sdk.csproj b/src/Momento.Sdk/Momento.Sdk.csproj index 14151f7e..060bd2a8 100644 --- a/src/Momento.Sdk/Momento.Sdk.csproj +++ b/src/Momento.Sdk/Momento.Sdk.csproj @@ -69,7 +69,7 @@ - + diff --git a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs index 0b132cb3..da899dca 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicMessage.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicMessage.cs @@ -42,10 +42,11 @@ public class Text : TopicMessage /// /// A topic message containing a text value. /// - public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null) + public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Text; TopicSequenceNumber = topicSequenceNumber; + TopicSequencePage = topicSequencePage; TokenId = tokenId; } @@ -57,7 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } + + /// + /// The sequence page of this message. + /// + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. @@ -68,7 +74,7 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = /// public override string ToString() { - return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\""; } } @@ -80,13 +86,15 @@ public class Binary : TopicMessage /// /// A topic message containing a binary value. /// - public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null) + public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null) { Value = topicValue.Binary.ToByteArray(); TopicSequenceNumber = topicSequenceNumber; + TopicSequencePage = topicSequencePage; TokenId = tokenId; } + /// /// The binary value of this message. /// @@ -95,7 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId /// /// The sequence number of this message. /// - public long TopicSequenceNumber { get; } + public ulong TopicSequenceNumber { get; } + + /// + /// The sequence page of this message. + /// + public ulong TopicSequencePage { get; } /// /// The TokenId that was used to publish the message, or null if the token did not have an id. @@ -106,7 +119,7 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId /// public override string ToString() { - return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\""; + return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\""; } } diff --git a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs index e281382d..5e2c0fdc 100644 --- a/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs +++ b/src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs @@ -32,25 +32,33 @@ public class Discontinuity : TopicSystemEvent /// /// The last known sequence number before the discontinuity. /// The sequence number of the discontinuity. - public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber) + /// The sequence page of the discontinuity. + public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage) { LastKnownSequenceNumber = lastKnownSequenceNumber; SequenceNumber = sequenceNumber; + SequencePage = sequencePage; } /// /// The last known sequence number before the discontinuity. /// - public long LastKnownSequenceNumber { get; } + public ulong LastKnownSequenceNumber { get; } + /// /// The sequence number of the discontinuity. /// - public long SequenceNumber { get; } + public ulong SequenceNumber { get; } + + /// + /// The sequence page of the discontinuity. + /// + public ulong SequencePage { get; } /// public override string ToString() { - return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}"; + return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber} SequencePage: {SequencePage}"; } } } diff --git a/src/Momento.Sdk/TopicClient.cs b/src/Momento.Sdk/TopicClient.cs index fb70cb4f..c5c542f1 100644 --- a/src/Momento.Sdk/TopicClient.cs +++ b/src/Momento.Sdk/TopicClient.cs @@ -59,7 +59,7 @@ public async Task PublishAsync(string cacheName, string to } /// - public async Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null) + public async Task SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null) { try { @@ -70,7 +70,7 @@ public async Task SubscribeAsync(string cacheName, strin { return new TopicSubscribeResponse.Error(new InvalidArgumentException(e.Message)); } - return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber); + return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber, resumeAtSequencePage); } /// diff --git a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs index 72578f18..ee8023a5 100644 --- a/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs +++ b/tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds() { var textMessage = (TopicMessage.Text)consumedMessages[i]; Assert.Equal(textMessage.Value, valuesToSend[i]); - Assert.Equal(textMessage.TopicSequenceNumber, i + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1))); } } @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds() { case TopicMessage.Text textMessage: Assert.Equal(textMessage.Value, valuesToSend[messageCount]); - Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1); + Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1))); messageCount++; break; case TopicSystemEvent.Heartbeat: