Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support topic sequence page #588

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Momento.Sdk/ITopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
/// <param name="cacheName">Name of the cache containing the topic.</param>
/// <param name="topicName">Name of the topic.</param>
/// <param name="resumeAtSequenceNumber">The sequence number of the last message.
/// <param name="resumeAtSequencePage">The sequence page of the last message.
malandis marked this conversation as resolved.
Show resolved Hide resolved
/// If provided, the client will attempt to start the stream from that sequence number.</param>
/// <returns>
/// Task object representing the result of the subscribe operation. The
Expand All @@ -62,5 +63,5 @@
/// }
/// </code>
/// </returns>
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null);
public Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null);

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, false)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, true)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (ubuntu-latest, true)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, false)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, true)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'

Check warning on line 66 in src/Momento.Sdk/ITopicClient.cs

View workflow job for this annotation

GitHub Actions / build_csharp (windows-latest, true)

XML comment has badly formed XML -- 'Expected an end tag for element 'param'.'
}
5 changes: 3 additions & 2 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,12 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
/// <param name="topicName"></param>
/// <param name="lastSequenceNumber"></param>
/// <param name="newSequenceNumber"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
/// <param name="newSequencePage"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber, ulong newSequencePage)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}, newSequencePage: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber, newSequencePage);
}
}

Expand Down
29 changes: 22 additions & 7 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public async Task<TopicPublishResponse> Publish(string cacheName, string topicNa
}

public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber = null)
ulong? resumeAtTopicSequenceNumber = null, ulong? resumeAtTopicSequencePage = null)
{
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber);
return await SendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage);
}

private const string RequestTypeTopicPublish = "TOPIC_PUBLISH";
Expand Down Expand Up @@ -116,7 +116,7 @@ private async Task<TopicPublishResponse> SendPublish(string cacheName, string to
}

private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, string topicName,
ulong? resumeAtTopicSequenceNumber)
ulong? resumeAtTopicSequenceNumber, ulong? resumeAtTopicSequencePage)
{
var request = new _SubscriptionRequest
{
Expand All @@ -127,6 +127,10 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
{
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}
if (resumeAtTopicSequencePage != null)
{
request.SequencePage = resumeAtTopicSequencePage.Value;
}

SubscriptionWrapper subscriptionWrapper;
try
Expand Down Expand Up @@ -158,6 +162,7 @@ private class SubscriptionWrapper : IDisposable

private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private ulong? _lastSequencePage;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
Expand All @@ -177,11 +182,17 @@ public async Task Subscribe()
CacheName = _cacheName,
Topic = _topicName
};

if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

if (_lastSequencePage != null)
{
request.SequencePage = _lastSequencePage.Value;
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());

Expand Down Expand Up @@ -247,14 +258,16 @@ public async Task Subscribe()
{
case _SubscriptionItem.KindOneofCase.Item:
_lastSequenceNumber = message.Item.TopicSequenceNumber;
_lastSequencePage = message.Item.SequencePage;

switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Text(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
return new TopicMessage.Binary(message.Item.Value, checked((long)_lastSequenceNumber), checked((long)_lastSequencePage), message.Item.PublisherId == "" ? null : message.Item.PublisherId);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
Expand All @@ -264,10 +277,12 @@ public async Task Subscribe()
break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence, message.Discontinuity.NewSequencePage);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
_lastSequencePage = message.Discontinuity.NewSequencePage;
return new TopicSystemEvent.Discontinuity(checked((long)message.Discontinuity.LastTopicSequence),
checked((long)message.Discontinuity.NewTopicSequence));
checked((long)message.Discontinuity.NewTopicSequence),
checked((long)message.Discontinuity.NewSequencePage));
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
return new TopicSystemEvent.Heartbeat();
Expand Down
21 changes: 17 additions & 4 deletions src/Momento.Sdk/Responses/Topic/TopicMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public class Text : TopicMessage
/// <summary>
/// A topic message containing a text value.
/// </summary>
public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null)
public Text(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
{
Value = topicValue.Text;
TopicSequenceNumber = topicSequenceNumber;
TopicSequencePage = topicSequencePage;
TokenId = tokenId;
}

Expand All @@ -59,6 +60,11 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// </summary>
public long TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
/// This can be used to securely identify the sender of a message.
Expand All @@ -68,7 +74,7 @@ public Text(_TopicValue topicValue, long topicSequenceNumber, string? tokenId =
/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
return $"{base.ToString()}: Value: \"{this.Value.Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\"";
}
}

Expand All @@ -80,13 +86,15 @@ public class Binary : TopicMessage
/// <summary>
/// A topic message containing a binary value.
/// </summary>
public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId = null)
public Binary(_TopicValue topicValue, long topicSequenceNumber, long topicSequencePage, string? tokenId = null)
{
Value = topicValue.Binary.ToByteArray();
TopicSequenceNumber = topicSequenceNumber;
TopicSequencePage = topicSequencePage;
TokenId = tokenId;
}


/// <summary>
/// The binary value of this message.
/// </summary>
Expand All @@ -97,6 +105,11 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// </summary>
public long TopicSequenceNumber { get; }

/// <summary>
/// The sequence page of this message.
/// </summary>
public long TopicSequencePage { get; }

/// <summary>
/// The TokenId that was used to publish the message, or null if the token did not have an id.
/// This can be used to securely identify the sender of a message.
Expand All @@ -106,7 +119,7 @@ public Binary(_TopicValue topicValue, long topicSequenceNumber, string? tokenId
/// <inheritdoc />
public override string ToString()
{
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} TokenId: \"{this.TokenId}\"";
return $"{base.ToString()}: Value: \"{Value.ToPrettyHexString().Truncate()}\" SequenceNumber: {this.TopicSequenceNumber} SequencePage: {this.TopicSequencePage} TokenId: \"{this.TokenId}\"";
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/Momento.Sdk/Responses/Topic/TopicSystemEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,33 @@ public class Discontinuity : TopicSystemEvent
/// </summary>
/// <param name="lastKnownSequenceNumber">The last known sequence number before the discontinuity.</param>
/// <param name="sequenceNumber">The sequence number of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber)
/// <param name="sequencePage">The sequence page of the discontinuity.</param>
public Discontinuity(long lastKnownSequenceNumber, long sequenceNumber, long sequencePage)
{
LastKnownSequenceNumber = lastKnownSequenceNumber;
SequenceNumber = sequenceNumber;
SequencePage = sequencePage;
}

/// <summary>
/// The last known sequence number before the discontinuity.
/// </summary>
public long LastKnownSequenceNumber { get; }

/// <summary>
/// The sequence number of the discontinuity.
/// </summary>
public long SequenceNumber { get; }

/// <summary>
/// The sequence page of the discontinuity.
/// </summary>
public long SequencePage { get; }

/// <inheritdoc/>
public override string ToString()
{
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber}";
return $"{base.ToString()}: LastKnownSequenceNumber: {LastKnownSequenceNumber} SequenceNumber: {SequenceNumber} SequencePage: {SequencePage}";
}
}
}
4 changes: 2 additions & 2 deletions src/Momento.Sdk/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task<TopicPublishResponse> PublishAsync(string cacheName, string to
}

/// <inheritdoc />
public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null)
public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, string topicName, ulong? resumeAtSequenceNumber = null, ulong? resumeAtSequencePage = null)
{
try
{
Expand All @@ -70,7 +70,7 @@ public async Task<TopicSubscribeResponse> SubscribeAsync(string cacheName, strin
{
return new TopicSubscribeResponse.Error(new InvalidArgumentException(e.Message));
}
return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber);
return await scsTopicClient.Subscribe(cacheName, topicName, resumeAtSequenceNumber, resumeAtSequencePage);
}

/// <inheritdoc />
Expand Down
Loading