From 30c1c7eadb5d5d2a0ecc4a380858b292e14a76f4 Mon Sep 17 00:00:00 2001 From: KowalczykBartek Date: Wed, 11 Dec 2024 18:06:04 +0100 Subject: [PATCH] RESP3 support for monitor command --- src/replication.c | 9 ++++++++- src/server.c | 1 + src/server.h | 2 +- tests/unit/introspection.tcl | 14 ++++++++++++++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..bef0b493f8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -704,7 +704,14 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors, &li); while ((ln = listNext(&li))) { client *monitor = ln->value; - addReply(monitor, cmdobj); + if(monitor->resp > 2) { + monitor->flag.pushing = 1; + addReplyPushLen(monitor,2); + addReply(monitor,shared.monitorbulk); + addReply(monitor,cmdobj); + } else { + addReply(monitor,cmdobj); + } updateClientMemUsageAndBucket(monitor); } decrRefCount(cmdobj); diff --git a/src/server.c b/src/server.c index 1e38b5ac69..16780a3ef1 100644 --- a/src/server.c +++ b/src/server.c @@ -2035,6 +2035,7 @@ void createSharedObjects(void) { shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); + shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19); diff --git a/src/server.h b/src/server.h index 14a16593b0..df7296f957 100644 --- a/src/server.h +++ b/src/server.h @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct { *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk, - *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], + *smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$\r\n" */ *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%\r\n" */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index bafc46d4b7..4b696febab 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -299,6 +299,20 @@ start_server {tags {"introspection"}} { set _ $res } {*"set" "foo"*"get" "foo"*} + test {MONITOR should support RESP3 protocol} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd monitor + $rd read ; # Consume the MONITOR reply + + r set foo bar + + assert_match {monitor*"set"*"foo"*"bar"*} [$rd read] + $rd close + } + test {MONITOR can log commands issued by the scripting engine} { set rd [valkey_deferring_client] $rd monitor