diff --git a/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs b/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs
index 65cc7b014..7b339bb99 100644
--- a/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs
+++ b/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs
@@ -38,6 +38,7 @@ private RedisChannel(byte[] value, bool isPatternBased)
{
Value = value;
IsPatternBased = isPatternBased;
+ IsKeyspaceChannel = value != null && Encoding.UTF8.GetString(value).ToLower().StartsWith("__key");
}
private static bool DeterminePatternBased(byte[] value, PatternMode mode)
@@ -182,6 +183,9 @@ internal void AssertNotNull()
internal RedisChannel Clone() => (byte[])Value?.Clone();
+ internal readonly bool IsPatternBased;
+ internal readonly bool IsKeyspaceChannel;
+
///
/// The matching pattern for this channel
///
diff --git a/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs b/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs
index 2f9eca6c1..b76949f9e 100644
--- a/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs
+++ b/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
@@ -144,7 +145,7 @@ internal long ValidateSubscriptions()
private sealed class Subscription
{
private Action handler;
- private ServerEndPoint owner;
+ private List owners = new List();
public Subscription(Action value) => handler = value;
@@ -170,33 +171,80 @@ public bool Remove(Action value)
}
public Task SubscribeToServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
+ {
+ // subscribe to all masters in cluster for keyspace/keyevent notifications
+ if (channel.IsKeyspaceChannel) {
+ return SubscribeToMasters(multiplexer, channel, flags, asyncState, internalCall);
+ }
+ return SubscribeToSingleServer(multiplexer, channel, flags, asyncState, internalCall);
+ }
+
+ private Task SubscribeToSingleServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
{
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
var selected = multiplexer.SelectServer(-1, cmd, flags, default(RedisKey));
- if (selected == null || Interlocked.CompareExchange(ref owner, selected, null) != null) return null;
+ lock (owners)
+ {
+ if (selected == null || owners.Contains(selected)) return null;
+ owners.Add(selected);
+ }
var msg = Message.Create(-1, flags, cmd, channel);
-
return selected.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
}
+ private Task SubscribeToMasters(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
+ {
+ List subscribeTasks = new List();
+ var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
+ var masters = multiplexer.GetServerSnapshot().Where(s => !s.IsSlave && s.EndPoint.Equals(s.ClusterConfiguration.Origin));
+
+ lock (owners)
+ {
+ foreach (var master in masters)
+ {
+ if (owners.Contains(master)) continue;
+ owners.Add(master);
+ var msg = Message.Create(-1, flags, cmd, channel);
+ if (internalCall) msg.FlagsRaw = msg.FlagsRaw | (CommandFlags)128;
+ subscribeTasks.Add(master.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
+ }
+ }
+
+ return Task.WhenAll(subscribeTasks);
+ }
+
public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
{
- var oldOwner = Interlocked.Exchange(ref owner, null);
- if (oldOwner == null) return null;
+ if (owners.Count == 0) return null;
+ List queuedTasks = new List();
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE;
var msg = Message.Create(-1, flags, cmd, channel);
if (internalCall) msg.SetInternalCall();
- return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
+ foreach (var owner in owners)
+ queuedTasks.Add(owner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
+ owners.Clear();
+ return Task.WhenAll(queuedTasks.ToArray());
}
- internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null);
+ internal ServerEndPoint GetOwner()
+ {
+ var owner = owners?[0]; // we subscribe to arbitrary server, so why not return one
+ return Interlocked.CompareExchange(ref owner, null, null);
+ }
internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
{
- if (server != null && Interlocked.CompareExchange(ref owner, server, server) == server)
+ bool hasOwner;
+
+ lock (owners)
+ {
+ hasOwner = owners.Contains(server);
+ }
+
+ if (server != null && hasOwner)
{
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel);
@@ -208,16 +256,15 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
{
bool changed = false;
- var oldOwner = Interlocked.CompareExchange(ref owner, null, null);
- if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE))
+ if (owners.Count != 0 && !owners.All(o => o.IsSelectable(RedisCommand.PSUBSCRIBE)))
{
if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null)
{
changed = true;
}
- oldOwner = null;
+ owners.Clear();
}
- if (oldOwner == null && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
+ if (owners.Count == 0 && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
{
changed = true;
}