Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP3 PUSH support for MONITOR command #1426

Open
wants to merge 10 commits into
base: unstable
Choose a base branch
from
9 changes: 8 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,14 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
listRewind(monitors, &li);
while ((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor, cmdobj);
if (monitor->resp > 2) {
monitor->flag.pushing = 1;
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
addReplyPushLen(monitor, 2);
addReply(monitor, shared.monitorbulk);
addReply(monitor, cmdobj);
} else {
addReply(monitor, cmdobj);
}
updateClientMemUsageAndBucket(monitor);
}
decrRefCount(cmdobj);
Expand Down
5 changes: 3 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,7 @@ void createSharedObjects(void) {
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19);

Expand Down Expand Up @@ -4258,10 +4259,10 @@ int processCommand(client *c) {
return C_OK;
}

/* Prevent a replica from sending commands that access the keyspace.
/* Prevent a replica (but not a monitor client) from sending commands that access the keyspace.
* The main objective here is to prevent abuse of client pause check
* from which replicas are exempt. */
if (c->flag.replica && (is_may_replicate_command || is_write_command || is_read_command)) {
if ((c->flag.replica && !c->flag.monitor) && (is_may_replicate_command || is_write_command || is_read_command)) {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
rejectCommandFormat(c, "Replica can't interact with the keyspace");
return C_OK;
}
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct {
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
*retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack,
*special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk,
*smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
Expand Down
64 changes: 64 additions & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,70 @@ start_server {tags {"introspection"}} {
set _ $res
} {*"set" "foo"*"get" "foo"*}

test {MONITOR should support RESP3 protocol} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd monitor
$rd read ; # Consume the MONITOR reply
$rd readraw 1 ;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

r set foo bar
assert_equal ">2" [$rd read]
assert_equal "\$7" [$rd read]
assert_equal "monitor" [$rd read]
assert_match {*"set"*"foo"*"bar"*} [$rd read]

$rd close
}

test {MONITOR and CLIENT TRACKING should work on the same connection with RESP3} {
set rd1 [valkey_deferring_client]
set rd2 [valkey_deferring_client]

$rd1 HELLO 3
$rd1 read ; # Consume the HELLO reply

$rd1 client tracking on
$rd1 read ; # Consume the TRACKING reply

$rd1 monitor
$rd1 read ; # Consume the MONITOR reply

$rd1 set foo bar
assert_equal "OK" [$rd1 read]
assert_match {monitor*"set"*"foo"*"bar"*} [$rd1 read]

$rd1 readraw 1 ;
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

$rd1 get foo
assert_equal "\$3" [$rd1 read]
assert_equal "bar" [$rd1 read]

assert_equal ">2" [$rd1 read]
assert_equal "\$7" [$rd1 read]
assert_equal "monitor" [$rd1 read]
assert_match {*"get"*"foo"*} [$rd1 read]

$rd2 set foo baz

assert_equal ">2" [$rd1 read]
assert_equal "\$10" [$rd1 read]
assert_equal "invalidate" [$rd1 read]
assert_equal "*1" [$rd1 read]
assert_equal "\$3" [$rd1 read]
assert_equal "foo" [$rd1 read]

assert_equal ">2" [$rd1 read]
assert_equal "\$7" [$rd1 read]
assert_equal "monitor" [$rd1 read]
assert_match {*"set"*"foo"*"baz"*} [$rd1 read]

$rd1 close
$rd2 close
}

test {MONITOR can log commands issued by the scripting engine} {
set rd [valkey_deferring_client]
$rd monitor
Expand Down
Loading