Skip to content

Commit

Permalink
feat: use ulong and topics and plumb through resumeAt subscription …
Browse files Browse the repository at this point in the history
…arguments (#589)

Changes TopicItem and TopicSystemEvent to store the sequence number/page as ulong. This removes a checked cast under the hood.

Also plumbs through resumeAtTopicSequenceNumber (page) to the subscription wrapper.

I tested these changes on both topics examples successfully without any changes on the example side.
  • Loading branch information
malandis authored Nov 12, 2024
1 parent 5d7b6d3 commit dd6a6af
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 44 deletions.
47 changes: 15 additions & 32 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,12 @@ private async Task<TopicPublishResponse> SendPublish(string cacheName, string to
private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage)
{
var request = new _SubscriptionRequest
{
CacheName = cacheName,
Topic = topicName
};
if (resumeAtTopicSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}
if (resumeAtTopicSequencePage != null)
{
request.SequencePage = resumeAtTopicSequencePage.Value;
}

SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName,
resumeAtTopicSequenceNumber, resumeAtTopicSequencePage, _exceptionMapper, _logger);
await subscriptionWrapper.Subscribe();
}
catch (Exception e)
Expand All @@ -161,16 +148,19 @@ private class SubscriptionWrapper : IDisposable
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private ulong? _lastSequencePage;
private ulong _lastSequenceNumber;
private ulong _lastSequencePage;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
string topicName, ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage,
CacheExceptionMapper exceptionMapper, ILogger logger)
{
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_lastSequenceNumber = resumeAtTopicSequenceNumber ?? 0;
_lastSequencePage = resumeAtTopicSequencePage ?? 0;
_exceptionMapper = exceptionMapper;
_logger = logger;
}
Expand All @@ -183,15 +173,8 @@ public async Task Subscribe()
Topic = _topicName
};

if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

if (_lastSequencePage != null)
{
request.SequencePage = _lastSequencePage.Value;
}
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber;
request.SequencePage = _lastSequencePage;

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());
Expand Down Expand Up @@ -264,10 +247,10 @@ public async Task Subscribe()
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, _lastSequenceNumber, _lastSequencePage, message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -280,9 +263,9 @@ public async Task Subscribe()
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
_lastSequencePage = message.Discontinuity.NewSequencePage;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence),
checked((long)message.Discontinuity.NewSequencePage));
return new TopicSystemEvent.Discontinuity(message.Discontinuity.LastTopicSequence,
message.Discontinuity.NewTopicSequence,
message.Discontinuity.NewSequencePage);
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
return new TopicSystemEvent.Heartbeat();
Expand Down
12 changes: 6 additions & 6 deletions src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Text : TopicMessage
/// <summary>
/// A topic message containing a text value.
/// </summary>
public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
public Text(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Text;
TopicSequenceNumber = topicSequenceNumber;
Expand All @@ -58,12 +58,12 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequence
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand All @@ -86,7 +86,7 @@ public class Binary : TopicMessage
/// <summary>
/// A topic message containing a binary value.
/// </summary>
public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
public Binary(_TopicValue topicValue, ulong topicSequenceNumber, ulong topicSequencePage, string? tokenId = null)
{
Value = topicValue.Binary.ToByteArray();
TopicSequenceNumber = topicSequenceNumber;
Expand All @@ -103,12 +103,12 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequen
/// <summary>
/// The sequence number of this message.
/// </summary>
public long TopicSequenceNumber { get; }
public ulong TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }
public ulong TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
Expand Down
8 changes: 4 additions & 4 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Discontinuity : TopicSystemEvent
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
/// <param name="sequencePage">The sequence page of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage)
public Discontinuity(ulong lastKnownSequenceNumber, ulong sequenceNumber, ulong sequencePage)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
Expand All @@ -43,17 +43,17 @@ public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long seq
/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }
public ulong LastKnownSequenceNumber { get; }

/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }
public ulong SequenceNumber { get; }

/// <summary>
/// The sequence page of the discontinuity.
/// </summary>
public long SequencePage { get; }
public ulong SequencePage { get; }

/// <inheritdoc/>
public override string ToString()
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration/Momento.Sdk.Tests/Topics/TopicTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public async Task PublishAndSubscribe_String_Succeeds()
{
var textMessage = (TopicMessage.Text)consumedMessages[i];
Assert.Equal(textMessage.Value, valuesToSend[i]);
Assert.Equal(textMessage.TopicSequenceNumber, i + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(i + 1)));
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public async Task PublishAndSubscribe_AllEventsString_Succeeds()
{
case TopicMessage.Text textMessage:
Assert.Equal(textMessage.Value, valuesToSend[messageCount]);
Assert.Equal(textMessage.TopicSequenceNumber, messageCount + 1);
Assert.Equal(textMessage.TopicSequenceNumber, checked((ulong)(messageCount + 1)));
messageCount++;
break;
case TopicSystemEvent.Heartbeat:
Expand Down

0 comments on commit dd6a6af

Please sign in to comment.