Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP3 PUSH support for MONITOR command #1426

Open
wants to merge 10 commits into
base: unstable
Choose a base branch
from
9 changes: 8 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,14 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
listRewind(monitors, &li);
while ((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor, cmdobj);
if (monitor->resp > 2) {
monitor->flag.pushing = 1;
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
addReplyPushLen(monitor, 2);
addReply(monitor, shared.monitorbulk);
addReply(monitor, cmdobj);
} else {
addReply(monitor, cmdobj);
}
updateClientMemUsageAndBucket(monitor);
}
decrRefCount(cmdobj);
Expand Down
8 changes: 6 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,7 @@ void createSharedObjects(void) {
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19);

Expand Down Expand Up @@ -6181,8 +6182,11 @@ void monitorCommand(client *c) {
return;
}

/* ignore MONITOR if already replica or in monitor mode */
if (c->flag.replica) return;
/* Gently notify the client that the monitor command has already been issued. */
if (c->flag.replica) {
addReplyError(c, "The connection is already in monitoring mode.");
return;
}

c->flag.replica = 1;
c->flag.monitor = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct {
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
*retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack,
*special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk,
*smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,40 @@ start_server {tags {"introspection"}} {
set _ $res
} {*"set" "foo"*"get" "foo"*}

test {MONITOR should support RESP3 protocol} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd monitor
$rd read ; # Consume the MONITOR reply
$rd readraw 1 ;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

r set foo bar
assert_equal ">2" [$rd read]
assert_equal "\$7" [$rd read]
assert_equal "monitor" [$rd read]
assert_match {*"set"*"foo"*"bar"*} [$rd read]

$rd close
}

test {multiple MONITOR commands should result in ERR} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd readraw 1 ;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

$rd monitor
assert_equal "+OK" [$rd read]

$rd monitor
assert_equal "-ERR The connection is already in monitoring mode." [$rd read]

$rd close
}

test {MONITOR can log commands issued by the scripting engine} {
set rd [valkey_deferring_client]
$rd monitor
Expand Down
Loading