Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating IsPrimaryOnly command map for new commands #2101

Merged
merged 9 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
698 changes: 466 additions & 232 deletions src/StackExchange.Redis/Enums/RedisCommand.cs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/StackExchange.Redis/ExceptionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ internal static Exception NoConnectionAvailable(
// This can happen in cloud environments often, where user disables abort and has the wrong config
initialMessage = $"Connection to Redis never succeeded (attempts: {attempts} - check your config), unable to service operation: ";
}
else if (message is not null && message.IsPrimaryOnly())
slorello89 marked this conversation as resolved.
Show resolved Hide resolved
{
// If we know it's a primary-only command, indicate that in the error message
initialMessage = "No connection (requires writable - not eligible for replica) is active/available to service this operation: ";
}
else
{
// Default if we don't have a more useful error message here based on circumstances
Expand Down
85 changes: 1 addition & 84 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected Message(int db, CommandFlags flags, RedisCommand command)
}
}

bool primaryOnly = IsPrimaryOnly(command);
bool primaryOnly = command.IsPrimaryOnly();
Db = db;
this.command = command;
Flags = flags & UserSelectableFlags;
Expand Down Expand Up @@ -324,89 +324,6 @@ public static Message Create(int db, CommandFlags flags, RedisCommand command, i
public static Message CreateInSlot(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values) =>
new CommandSlotValuesMessage(db, slot, flags, command, values);

public static bool IsPrimaryOnly(RedisCommand command)
{
switch (command)
{
case RedisCommand.APPEND:
case RedisCommand.BITOP:
case RedisCommand.BLPOP:
case RedisCommand.BRPOP:
case RedisCommand.BRPOPLPUSH:
case RedisCommand.DECR:
case RedisCommand.DECRBY:
case RedisCommand.DEL:
case RedisCommand.EXPIRE:
case RedisCommand.EXPIREAT:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.GETDEL:
case RedisCommand.GETEX:
case RedisCommand.GETSET:
case RedisCommand.HDEL:
case RedisCommand.HINCRBY:
case RedisCommand.HINCRBYFLOAT:
case RedisCommand.HMSET:
case RedisCommand.HSET:
case RedisCommand.HSETNX:
case RedisCommand.INCR:
case RedisCommand.INCRBY:
case RedisCommand.INCRBYFLOAT:
case RedisCommand.LINSERT:
case RedisCommand.LPOP:
case RedisCommand.LPUSH:
case RedisCommand.LPUSHX:
case RedisCommand.LREM:
case RedisCommand.LSET:
case RedisCommand.LTRIM:
case RedisCommand.MIGRATE:
case RedisCommand.MOVE:
case RedisCommand.MSET:
case RedisCommand.MSETNX:
case RedisCommand.PERSIST:
case RedisCommand.PEXPIRE:
case RedisCommand.PEXPIREAT:
case RedisCommand.PFADD:
case RedisCommand.PFMERGE:
case RedisCommand.PSETEX:
case RedisCommand.RENAME:
case RedisCommand.RENAMENX:
case RedisCommand.RESTORE:
case RedisCommand.RPOP:
case RedisCommand.RPOPLPUSH:
case RedisCommand.RPUSH:
case RedisCommand.RPUSHX:
case RedisCommand.SADD:
case RedisCommand.SDIFFSTORE:
case RedisCommand.SET:
case RedisCommand.SETBIT:
case RedisCommand.SETEX:
case RedisCommand.SETNX:
case RedisCommand.SETRANGE:
case RedisCommand.SINTERSTORE:
case RedisCommand.SMOVE:
case RedisCommand.SPOP:
case RedisCommand.SREM:
case RedisCommand.SUNIONSTORE:
case RedisCommand.SWAPDB:
case RedisCommand.TOUCH:
case RedisCommand.UNLINK:
case RedisCommand.ZADD:
case RedisCommand.ZINTERSTORE:
case RedisCommand.ZINCRBY:
case RedisCommand.ZPOPMAX:
case RedisCommand.ZPOPMIN:
case RedisCommand.ZREM:
case RedisCommand.ZREMRANGEBYLEX:
case RedisCommand.ZREMRANGEBYRANK:
case RedisCommand.ZREMRANGEBYSCORE:
case RedisCommand.ZUNIONSTORE:
return true;
default:
return false;
}
}

/// <summary>Gets whether this is primary-only.</summary>
/// <remarks>
/// Note that the constructor runs the switch statement above, so
Expand Down
2 changes: 1 addition & 1 deletion tests/StackExchange.Redis.Tests/AsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void AsyncTasksReportFailureIfServerUnavailable()
Assert.NotNull(c.Exception);
var ex = c.Exception.InnerExceptions.Single();
Assert.IsType<RedisConnectionException>(ex);
Assert.StartsWith("No connection is active/available to service this operation: SADD " + key.ToString(), ex.Message);
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available to service this operation: SADD " + key.ToString(), ex.Message);
}

[Fact]
Expand Down
15 changes: 15 additions & 0 deletions tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,21 @@ public void NoConnectionException(bool abortOnConnect, int connCount, int comple
}
}

[Fact]
public void NoConnectionPrimaryOnlyException()
{
using var conn = ConnectionMultiplexer.Connect(TestConfig.Current.ReplicaServerAndPort, Writer);

var msg = Message.Create(0, CommandFlags.None, RedisCommand.SET, (RedisKey)Me(), (RedisValue)"test");
Assert.True(msg.IsPrimaryOnly());
var rawEx = ExceptionFactory.NoConnectionAvailable(conn, msg, null);
var ex = Assert.IsType<RedisConnectionException>(rawEx);
Writer.WriteLine("Exception: " + ex.Message);

// Ensure a primary-only operation like SET gives the additional context
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available to service this operation: SET", ex.Message);
}

[Theory]
[InlineData(true, ConnectionFailureType.ProtocolFailure, "ProtocolFailure on [0]:GET myKey (StringProcessor), my annotation")]
[InlineData(true, ConnectionFailureType.ConnectionDisposed, "ConnectionDisposed on [0]:GET myKey (StringProcessor), my annotation")]
Expand Down
18 changes: 12 additions & 6 deletions tests/StackExchange.Redis.Tests/Naming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,39 @@ public void CheckSignatures(Type type, bool isAsync)
}
}

/// <summary>
/// This test iterates over all <see cref="RedisCommand"/>s to ensure we have everything accounted for as primary-only or not.
/// </summary>
[Fact]
public void ShowReadOnlyOperations()
public void CheckReadOnlyOperations()
{
List<object> primaryReplica = new List<object>();
List<object> primaryOnly = new List<object>();
List<RedisCommand> primaryReplica = new(),
primaryOnly = new();
foreach (var val in (RedisCommand[])Enum.GetValues(typeof(RedisCommand)))
{
bool isPrimaryOnly = Message.IsPrimaryOnly(val);
bool isPrimaryOnly = val.IsPrimaryOnly();
(isPrimaryOnly ? primaryOnly : primaryReplica).Add(val);

if (!isPrimaryOnly)
{
Log(val.ToString());
}
}
// Ensure an unknown command from nowhere would violate the check above, as any not-yet-added one would.
Assert.Throws<ArgumentOutOfRangeException>(() => ((RedisCommand)99999).IsPrimaryOnly());

Log("primary-only: {0}, vs primary/replica: {1}", primaryOnly.Count, primaryReplica.Count);
Log("");
Log("primary-only:");
foreach (var val in primaryOnly)
{
Log(val?.ToString());
Log(val.ToString());
}
Log("");
Log("primary/replica:");
foreach (var val in primaryReplica)
{
Log(val?.ToString());
Log(val.ToString());
}
}

Expand Down
16 changes: 16 additions & 0 deletions tests/StackExchange.Redis.Tests/SortedSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,22 @@ public void SortedSetRangeStoreFailExclude()
Assert.Equal("exclude", exception.ParamName);
}

[Fact]
public void SortedSetRangeStoreFailForReplica()
{
using var conn = Create(require: RedisFeatures.v6_2_0);

var db = conn.GetDatabase();
var me = Me();
var sourceKey = $"{me}:ZSetSource";
var destinationKey = $"{me}:ZSetDestination";

db.KeyDelete(new RedisKey[] { sourceKey, destinationKey }, CommandFlags.FireAndForget);
db.SortedSetAdd(sourceKey, lexEntries, CommandFlags.FireAndForget);
var exception = Assert.Throws<RedisCommandException>(() => db.SortedSetRangeAndStore(sourceKey, destinationKey, 0, -1, flags: CommandFlags.DemandReplica));
Assert.Contains("Command cannot be issued to a replica", exception.Message);
}

[Fact]
public void SortedSetScoresSingle()
{
Expand Down
88 changes: 88 additions & 0 deletions tests/StackExchange.Redis.Tests/Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,94 @@ public void IsStreamType()
Assert.Equal(RedisType.Stream, keyType);
}

[Fact]
public void StreamOpsFailOnReplica()
{
using var conn = Create(configuration: TestConfig.Current.PrimaryServerAndPort, require: RedisFeatures.v5_0_0);
using var replicaConn = Create(configuration: TestConfig.Current.ReplicaServerAndPort, require: RedisFeatures.v5_0_0);

var db = conn.GetDatabase();
var replicaDb = replicaConn.GetDatabase();

// XADD: Works on primary, not secondary
db.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1");
var ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1"));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// Add stream content to primary
var key = GetUniqueKey("group_ack");
const string groupName1 = "test_group1",
groupName2 = "test_group2",
consumer1 = "test_consumer1",
consumer2 = "test_consumer2";

// Add for primary
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");

// XGROUP: Works on primary, not replica
db.StreamCreateConsumerGroup(key, groupName1, StreamPosition.Beginning);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamCreateConsumerGroup(key, groupName2, StreamPosition.Beginning));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// Create the second group on the primary, for the rest of the tests.
db.StreamCreateConsumerGroup(key, groupName2, StreamPosition.Beginning);

// XREADGROUP: Works on primary, not replica
// Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName1, consumer1, StreamPosition.NewMessages);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamReadGroup(key, groupName2, consumer2, StreamPosition.NewMessages));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XACK: Works on primary, not secondary
var oneAck = db.StreamAcknowledge(key, groupName1, id1);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamAcknowledge(key, groupName2, id1));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XPENDING: Works on primary and replica
// Get the pending messages for consumer2.
var pendingMessages = db.StreamPendingMessages(key, groupName1, 10, consumer1);
var pendingMessages2 = replicaDb.StreamPendingMessages(key, groupName2, 10, consumer2);

// XCLAIM: Works on primary, not replica
// Claim the messages for consumer1.
var messages = db.StreamClaim(key, groupName1, consumer1, 0, messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray());
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamClaim(key, groupName2, consumer2, 0, messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray()));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XDEL: Works on primary, not replica
db.StreamDelete(key, new RedisValue[] { id4 });
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamDelete(key, new RedisValue[] { id3 }));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XINFO: Works on primary and replica
db.StreamInfo(key);
replicaDb.StreamInfo(key);

// XLEN: Works on primary and replica
db.StreamLength(key);
replicaDb.StreamLength(key);

// XRANGE: Works on primary and replica
db.StreamRange(key);
replicaDb.StreamRange(key);

// XREVRANGE: Works on primary and replica
db.StreamRange(key, messageOrder: Order.Descending);
replicaDb.StreamRange(key, messageOrder: Order.Descending);

// XREAD: Works on primary and replica
db.StreamRead(key, "0-1");
replicaDb.StreamRead(key, "0-1");

// XTRIM: Works on primary, not replica
db.StreamTrim(key, 10);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamTrim(key, 10));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);
}

[Fact]
public void StreamAddSinglePairWithAutoId()
{
Expand Down