Skip to content

Commit

Permalink
feat: Add the read concern header
Browse files Browse the repository at this point in the history
Add the read concern header to the top level Configuration.

Set the consistent read concern header in the cache client test fixture
if the CONSISTENT_READS env var is set.

Add a new make target for the github build to use that sets the header.

Update the test-cache-service make target to set the header so that the
canaries will use consistent reads.
  • Loading branch information
nand4011 committed Sep 25, 2024
1 parent 6f13407 commit 89a337b
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: make GRPC_WEB=${{ matrix.grpc-web }} build

- name: Test
run: make test
run: make prod-test

build_examples:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/on-push-to-main-branch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
run: make GRPC_WEB=${{ matrix.grpc-web }} build

- name: Test
run: make test
run: make prod-test

generate_readme:
runs-on: ubuntu-latest
Expand Down
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ ifneq (,$(findstring NT,$(OS)))
BUILD_TARGETS := build-dotnet6 build-dotnet-framework
TEST_TARGETS := test-dotnet6 test-dotnet-framework
TEST_TARGETS_AUTH_SERVICE := test-dotnet6-auth-service test-dotnet-framework-auth-service
TEST_TARGETS_CACHE_SERVICE := test-dotnet6-cache-service test-dotnet-framework-cache-service
TEST_TARGETS_TOPICS_SERVICE := test-dotnet6-topics-service test-dotnet-framework-topics-service
else
BUILD_TARGETS := build-dotnet6
TEST_TARGETS := test-dotnet6
TEST_TARGETS_AUTH_SERVICE := test-dotnet6-auth-service
TEST_TARGETS_CACHE_SERVICE := test-dotnet6-cache-service
TEST_TARGETS_TOPICS_SERVICE := test-dotnet6-topics-service
endif

Expand Down Expand Up @@ -98,6 +96,16 @@ restore:
test: ${TEST_TARGETS}


## Run unit and integration tests with consistent reads (conditioned by OS)
prod-test:
@echo "running tests with consistent reads..."
ifeq (,$(findstring NT,$(OS)))
@CONSISTENT_READS=1 $(MAKE) ${TEST_TARGETS}
else
@set CONSISTENT_READS=1 && $(MAKE) ${TEST_TARGETS}
endif


## Run unit and integration tests on the .NET 6.0 runtime
test-dotnet6:
@echo "Running unit and integration tests on the .NET 6.0 runtime..."
Expand Down Expand Up @@ -151,7 +159,12 @@ test-auth-service: ${TEST_TARGETS_AUTH_SERVICE}


## Run cache service tests
test-cache-service: ${TEST_TARGETS_CACHE_SERVICE}
test-cache-service:
ifeq (,$(findstring NT,$(OS)))
@CONSISTENT_READS=1 $(MAKE) test-dotnet6-cache-service
else
@set CONSISTENT_READS=1 && $(MAKE) test-dotnet6-cache-service test-dotnet-framework-cache-service
endif


## Run leaderboard service tests
Expand Down
76 changes: 58 additions & 18 deletions src/Momento.Sdk/Config/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Config.Middleware;
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Config.Transport;
Expand All @@ -14,18 +13,41 @@ public class Configuration : IConfiguration
{
/// <inheritdoc />
public ILoggerFactory LoggerFactory { get; }

/// <inheritdoc />
public IRetryStrategy RetryStrategy { get; }

/// <inheritdoc />
public IList<IMiddleware> Middlewares { get; }

/// <inheritdoc />
public ITransportStrategy TransportStrategy { get; }

/// <inheritdoc />
public ReadConcern ReadConcern { get; }

/// <inheritdoc cref="Momento.Sdk.Config.IConfiguration" />
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, ITransportStrategy transportStrategy)
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy,
ITransportStrategy transportStrategy)
: this(loggerFactory, retryStrategy, new List<IMiddleware>(), transportStrategy)
{
}

/// <summary>
/// Create a new instance of Configuration object with provided arguments: <see cref="Momento.Sdk.Config.IConfiguration.RetryStrategy" />, <see cref="Momento.Sdk.Config.IConfiguration.Middlewares" />, <see cref="Momento.Sdk.Config.IConfiguration.TransportStrategy"/>, and <see cref="Momento.Sdk.Config.IConfiguration.LoggerFactory"/>
/// </summary>
/// <param name="retryStrategy">Defines a contract for how and when to retry a request</param>
/// <param name="middlewares">The Middleware interface allows the Configuration to provide a higher-order function that wraps all requests.</param>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <param name="loggerFactory">This is responsible for configuring logging.</param>
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, IList<IMiddleware> middlewares,
ITransportStrategy transportStrategy)
{
LoggerFactory = loggerFactory;
RetryStrategy = retryStrategy;
Middlewares = middlewares;
TransportStrategy = transportStrategy;
ReadConcern = ReadConcern.Balanced;
}

/// <summary>
Expand All @@ -34,31 +56,34 @@ public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy,
/// <param name="retryStrategy">Defines a contract for how and when to retry a request</param>
/// <param name="middlewares">The Middleware interface allows the Configuration to provide a higher-order function that wraps all requests.</param>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <param name="loggerFactory">This is responsible for configuraing logging.</param>
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, IList<IMiddleware> middlewares, ITransportStrategy transportStrategy)
/// <param name="loggerFactory">This is responsible for configuring logging.</param>
/// <param name="readConcern">The client-wide setting for read-after-write consistency.</param>
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, IList<IMiddleware> middlewares,
ITransportStrategy transportStrategy, ReadConcern readConcern)
{
this.LoggerFactory = loggerFactory;
this.RetryStrategy = retryStrategy;
this.Middlewares = middlewares;
this.TransportStrategy = transportStrategy;
LoggerFactory = loggerFactory;
RetryStrategy = retryStrategy;
Middlewares = middlewares;
TransportStrategy = transportStrategy;
ReadConcern = readConcern;
}

/// <inheritdoc />
public IConfiguration WithRetryStrategy(IRetryStrategy retryStrategy)
{
return new Configuration(LoggerFactory, retryStrategy, Middlewares, TransportStrategy);
return new Configuration(LoggerFactory, retryStrategy, Middlewares, TransportStrategy, ReadConcern);
}

/// <inheritdoc />
public IConfiguration WithMiddlewares(IList<IMiddleware> middlewares)
{
return new Configuration(LoggerFactory, RetryStrategy, middlewares, TransportStrategy);
return new Configuration(LoggerFactory, RetryStrategy, middlewares, TransportStrategy, ReadConcern);
}

/// <inheritdoc />
public IConfiguration WithTransportStrategy(ITransportStrategy transportStrategy)
{
return new Configuration(LoggerFactory, RetryStrategy, Middlewares, transportStrategy);
return new Configuration(LoggerFactory, RetryStrategy, Middlewares, transportStrategy, ReadConcern);
}

/// <summary>
Expand All @@ -72,12 +97,13 @@ public Configuration WithAdditionalMiddlewares(IList<IMiddleware> additionalMidd
retryStrategy: RetryStrategy,
middlewares: Middlewares.Concat(additionalMiddlewares).ToList(),
transportStrategy: TransportStrategy,
loggerFactory: LoggerFactory
loggerFactory: LoggerFactory,
readConcern: ReadConcern
);
}

/// <summary>
/// Add the specified client timeout to an existing instance of Configuration object as an addiion to the existing transport strategy.
/// Add the specified client timeout to an existing instance of Configuration object as an addition to the existing transport strategy.
/// </summary>
/// <param name="clientTimeout">The amount of time to wait before cancelling the request.</param>
/// <returns>Configuration object with client timeout provided</returns>
Expand All @@ -87,7 +113,20 @@ public Configuration WithClientTimeout(TimeSpan clientTimeout)
retryStrategy: RetryStrategy,
middlewares: Middlewares,
transportStrategy: TransportStrategy.WithClientTimeout(clientTimeout),
loggerFactory: LoggerFactory
loggerFactory: LoggerFactory,
readConcern: ReadConcern
);
}

/// <inheritdoc />
public IConfiguration WithReadConcern(ReadConcern readConcern)
{
return new Configuration(
retryStrategy: RetryStrategy,
middlewares: Middlewares,
transportStrategy: TransportStrategy,
loggerFactory: LoggerFactory,
readConcern: readConcern
);
}

Expand All @@ -106,14 +145,15 @@ public override bool Equals(object obj)

var other = (Configuration)obj;
return RetryStrategy.Equals(other.RetryStrategy) &&
Middlewares.SequenceEqual(other.Middlewares) &&
TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory);
Middlewares.SequenceEqual(other.Middlewares) &&
TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory) &&
ReadConcern.Equals(other.ReadConcern);
}

/// <inheritdoc />
public override int GetHashCode()
{
return base.GetHashCode();
}
}
}
9 changes: 9 additions & 0 deletions src/Momento.Sdk/Config/IConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface IConfiguration
public IList<IMiddleware> Middlewares { get; }
/// <inheritdoc cref="Momento.Sdk.Config.Transport.ITransportStrategy" />
public ITransportStrategy TransportStrategy { get; }
/// <inheritdoc cref="Momento.Sdk.Config.ReadConcern" />
public ReadConcern ReadConcern { get; }

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified retry strategy.
Expand Down Expand Up @@ -49,4 +51,11 @@ public interface IConfiguration
/// <param name="clientTimeout">The amount of time to wait before cancelling the request.</param>
/// <returns>Configuration object with custom client timeout provided</returns>
public IConfiguration WithClientTimeout(TimeSpan clientTimeout);

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified read concern.
/// </summary>
/// <param name="readConcern">The read concern setting.</param>
/// <returns>Configuration object with custom read concern provided</returns>
public IConfiguration WithReadConcern(ReadConcern readConcern);
}
41 changes: 41 additions & 0 deletions src/Momento.Sdk/Config/ReadConcern.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;

namespace Momento.Sdk.Config;

/// <summary>
/// The read consistency setting for the cache client. Consistent guarantees read after write consistency, but applies a
/// 6x multiplier to your operation usage.
/// </summary>
public enum ReadConcern
{
/// <summary>
/// Balanced is the default read concern for the cache client.
/// </summary>
Balanced,
/// <summary>
/// Consistent read concern guarantees read after write consistency.
/// </summary>
Consistent
}

/// <summary>
/// Extension methods for the ReadConcern enum.
/// </summary>
public static class ReadConcernExtensions
{
/// <summary>
/// Converts the read concern to a string value.
/// </summary>
/// <param name="readConcern">to convert to a string</param>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException">if given an unknown read concern</exception>
public static string ToStringValue(this ReadConcern readConcern)
{
return readConcern switch
{
ReadConcern.Balanced => "balanced",
ReadConcern.Consistent => "consistent",
_ => throw new ArgumentOutOfRangeException()
};
}
}
13 changes: 5 additions & 8 deletions src/Momento.Sdk/Internal/DataGrpcManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;
using System.Net.Http;
#if USE_GRPC_WEB
using Grpc.Net.Client.Web;
#endif
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient;
using Momento.Protos.CachePing;
Expand All @@ -17,7 +12,6 @@
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.Middleware;
using static System.Reflection.Assembly;

namespace Momento.Sdk.Internal;

Expand Down Expand Up @@ -256,10 +250,13 @@ public class DataGrpcManager : GrpcManager

internal DataGrpcManager(IConfiguration config, string authToken, string endpoint): base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "DataGrpcManager")
{
var readConcernHeader = new Header(Header.ReadConcern, config.ReadConcern.ToStringValue());
headers.Add(readConcernHeader);

var middlewares = config.Middlewares.Concat(
new List<IMiddleware> {
new RetryMiddleware(config.LoggerFactory, config.RetryStrategy),
new HeaderMiddleware(config.LoggerFactory, this.headers),
new HeaderMiddleware(config.LoggerFactory, headers),
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests)
}
).ToList();
Expand All @@ -280,7 +277,7 @@ await pingClient.PingAsync(new _PingRequest(),
catch (RpcException ex)
{
MomentoErrorTransportDetails transportDetails = new MomentoErrorTransportDetails(
new MomentoGrpcErrorDetails(ex.StatusCode, ex.Message, null)
new MomentoGrpcErrorDetails(ex.StatusCode, ex.Message)
);
throw new ConnectionException("Eager connection to server failed", transportDetails, ex);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Middleware;

namespace Momento.Sdk.Internal.Middleware
Expand All @@ -14,6 +15,7 @@ internal class Header
public const string AuthorizationKey = "authorization";
public const string AgentKey = "agent";
public const string RuntimeVersionKey = "runtime-version";
public const string ReadConcern = "read-concern";
public readonly List<string> onceOnlyHeaders = new List<string> { Header.AgentKey, Header.RuntimeVersionKey };
public string Name;
public string Value;
Expand Down
34 changes: 22 additions & 12 deletions tests/Integration/Momento.Sdk.Tests/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@ public class CacheClientFixture : IDisposable
public CacheClientFixture()
{
AuthProvider = new EnvMomentoTokenProvider("MOMENTO_API_KEY");

// Enable consistent reads if the CONSISTENT_READS env var is set to anything
var consistentReads = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("CONSISTENT_READS"));

var config = Configurations.Laptop.Latest(LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.IncludeScopes = true;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
builder.AddFilter("Grpc.Net.Client", LogLevel.Error);
builder.SetMinimumLevel(LogLevel.Information);
}));

if (consistentReads)
{
config = config.WithReadConcern(ReadConcern.Consistent);
}

CacheName = $"dotnet-integration-{Utils.NewGuidString()}";
Client = new TestCacheClient(Configurations.Laptop.Latest(LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.IncludeScopes = true;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
builder.AddFilter("Grpc.Net.Client", LogLevel.Error);
builder.SetMinimumLevel(LogLevel.Information);
})),
AuthProvider, defaultTtl: DefaultTtl);
Client = new TestCacheClient(config, AuthProvider, defaultTtl: DefaultTtl);
Utils.CreateCacheForTest(Client, CacheName);
}

Expand Down

0 comments on commit 89a337b

Please sign in to comment.