Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
2 parents e363750 + dd6a6af commit ea53ac8
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 46 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]
Copyright 2024 Momento Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
5 changes: 3 additions & 2 deletions src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public interface ITopicClient : IDisposable
/// </summary>
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.</param>
/// <param name="resumeAtSequencePage">The sequence page of the last message.</param>
/// If provided, the client will attempt to start the stream from that sequence number.</param>

Check warning on line 48 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / publish

XML comment has badly formed XML -- 'End tag was not expected at this location.'

Check warning on line 48 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / publish

XML comment has badly formed XML -- 'End tag was not expected at this location.'

Check warning on line 48 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / publish

XML comment has badly formed XML -- 'End tag was not expected at this location.'

Check warning on line 48 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / publish

XML comment has badly formed XML -- 'End tag was not expected at this location.'

Check warning on line 48 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / publish

XML comment has badly formed XML -- 'End tag was not expected at this location.'
/// <returns>
/// Task object representing the result of the subscribe operation. The
Expand All @@ -62,5 +63,5 @@ public interface ITopicClient : IDisposable
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null);
}
5 changes: 3 additions & 2 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,12 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
/// <param name="topicName"></param>
/// <param name="lastSequenceNumber"></param>
/// <param name="newSequenceNumber"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
/// <param name="newSequencePage"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber, ulong newSequencePage)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}, newSequencePage: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber, newSequencePage);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/Momento.Sdk/Internal/ScsDataClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient;
using Momento.Protos.Common;
using Momento.Sdk.Config;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.ExtensionMethods;
Expand Down
48 changes: 23 additions & 25 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public async Task<TopicPublishResponse> Publish(string cacheName, string topicNa
}

public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber = null)
ulong? resumeAtTopicSequenceNumber = null, ulong? resumeAtTopicSequencePage = null)
{
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber);
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage);
}

private const string RequestTypeTopicPublish = "TOPIC_PUBLISH";
Expand Down Expand Up @@ -116,23 +116,14 @@ private async Task<TopicPublishResponse> SendPublish(string cacheName, string to
}

private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber)
ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage)
{
var request = new _SubscriptionRequest
{
CacheName = cacheName,
Topic = topicName
};
if (resumeAtTopicSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName,
resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger);
await subscriptionWrapper.Subscribe();
}
catch (Exception e)
Expand All @@ -157,15 +148,19 @@ private class SubscriptionWrapper : IDisposable
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private ulong _lastSequenceNumber;
private ulong _lastSequencePage;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage,
CacheExceptionMapper exceptionMapper, ILogger logger)
{
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0;
_lastSequencePage = resumeAtTopicSequencePage ?? 0;
_exceptionMapper = exceptionMapper;
_logger = logger;
}
Expand All @@ -177,10 +172,9 @@ public async Task Subscribe()
CacheName = _cacheName,
Topic = _topicName
};
if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

request.ResumeAtTopicSequenceNumber = _lastSequenceNumber;
request.SequencePage = _lastSequencePage;

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());
Expand Down Expand Up @@ -247,14 +241,16 @@ public async Task Subscribe()
{
case _SubscriptionItem.KindOneofCase.Item:
_lastSequenceNumber = message.Item.TopicSequenceNumber;
_lastSequencePage = message.Item.SequencePage;

switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -264,10 +260,12 @@ public async Task Subscribe()
break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence));
_lastSequencePage = message.Discontinuity.NewSequencePage;
return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence,
message.Discontinuity.NewTopicSequence,
message.Discontinuity.NewSequencePage);
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
return new TopicSystemEvent.Heartbeat();
Expand Down
3 changes: 2 additions & 1 deletion src/Momento.Sdk/Internal/TopicGrpcManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#endif
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient.Pubsub;
using Momento.Protos.Common;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Middleware;
using Momento.Sdk.Config.Retry;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class TopicGrpcManager : GrpcManager
{
public readonly IPubsubClient Client;

internal TopicGrpcManager(ITopicConfiguration config, string authToken, string endpoint): base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "TopicGrpcManager")
internal TopicGrpcManager(ITopicConfiguration config, string authToken, string endpoint) : base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "TopicGrpcManager")
{
var middlewares = new List<IMiddleware>
{
Expand Down
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Momento.Sdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.63.0" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="7.0.0" />
<PackageReference Include="Momento.Protos" Version="0.106.0" />
<PackageReference Include="Momento.Protos" Version="0.119.2" />
<PackageReference Include="JWT" Version="9.0.3" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
Expand Down
25 changes: 19 additions & 6 deletions src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public class Text : TopicMessage
/// <summary>
/// A topic message containing a text value.
/// </summary>
public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null)
public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Text;
TopicSequenceNumber = topicSequenceNumber;
TopicSequencePage = topicSequencePage;
TokenId = tokenId;
}

Expand All @@ -57,7 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand All @@ -68,7 +74,7 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\"";
}
}

Expand All @@ -80,13 +86,15 @@ public class Binary : TopicMessage
/// <summary>
/// A topic message containing a binary value.
/// </summary>
public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null)
public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Binary.ToByteArray();
TopicSequenceNumber = topicSequenceNumber;
TopicSequencePage = topicSequencePage;
TokenId = tokenId;
}


/// <summary>
/// The binary value of this message.
/// </summary>
Expand All @@ -95,7 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand All @@ -106,7 +119,7 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\"";
}
}

Expand Down
16 changes: 12 additions & 4 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,33 @@ public class Discontinuity : TopicSystemEvent
/// </summary>
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber)
/// <param name="sequencePage">The sequence page of the discontinuity.</param>
public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
SequencePage = sequencePage;
}

/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
public ulong LastKnownSequenceNumber { get; }

/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }
public ulong SequenceNumber { get; }

/// <summary>
/// The sequence page of the discontinuity.
/// </summary>
public ulong SequencePage { get; }

/// <inheritdoc/>
public override string ToString()
{
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}";
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber} SequencePage: {SequencePage}";
}
}
}
4 changes: 2 additions & 2 deletions src/Momento.Sdk/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task<TopicPublishResponse> PublishAsync(string cacheName, string to
}

/// <inheritdoc />
public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null)
public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null)
{
try
{
Expand All @@ -70,7 +70,7 @@ public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, strin
{
return new TopicSubscribeResponse.Error(new InvalidArgumentException(e.Message));
}
return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber);
return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber, resumeAtSequencePage);
}

/// <inheritdoc />
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds()
{
var textMessage = (TopicMessage.Text)consumedMessages[i];
Assert.Equal(textMessage.Value, valuesToSend[i]);
Assert.Equal(textMessage.TopicSequenceNumber, i + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1)));
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds()
{
case TopicMessage.Text textMessage:
Assert.Equal(textMessage.Value, valuesToSend[messageCount]);
Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1)));
messageCount++;
break;
case TopicSystemEvent.Heartbeat:
Expand Down

0 comments on commit ea53ac8

Please sign in to comment.