Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP3 PUSH support for MONITOR command #1426

Open
wants to merge 10 commits into
base: unstable
Choose a base branch
from
11 changes: 10 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,16 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
listRewind(monitors, &li);
while ((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor, cmdobj);
if (monitor->resp > 2) {
struct ClientFlags old_flags = monitor->flag;
monitor->flag.pushing = 1;
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
addReplyPushLen(monitor, 2);
addReply(monitor, shared.monitorbulk);
addReply(monitor, cmdobj);
if (!old_flags.pushing) monitor->flag.pushing = 0;
} else {
addReply(monitor, cmdobj);
}
updateClientMemUsageAndBucket(monitor);
}
decrRefCount(cmdobj);
Expand Down
8 changes: 6 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,7 @@ void createSharedObjects(void) {
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19);

Expand Down Expand Up @@ -6181,8 +6182,11 @@ void monitorCommand(client *c) {
return;
}

/* ignore MONITOR if already replica or in monitor mode */
if (c->flag.replica) return;
/* Gently notify the client that the monitor command has already been issued. */
if (c->flag.replica) {
addReplyError(c, "The connection is already in monitoring mode.");
return;
}

c->flag.replica = 1;
c->flag.monitor = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct {
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
*retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack,
*special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk,
*smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,60 @@ start_server {tags {"introspection"}} {
set _ $res
} {*"set" "foo"*"get" "foo"*}

test {MONITOR should support RESP3 protocol} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd monitor
$rd read ; # Consume the MONITOR reply
$rd readraw 1;

r set foo bar
assert_equal ">2" [$rd read]
assert_equal "\$7" [$rd read]
assert_equal "monitor" [$rd read]
assert_match {*"set"*"foo"*"bar"*} [$rd read]

$rd close
}

test {multiple MONITOR commands should result in ERR} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd readraw 1;

$rd monitor
assert_equal "+OK" [$rd read]

$rd monitor
assert_equal "-ERR The connection is already in monitoring mode." [$rd read]

$rd close
}

test {MONITOR should came after PONG reply} {
set rd [valkey_deferring_client]
$rd HELLO 3
$rd read ; # Consume the HELLO reply

$rd monitor
$rd read ; # Consume the MONITOR reply
$rd readraw 1;

$rd ping

assert_equal "+PONG" [$rd read]
assert_equal ">2" [$rd read]
assert_equal "\$7" [$rd read]
assert_equal "monitor" [$rd read]
assert_match {*"ping"*} [$rd read]

$rd close
}

test {MONITOR can log commands issued by the scripting engine} {
set rd [valkey_deferring_client]
$rd monitor
Expand Down
Loading