-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cluster Keyspace Notification Support #813
Cluster Keyspace Notification Support #813
Conversation
…lowing subscription to multiple nodes
0820d60
to
c33da57
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a look here - I think the List<ServerEndPoint>
is the source of a lot of problems. Instead, use a Hashset
(look at how ServerSnapshot
works in ConnectionMultiplexer
). You get all the locking behavior there for free and need to deal with it very little. Happy to give it another look after such a pass.
I agree this is a needed feature, and sorry we've been so far behind on issues and PRs for quite a while now.
@@ -38,6 +39,7 @@ private RedisChannel(byte[] value, bool isPatternBased) | |||
{ | |||
Value = value; | |||
IsPatternBased = isPatternBased; | |||
IsKeyspaceChannel = value != null && Encoding.UTF8.GetString(value).ToLower().StartsWith("__key"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of creating a byte[]
for each comparison - it'd be far cheaper to pre-compute the byte array of "__key"
once in a static and compare them instead :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the fixed version.
https://pastebin.com/Q9T0Q99K
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE; | ||
var msg = Message.Create(-1, flags, cmd, channel); | ||
if (internalCall) msg.SetInternalCall(); | ||
return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState); | ||
foreach (var owner in owners) | ||
queuedTasks.Add(owner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
owners
isn't locked here, so this can go boom under load. The previous interlock above prevented this scenario.
internal ServerEndPoint GetOwner() | ||
{ | ||
var owner = owners?[0]; // we subscribe to arbitrary server, so why not return one | ||
return Interlocked.CompareExchange(ref owner, null, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't make sense anymore...would need to lock owners
here instead.
Somebody, please merge this PR. |
Wow this completely fell off my radar. I'll have a look at the feedback and
current state of things and update as appropriate.
…On Mon, Dec 2, 2019, 5:22 AM Davidnovarro ***@***.***> wrote:
Somebody, please merge this PR.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#813?email_source=notifications&email_token=ACD4CINGZJHL3SQNUG26FTDQWUD2TA5CNFSM4EY6BNLKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEFTPBDQ#issuecomment-560394382>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ACD4CIMGBLUXSW36T3JJID3QWUD2TANCNFSM4EY6BNLA>
.
|
Will this feature by available in 2.1 ? its not on the release notes. |
Bumping this as it's been a while and would be a fantastic feature to be able to utilize. |
I adapted the feature with new code and created pull request. You can look at here |
Closing in favor of #1536 to get this going, though we still need to add testing there. |
Attempt at adding support for cluster keyspace notifications to subscriber by allowing subscription to multiple nodes.
The basic goal with this was to alter the internal logic of the
Subscription
class to accommodate connection to multiple endpoints for a single channel when a pattern-based keyspace/keyevent subscription is detected (rather than havingSubscription
simply represent a 1:1 relationship between channel/endpoint). The hope was that a subscriber could handle subscriptions uniformly, regardless of the special cases.However, this approach has been problematic for me so far. This could be my setup, but it seems that there's a significant delay for the subscription to connect, which is causing the initial
PSUBSCRIBE
command messages to fail. Other times, the application will intermittently work, receiving notifications from the various masters in the cluster, and then will just start throwing repeated "No connection found to handle X operation" exceptions, occasionally correcting itself. So, it seems like something somewhere doesn't like me forcing these connections. I'm not sure if there's a flaw with my understanding of how the Subscriber fits into the application, or just the implementation (or both).I appreciate any feedback/insights. I'm happy to put in the time to make this work, with any guidance. Also, wasn't sure how to proceed with the tests as (last I checked) there were a few failing already. Happy to implement those too.
implements: #789