Skip to content

Commit

Permalink
feat: Add the read concern header (#581)
Browse files Browse the repository at this point in the history
* feat: Add the read concern header

Add the read concern header to the top level Configuration.

Set the consistent read concern header in the cache client test fixture
if the CONSISTENT_READS env var is set.

Add a new make target for the github build to use that sets the header.

Update the test-cache-service make target to set the header so that the
canaries will use consistent reads.

Stop using the TestCacheClient as it is no longer needed.

* Don't send the read-concern header if it's balanced
  • Loading branch information
nand4011 authored Sep 26, 2024
1 parent 6f13407 commit 84886d7
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 424 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: make GRPC_WEB=${{ matrix.grpc-web }} build

- name: Test
run: make test
run: make prod-test

build_examples:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/on-push-to-main-branch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
run: make GRPC_WEB=${{ matrix.grpc-web }} build

- name: Test
run: make test
run: make prod-test

generate_readme:
runs-on: ubuntu-latest
Expand Down
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ ifneq (,$(findstring NT,$(OS)))
BUILD_TARGETS := build-dotnet6 build-dotnet-framework
TEST_TARGETS := test-dotnet6 test-dotnet-framework
TEST_TARGETS_AUTH_SERVICE := test-dotnet6-auth-service test-dotnet-framework-auth-service
TEST_TARGETS_CACHE_SERVICE := test-dotnet6-cache-service test-dotnet-framework-cache-service
TEST_TARGETS_TOPICS_SERVICE := test-dotnet6-topics-service test-dotnet-framework-topics-service
else
BUILD_TARGETS := build-dotnet6
TEST_TARGETS := test-dotnet6
TEST_TARGETS_AUTH_SERVICE := test-dotnet6-auth-service
TEST_TARGETS_CACHE_SERVICE := test-dotnet6-cache-service
TEST_TARGETS_TOPICS_SERVICE := test-dotnet6-topics-service
endif

Expand Down Expand Up @@ -98,6 +96,16 @@ restore:
test: ${TEST_TARGETS}


## Run unit and integration tests with consistent reads (conditioned by OS)
prod-test:
@echo "running tests with consistent reads..."
ifeq (,$(findstring NT,$(OS)))
@CONSISTENT_READS=1 $(MAKE) ${TEST_TARGETS}
else
@set CONSISTENT_READS=1 && $(MAKE) ${TEST_TARGETS}
endif


## Run unit and integration tests on the .NET 6.0 runtime
test-dotnet6:
@echo "Running unit and integration tests on the .NET 6.0 runtime..."
Expand Down Expand Up @@ -151,7 +159,12 @@ test-auth-service: ${TEST_TARGETS_AUTH_SERVICE}


## Run cache service tests
test-cache-service: ${TEST_TARGETS_CACHE_SERVICE}
test-cache-service:
ifeq (,$(findstring NT,$(OS)))
@CONSISTENT_READS=1 $(MAKE) test-dotnet6-cache-service
else
@set CONSISTENT_READS=1 && $(MAKE) test-dotnet6-cache-service test-dotnet-framework-cache-service
endif


## Run leaderboard service tests
Expand Down
61 changes: 42 additions & 19 deletions src/Momento.Sdk/Config/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Momento.Sdk.Config.Middleware;
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Config.Transport;
Expand All @@ -14,18 +13,24 @@ public class Configuration : IConfiguration
{
/// <inheritdoc />
public ILoggerFactory LoggerFactory { get; }

/// <inheritdoc />
public IRetryStrategy RetryStrategy { get; }

/// <inheritdoc />
public IList<IMiddleware> Middlewares { get; }

/// <inheritdoc />
public ITransportStrategy TransportStrategy { get; }

/// <inheritdoc />
public ReadConcern ReadConcern { get; }

/// <inheritdoc cref="Momento.Sdk.Config.IConfiguration" />
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, ITransportStrategy transportStrategy)
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy,
ITransportStrategy transportStrategy)
: this(loggerFactory, retryStrategy, new List<IMiddleware>(), transportStrategy)
{

}

/// <summary>
Expand All @@ -34,31 +39,34 @@ public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy,
/// <param name="retryStrategy">Defines a contract for how and when to retry a request</param>
/// <param name="middlewares">The Middleware interface allows the Configuration to provide a higher-order function that wraps all requests.</param>
/// <param name="transportStrategy">This is responsible for configuring network tunables.</param>
/// <param name="loggerFactory">This is responsible for configuraing logging.</param>
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, IList<IMiddleware> middlewares, ITransportStrategy transportStrategy)
/// <param name="loggerFactory">This is responsible for configuring logging.</param>
/// <param name="readConcern">The client-wide setting for read-after-write consistency. Defaults to Balanced.</param>
public Configuration(ILoggerFactory loggerFactory, IRetryStrategy retryStrategy, IList<IMiddleware> middlewares,
ITransportStrategy transportStrategy, ReadConcern readConcern = ReadConcern.Balanced)
{
this.LoggerFactory = loggerFactory;
this.RetryStrategy = retryStrategy;
this.Middlewares = middlewares;
this.TransportStrategy = transportStrategy;
LoggerFactory = loggerFactory;
RetryStrategy = retryStrategy;
Middlewares = middlewares;
TransportStrategy = transportStrategy;
ReadConcern = readConcern;
}

/// <inheritdoc />
public IConfiguration WithRetryStrategy(IRetryStrategy retryStrategy)
{
return new Configuration(LoggerFactory, retryStrategy, Middlewares, TransportStrategy);
return new Configuration(LoggerFactory, retryStrategy, Middlewares, TransportStrategy, ReadConcern);
}

/// <inheritdoc />
public IConfiguration WithMiddlewares(IList<IMiddleware> middlewares)
{
return new Configuration(LoggerFactory, RetryStrategy, middlewares, TransportStrategy);
return new Configuration(LoggerFactory, RetryStrategy, middlewares, TransportStrategy, ReadConcern);
}

/// <inheritdoc />
public IConfiguration WithTransportStrategy(ITransportStrategy transportStrategy)
{
return new Configuration(LoggerFactory, RetryStrategy, Middlewares, transportStrategy);
return new Configuration(LoggerFactory, RetryStrategy, Middlewares, transportStrategy, ReadConcern);
}

/// <summary>
Expand All @@ -72,12 +80,13 @@ public Configuration WithAdditionalMiddlewares(IList<IMiddleware> additionalMidd
retryStrategy: RetryStrategy,
middlewares: Middlewares.Concat(additionalMiddlewares).ToList(),
transportStrategy: TransportStrategy,
loggerFactory: LoggerFactory
loggerFactory: LoggerFactory,
readConcern: ReadConcern
);
}

/// <summary>
/// Add the specified client timeout to an existing instance of Configuration object as an addiion to the existing transport strategy.
/// Add the specified client timeout to an existing instance of Configuration object as an addition to the existing transport strategy.
/// </summary>
/// <param name="clientTimeout">The amount of time to wait before cancelling the request.</param>
/// <returns>Configuration object with client timeout provided</returns>
Expand All @@ -87,7 +96,20 @@ public Configuration WithClientTimeout(TimeSpan clientTimeout)
retryStrategy: RetryStrategy,
middlewares: Middlewares,
transportStrategy: TransportStrategy.WithClientTimeout(clientTimeout),
loggerFactory: LoggerFactory
loggerFactory: LoggerFactory,
readConcern: ReadConcern
);
}

/// <inheritdoc />
public IConfiguration WithReadConcern(ReadConcern readConcern)
{
return new Configuration(
retryStrategy: RetryStrategy,
middlewares: Middlewares,
transportStrategy: TransportStrategy,
loggerFactory: LoggerFactory,
readConcern: readConcern
);
}

Expand All @@ -106,14 +128,15 @@ public override bool Equals(object obj)

var other = (Configuration)obj;
return RetryStrategy.Equals(other.RetryStrategy) &&
Middlewares.SequenceEqual(other.Middlewares) &&
TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory);
Middlewares.SequenceEqual(other.Middlewares) &&
TransportStrategy.Equals(other.TransportStrategy) &&
LoggerFactory.Equals(other.LoggerFactory) &&
ReadConcern.Equals(other.ReadConcern);
}

/// <inheritdoc />
public override int GetHashCode()
{
return base.GetHashCode();
}
}
}
9 changes: 9 additions & 0 deletions src/Momento.Sdk/Config/IConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface IConfiguration
public IList<IMiddleware> Middlewares { get; }
/// <inheritdoc cref="Momento.Sdk.Config.Transport.ITransportStrategy" />
public ITransportStrategy TransportStrategy { get; }
/// <inheritdoc cref="Momento.Sdk.Config.ReadConcern" />
public ReadConcern ReadConcern { get; }

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified retry strategy.
Expand Down Expand Up @@ -49,4 +51,11 @@ public interface IConfiguration
/// <param name="clientTimeout">The amount of time to wait before cancelling the request.</param>
/// <returns>Configuration object with custom client timeout provided</returns>
public IConfiguration WithClientTimeout(TimeSpan clientTimeout);

/// <summary>
/// Creates a new instance of the Configuration object, updated to use the specified read concern.
/// </summary>
/// <param name="readConcern">The read concern setting.</param>
/// <returns>Configuration object with custom read concern provided</returns>
public IConfiguration WithReadConcern(ReadConcern readConcern);
}
41 changes: 41 additions & 0 deletions src/Momento.Sdk/Config/ReadConcern.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;

namespace Momento.Sdk.Config;

/// <summary>
/// The read consistency setting for the cache client. Consistent guarantees read after write consistency, but applies a
/// 6x multiplier to your operation usage.
/// </summary>
public enum ReadConcern
{
/// <summary>
/// Balanced is the default read concern for the cache client.
/// </summary>
Balanced,
/// <summary>
/// Consistent read concern guarantees read after write consistency.
/// </summary>
Consistent
}

/// <summary>
/// Extension methods for the ReadConcern enum.
/// </summary>
public static class ReadConcernExtensions
{
/// <summary>
/// Converts the read concern to a string value.
/// </summary>
/// <param name="readConcern">to convert to a string</param>
/// <returns></returns>
/// <exception cref="ArgumentOutOfRangeException">if given an unknown read concern</exception>
public static string ToStringValue(this ReadConcern readConcern)
{
return readConcern switch
{
ReadConcern.Balanced => "balanced",
ReadConcern.Consistent => "consistent",
_ => throw new ArgumentOutOfRangeException()
};
}
}
17 changes: 9 additions & 8 deletions src/Momento.Sdk/Internal/DataGrpcManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;
using System.Net.Http;
#if USE_GRPC_WEB
using Grpc.Net.Client.Web;
#endif
using Microsoft.Extensions.Logging;
using Momento.Protos.CacheClient;
using Momento.Protos.CachePing;
Expand All @@ -17,7 +12,6 @@
using Momento.Sdk.Config.Retry;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Internal.Middleware;
using static System.Reflection.Assembly;

namespace Momento.Sdk.Internal;

Expand Down Expand Up @@ -256,10 +250,17 @@ public class DataGrpcManager : GrpcManager

internal DataGrpcManager(IConfiguration config, string authToken, string endpoint): base(config.TransportStrategy.GrpcConfig, config.LoggerFactory, authToken, endpoint, "DataGrpcManager")
{
// Not sending a head concern header is treated the same as sending a balanced read concern header
if (config.ReadConcern != ReadConcern.Balanced)
{
var readConcernHeader = new Header(Header.ReadConcern, config.ReadConcern.ToStringValue());
headers.Add(readConcernHeader);
}

var middlewares = config.Middlewares.Concat(
new List<IMiddleware> {
new RetryMiddleware(config.LoggerFactory, config.RetryStrategy),
new HeaderMiddleware(config.LoggerFactory, this.headers),
new HeaderMiddleware(config.LoggerFactory, headers),
new MaxConcurrentRequestsMiddleware(config.LoggerFactory, config.TransportStrategy.MaxConcurrentRequests)
}
).ToList();
Expand All @@ -280,7 +281,7 @@ await pingClient.PingAsync(new _PingRequest(),
catch (RpcException ex)
{
MomentoErrorTransportDetails transportDetails = new MomentoErrorTransportDetails(
new MomentoGrpcErrorDetails(ex.StatusCode, ex.Message, null)
new MomentoGrpcErrorDetails(ex.StatusCode, ex.Message)
);
throw new ConnectionException("Eager connection to server failed", transportDetails, ex);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Momento.Sdk/Internal/Middleware/HeaderMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Logging;
using Momento.Sdk.Config;
using Momento.Sdk.Config.Middleware;

namespace Momento.Sdk.Internal.Middleware
Expand All @@ -14,6 +15,7 @@ internal class Header
public const string AuthorizationKey = "authorization";
public const string AgentKey = "agent";
public const string RuntimeVersionKey = "runtime-version";
public const string ReadConcern = "read-concern";
public readonly List<string> onceOnlyHeaders = new List<string> { Header.AgentKey, Header.RuntimeVersionKey };
public string Name;
public string Value;
Expand Down
34 changes: 22 additions & 12 deletions tests/Integration/Momento.Sdk.Tests/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@ public class CacheClientFixture : IDisposable
public CacheClientFixture()
{
AuthProvider = new EnvMomentoTokenProvider("MOMENTO_API_KEY");

// Enable consistent reads if the CONSISTENT_READS env var is set to anything
var consistentReads = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("CONSISTENT_READS"));

var config = Configurations.Laptop.Latest(LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.IncludeScopes = true;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
builder.AddFilter("Grpc.Net.Client", LogLevel.Error);
builder.SetMinimumLevel(LogLevel.Information);
}));

if (consistentReads)
{
config = config.WithReadConcern(ReadConcern.Consistent);
}

CacheName = $"dotnet-integration-{Utils.NewGuidString()}";
Client = new TestCacheClient(Configurations.Laptop.Latest(LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.IncludeScopes = true;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
builder.AddFilter("Grpc.Net.Client", LogLevel.Error);
builder.SetMinimumLevel(LogLevel.Information);
})),
AuthProvider, defaultTtl: DefaultTtl);
Client = new CacheClient(config, AuthProvider, defaultTtl: DefaultTtl);
Utils.CreateCacheForTest(Client, CacheName);
}

Expand Down
Loading

0 comments on commit 84886d7

Please sign in to comment.