diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 0e8c0f5ff8..7dbb33a708 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -361,6 +361,8 @@ struct _clusterNode { /* Struct used for storing slot statistics. */ typedef struct slotStat { uint64_t cpu_usec; + uint64_t network_bytes_in; + uint64_t network_bytes_out; } slotStat; struct clusterState { @@ -414,5 +416,4 @@ struct clusterState { slotStat slot_stats[CLUSTER_SLOTS]; }; - #endif // CLUSTER_LEGACY_H diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 24fcd7f587..708999ed5f 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -8,7 +8,7 @@ #define UNASSIGNED_SLOT 0 -typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes; +typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatType; /* ----------------------------------------------------------------------------- * CLUSTER SLOT-STATS command @@ -38,13 +38,15 @@ static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_s return assigned_slots_count; } -static uint64_t getSlotStat(int slot, int stat_type) { - serverAssert(stat_type != INVALID); +static uint64_t getSlotStat(int slot, slotStatType stat_type) { uint64_t slot_stat = 0; - if (stat_type == KEY_COUNT) { - slot_stat = countKeysInSlot(slot); - } else if (stat_type == CPU_USEC) { - slot_stat = server.cluster->slot_stats[slot].cpu_usec; + switch (stat_type) { + case KEY_COUNT: slot_stat = countKeysInSlot(slot); break; + case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break; + case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break; + case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break; + case SLOT_STAT_COUNT: + case INVALID: serverPanic("Invalid slot stat type %d was found.", stat_type); } return slot_stat; } @@ -69,7 +71,7 @@ static int slotStatForSortDescCmp(const void *a, const void *b) { return entry_b.stat - entry_a.stat; } -static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) { +static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) { int i = 0; for (int slot = 0; slot < CLUSTER_SLOTS; slot++) { @@ -96,6 +98,10 @@ static void addReplySlotStat(client *c, int slot) { if (server.cluster_slot_stats_enabled) { addReplyBulkCString(c, "cpu-usec"); addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec); + addReplyBulkCString(c, "network-bytes-in"); + addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in); + addReplyBulkCString(c, "network-bytes-out"); + addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out); } } @@ -119,9 +125,56 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } +static int canAddNetworkBytesOut(client *c) { + return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1; +} + +/* Accumulates egress bytes upon sending RESP responses back to user clients. */ +void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { + if (!canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; +} + +/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ +void clusterSlotStatsAddNetworkBytesOutForReplication(int len) { + client *c = server.current_client; + if (c == NULL || !canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas)); +} + +/* Upon SPUBLISH, two egress events are triggered. + * 1) Internal propagation, for clients that are subscribed to the current node. + * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). + * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. + * This function covers the internal propagation component. */ +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { + /* For a blocked client, c->slot could be pre-filled. + * Thus c->slot is backed-up for restoration after aggregation is completed. */ + int _slot = c->slot; + c->slot = slot; + if (!canAddNetworkBytesOut(c)) { + /* c->slot should not change as a side effect of this function, + * regardless of the function's early return condition. */ + c->slot = _slot; + return; + } + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; + + /* For sharded pubsub, the client's network bytes metrics must be reset here, + * as resetClient() is not called until subscription ends. */ + c->net_output_bytes_curr_cmd = 0; + c->slot = _slot; +} + /* Adds reply for the ORDERBY variant. * Response is ordered based on the sort result. */ -static void addReplyOrderBy(client *c, int order_by, long limit, int desc) { +static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) { slotStatForSort slot_stats[CLUSTER_SLOTS]; collectAndSortSlotStats(slot_stats, order_by, desc); addReplySortedSlotStats(c, slot_stats, limit); @@ -167,8 +220,35 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) { ctx->original_client->slot = -1; } +static int canAddNetworkBytesIn(client *c) { + /* First, cluster mode must be enabled. + * Second, command should target a specific slot. + * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. + * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of + * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ + return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) && + !server.in_exec; +} + +/* Adds network ingress bytes of the current command in execution, + * calculated earlier within networking.c layer. + * + * Note: Below function should only be called once c->slot is parsed. + * Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure. + * */ +void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) { + if (!canAddNetworkBytesIn(c)) return; + + if (c->cmd->proc == execCommand) { + /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */ + c->net_input_bytes_curr_cmd += 15; + } + + server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd; +} + void clusterSlotStatsCommand(client *c) { - if (server.cluster_enabled == 0) { + if (!server.cluster_enabled) { addReplyError(c, "This instance has cluster support disabled"); return; } @@ -192,11 +272,16 @@ void clusterSlotStatsCommand(client *c) { } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) { /* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */ - int desc = 1, order_by = INVALID; + int desc = 1; + slotStatType order_by = INVALID; if (!strcasecmp(c->argv[3]->ptr, "key-count")) { order_by = KEY_COUNT; } else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) { order_by = CPU_USEC; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) { + order_by = NETWORK_BYTES_IN; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) { + order_by = NETWORK_BYTES_OUT; } else { addReplyError(c, "Unrecognized sort metric for ORDERBY."); return; diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index 9faa2a9598..d1a8c6b15d 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -3,7 +3,20 @@ #include "script.h" #include "cluster_legacy.h" +/* General use-cases. */ void clusterSlotStatReset(int slot); void clusterSlotStatResetAll(void); + +/* cpu-usec metric. */ void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration); void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx); + +/* network-bytes-in metric. */ +void clusterSlotStatsAddNetworkBytesInForUserClient(client *c); +void clusterSlotStatsSetClusterMsgLength(uint32_t len); +void clusterSlotStatsResetClusterMsgLength(void); + +/* network-bytes-out metric. */ +void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c); +void clusterSlotStatsAddNetworkBytesOutForReplication(int len); +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot); diff --git a/src/commands/cluster-slot-stats.json b/src/commands/cluster-slot-stats.json index 4f226e4ef3..180b0fd443 100644 --- a/src/commands/cluster-slot-stats.json +++ b/src/commands/cluster-slot-stats.json @@ -38,6 +38,12 @@ }, "cpu-usec": { "type": "integer" + }, + "network-bytes-in": { + "type": "integer" + }, + "network-bytes-out": { + "type": "integer" } } } diff --git a/src/db.c b/src/db.c index 073bec6ca3..d0a6640f57 100644 --- a/src/db.c +++ b/src/db.c @@ -231,15 +231,27 @@ int calculateKeySlot(sds key) { int getKeySlot(sds key) { /* This is performance optimization that uses pre-set slot id from the current command, * in order to avoid calculation of the key hash. + * * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. * It only gets set during the execution of command under `call` method. Other flows requesting * the key slot would fallback to calculateKeySlot. + * + * Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots, + * so we must always recompute the slot for commands coming from the primary. */ - if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) { + if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command && + !server.current_client->flag.primary) { debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot); return server.current_client->slot; } - return calculateKeySlot(key); + int slot = calculateKeySlot(key); + /* For the case of replicated commands from primary, getNodeByQuery() never gets called, + * and thus c->slot never gets populated. That said, if this command ends up accessing a key, + * we are able to backfill c->slot here, where the key's hash calculation is made. */ + if (server.current_client && server.current_client->flag.primary) { + server.current_client->slot = slot; + } + return slot; } /* This is a special version of dbAdd() that is used only when loading diff --git a/src/networking.c b/src/networking.c index 59b894367c..13b76a0893 100644 --- a/src/networking.c +++ b/src/networking.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" @@ -231,7 +232,9 @@ client *createClient(connection *conn) { if (conn) linkClient(c); initClientMultiState(c); c->net_input_bytes = 0; + c->net_input_bytes_curr_cmd = 0; c->net_output_bytes = 0; + c->net_output_bytes_curr_cmd = 0; c->commands_processed = 0; return c; } @@ -449,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { return; } + c->net_output_bytes_curr_cmd += len; + /* We call it here because this function may affect the reply * buffer offset (see function comment) */ reqresSaveClientReplyOffset(c); @@ -2477,10 +2482,12 @@ void resetClient(client *c) { c->cur_script = NULL; c->reqtype = 0; c->multibulklen = 0; + c->net_input_bytes_curr_cmd = 0; c->bulklen = -1; c->slot = -1; c->flag.executing_command = 0; c->flag.replication_done = 0; + c->net_output_bytes_curr_cmd = 0; /* Make sure the duration has been recorded to some command. */ serverAssert(c->duration == 0); @@ -2623,6 +2630,21 @@ void processInlineBuffer(client *c) { c->argv_len_sum += sdslen(argv[j]); } zfree(argv); + + /* Per-slot network bytes-in calculation. + * + * We calculate and store the current command's ingress bytes under + * c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred + * until c->slot is parsed later within processCommand(). + * + * Calculation: For inline buffer, every whitespace is of length 1, + * with the exception of the trailing '\r\n' being length 2. + * + * For example; + * Command) SET key value + * Inline) SET key value\r\n + * */ + c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); c->read_flags |= READ_FLAGS_PARSING_COMPLETED; } @@ -2694,7 +2716,8 @@ void processMultibulkBuffer(client *c) { /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*'); - ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll); + size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); + ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll); if (!ok || ll > INT_MAX) { c->read_flags |= READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN; return; @@ -2717,6 +2740,39 @@ void processMultibulkBuffer(client *c) { c->argv_len = min(c->multibulklen, 1024); c->argv = zmalloc(sizeof(robj *) * c->argv_len); c->argv_len_sum = 0; + + /* Per-slot network bytes-in calculation. + * + * We calculate and store the current command's ingress bytes under + * c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred + * until c->slot is parsed later within processCommand(). + * + * Calculation: For multi bulk buffer, we accumulate four factors, namely; + * + * 1) multibulklen_slen + 1 + * Cumulative string length (and not the value of) of multibulklen, + * including +1 from RESP first byte. + * 2) bulklen_slen + c->argc + * Cumulative string length (and not the value of) of bulklen, + * including +1 from RESP first byte per argument count. + * 3) c->argv_len_sum + * Cumulative string length of all argument vectors. + * 4) c->argc * 4 + 2 + * Cumulative string length of all white-spaces, for which there exists a total of + * 4 bytes per argument, plus 2 bytes from the leading '\r\n' from multibulklen. + * + * For example; + * Command) SET key value + * RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n + * + * 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1). + * 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc). + * 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum). + * 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2). + * + * The 1st component is calculated within the below line. + * */ + c->net_input_bytes_curr_cmd += (multibulklen_slen + 1); } serverAssertWithInfo(c, NULL, c->multibulklen > 0); @@ -2740,7 +2796,8 @@ void processMultibulkBuffer(client *c) { return; } - ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll); + size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1); + ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll); if (!ok || ll < 0 || (!(is_primary) && ll > server.proto_max_bulk_len)) { c->read_flags |= READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN; return; @@ -2780,6 +2837,9 @@ void processMultibulkBuffer(client *c) { } } c->bulklen = ll; + /* Per-slot network bytes-in calculation, 2nd component. + * c->argc portion is deferred, as it may not have been fully populated at this point. */ + c->net_input_bytes_curr_cmd += bulklen_slen; } /* Read bulk argument */ @@ -2816,7 +2876,12 @@ void processMultibulkBuffer(client *c) { } /* We're done when c->multibulk == 0 */ - if (c->multibulklen == 0) c->read_flags |= READ_FLAGS_PARSING_COMPLETED; + if (c->multibulklen == 0) { + /* Per-slot network bytes-in calculation, 3rd and 4th components. + * Here, the deferred c->argc from 2nd component is added, resulting in c->argc * 5 instead of * 4. */ + c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 5 + 2)); + c->read_flags |= READ_FLAGS_PARSING_COMPLETED; + } } /* Perform necessary tasks after a command was executed: @@ -2835,6 +2900,7 @@ void commandProcessed(client *c) { if (c->flag.blocked) return; reqresAppendResponse(c); + clusterSlotStatsAddNetworkBytesInForUserClient(c); resetClient(c); long long prev_offset = c->reploff; diff --git a/src/pubsub.c b/src/pubsub.c index b79b532bf8..eacadfc185 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" /* Structure to hold the pubsub related metadata. Currently used * for pubsub and pubsubshard feature. */ @@ -475,13 +476,13 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) int receivers = 0; dictEntry *de; dictIterator *di; - unsigned int slot = 0; + int slot = -1; /* Send to clients listening for that channel */ if (server.cluster_enabled && type.shard) { slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); } - de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); + de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel); if (de) { dict *clients = dictGetVal(de); dictEntry *entry; @@ -489,6 +490,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) while ((entry = dictNext(iter)) != NULL) { client *c = dictGetKey(entry); addReplyPubsubMessage(c, channel, message, *type.messageBulk); + clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot); updateClientMemUsageAndBucket(c); receivers++; } diff --git a/src/replication.c b/src/replication.c index 1da616d801..5d7374f0e6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "bio.h" #include "functions.h" #include "connection.h" @@ -415,6 +416,8 @@ void feedReplicationBuffer(char *s, size_t len) { if (server.repl_backlog == NULL) return; + clusterSlotStatsAddNetworkBytesOutForReplication(len); + while (len > 0) { size_t start_pos = 0; /* The position of referenced block to start sending. */ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ @@ -568,6 +571,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { feedReplicationBufferWithObject(selectcmd); + /* Although the SELECT command is not associated with any slot, + * its per-slot network-bytes-out accumulation is made by the above function call. + * To cancel-out this accumulation, below adjustment is made. */ + clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr)); + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); server.replicas_eldb = dictid; @@ -3972,6 +3980,10 @@ void replicationSendAck(void) { addReplyBulkLongLong(c, server.fsynced_reploff); } c->flag.primary_force_reply = 0; + + /* Accumulation from above replies must be reset back to 0 manually, + * as this subroutine does not invoke resetClient(). */ + c->net_output_bytes_curr_cmd = 0; } } diff --git a/src/server.c b/src/server.c index af172fcd2f..d332e6989c 100644 --- a/src/server.c +++ b/src/server.c @@ -3694,6 +3694,8 @@ void afterCommand(client *c) { /* Flush pending tracking invalidations. */ trackingHandlePendingKeyInvalidations(); + clusterSlotStatsAddNetworkBytesOutForUserClient(c); + /* Flush other pending push messages. only when we are not in nested call. * So the messages are not interleaved with transaction response. */ if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages); diff --git a/src/server.h b/src/server.h index c70d027759..ccdece20dd 100644 --- a/src/server.h +++ b/src/server.h @@ -1372,9 +1372,13 @@ typedef struct client { #ifdef LOG_REQ_RES clientReqResInfo reqres; #endif - unsigned long long net_input_bytes; /* Total network input bytes read from this client. */ - unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */ - unsigned long long commands_processed; /* Total count of commands this client executed. */ + unsigned long long net_input_bytes; /* Total network input bytes read from this client. */ + unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the + * execution of this client's current command. */ + unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */ + unsigned long long commands_processed; /* Total count of commands this client executed. */ + unsigned long long + net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */ } client; /* ACL information */ diff --git a/tests/unit/cluster/slot-stats.tcl b/tests/unit/cluster/slot-stats.tcl index 38457ba8d7..3e3487a612 100644 --- a/tests/unit/cluster/slot-stats.tcl +++ b/tests/unit/cluster/slot-stats.tcl @@ -68,7 +68,7 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_ } } -proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} { +proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} { set slot_stats_1 [convert_array_into_dict $slot_stats_1] set slot_stats_2 [convert_array_into_dict $slot_stats_2] assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]} @@ -76,8 +76,15 @@ proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} { dict for {slot stats_1} $slot_stats_1 { assert {[dict exists $slot_stats_2 $slot]} set stats_2 [dict get $slot_stats_2 $slot] - foreach metric_name $metrics_to_assert { - assert {[dict get $stats_1 $metric_name] == [dict get $stats_2 $metric_name]} + + # For deterministic metrics, we assert their equality. + foreach metric $deterministic_metrics { + assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]} + } + # For non-deterministic metrics, we assert their non-zeroness as a best-effort. + foreach metric $non_deterministic_metrics { + assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \ + ([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)} } } } @@ -98,17 +105,6 @@ proc assert_slot_visibility {slot_stats expected_slots} { assert_all_slots_have_been_seen $expected_slots } -proc assert_slot_stats_key_count {slot_stats expected_slots_key_count} { - set slot_stats [convert_array_into_dict $slot_stats] - dict for {slot stats} $slot_stats { - if {[dict exists $expected_slots_key_count $slot]} { - set key_count [dict get $stats key-count] - set key_count_expected [dict get $expected_slots_key_count $slot] - assert {$key_count == $key_count_expected} - } - } -} - proc assert_slot_stats_monotonic_order {slot_stats orderby is_desc} { # For Tcl dict, the order of iteration is the order in which the keys were inserted into the dictionary # Thus, the response ordering is preserved upon calling 'convert_array_into_dict()'. @@ -347,10 +343,349 @@ start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-en } # ----------------------------------------------------------------------------- -# Test cases for CLUSTER SLOT-STATS key-count metric correctness. +# Test cases for CLUSTER SLOT-STATS network-bytes-in. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + # Define shared variables. + set key "key" + set key_slot [R 0 cluster keyslot $key] + set metrics_to_assert [list network-bytes-in] + + test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." { + # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + R 0 SET $key value + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 33 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." { + set rd [valkey_deferring_client] + # SET key value\r\n --> 15 bytes. + $rd write "SET $key value\r\n" + $rd flush + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 15 + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, blocking command." { + set rd [valkey_deferring_client] + # *3\r\n$5\r\nblpop\r\n$3\r\nkey\r\n$1\r\n0\r\n --> 31 bytes. + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + + # Slot-stats must be empty here, as the client is yet to be unblocked. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 66 ;# 31 + 35 bytes. + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, multi-exec transaction." { + set r [valkey_client] + # *1\r\n$5\r\nmulti\r\n --> 15 bytes. + $r MULTI + # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + assert {[$r SET $key value] eq {QUEUED}} + # *1\r\n$4\r\nexec\r\n --> 14 bytes. + assert {[$r EXEC] eq {OK}} + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 62 ;# 15 + 33 + 14 bytes. + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, non slot specific command." { + R 0 INFO + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, pub/sub." { + # PUB/SUB does not get accumulated at per-slot basis, + # as it is cluster-wide and is not slot specific. + set rd [valkey_deferring_client] + $rd subscribe channel + R 0 publish channel message + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + set channel "channel" + set key_slot [R 0 cluster keyslot $channel] + set metrics_to_assert [list network-bytes-in] + + # Setup replication. + assert {[s -1 role] eq {slave}} + wait_for_condition 1000 50 { + [s -1 master_link_status] eq {up} + } else { + fail "Instance #1 master link status is not up" + } + R 1 readonly + + test "CLUSTER SLOT-STATS network-bytes-in, sharded pub/sub." { + set slot [R 0 cluster keyslot $channel] + set primary [Rn 0] + set replica [Rn 1] + set replica_subcriber [valkey_deferring_client -1] + $replica_subcriber SSUBSCRIBE $channel + # *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes. + $primary SPUBLISH $channel hello + # *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + + set slot_stats [$primary CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 42 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + + set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 34 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness. # ----------------------------------------------------------------------------- start_cluster 1 0 {tags {external:skip cluster}} { + # Define shared variables. + set key "FOO" + set key_slot [R 0 cluster keyslot $key] + set expected_slots_to_key_count [dict create $key_slot 1] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." { + R 0 INFO + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." { + R 0 SET $key value + # +OK\r\n --> 5 bytes + + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 5 + ] + ] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." { + set rd [valkey_deferring_client] + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + + # Assert empty slot stats here, since COB is yet to be flushed due to the block. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # Unblock the command. + # LPUSH client) :1\r\n --> 4 bytes. + # BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 28 ;# 4 + 24 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. + set key "FOO" + set key_slot [R 0 CLUSTER KEYSLOT $key] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + # Setup replication. + assert {[s -1 role] eq {slave}} + wait_for_condition 1000 50 { + [s -1 master_link_status] eq {up} + } else { + fail "Instance #1 master link status is not up" + } + R 1 readonly + + test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." { + assert_equal [R 0 SET $key VALUE] {OK} + # Local client) +OK\r\n --> 5 bytes. + # Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 ;# 5 + 33 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. + set channel "channel" + set key_slot [R 0 cluster keyslot $channel] + set channel_secondary "channel2" + set key_slot_secondary [R 0 cluster keyslot $channel_secondary] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [valkey_client] + set replica [valkey_deferring_client -1] + + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + $subscriber SSUBSCRIBE $channel + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 + ] + ] + R 0 CONFIG RESETSTAT + + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + assert_equal 1 [$publisher SPUBLISH $channel hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 46 ;# 4 + 42 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + $subscriber QUIT + R 0 FLUSHALL + R 0 CONFIG RESETSTAT + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [valkey_client] + set replica [valkey_deferring_client -1] + + # Stack multi-slot subscriptions against a single client. + # For primary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + # For secondary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes + $subscriber SSUBSCRIBE $channel + $subscriber SSUBSCRIBE $channel_secondary + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 38 + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 39 + ] + ] + R 0 CONFIG RESETSTAT + + # For primary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + # For secondary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes. + assert_equal 1 [$publisher SPUBLISH $channel hello] + assert_equal 1 [$publisher SPUBLISH $channel_secondary hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 46 ;# 4 + 42 bytes. + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 47 ;# 4 + 43 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS key-count metric correctness. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { # Define shared variables. set key "FOO" @@ -435,7 +770,9 @@ start_cluster 1 0 {tags {external:skip cluster}} { # Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument. # ----------------------------------------------------------------------------- -start_cluster 1 0 {tags {external:skip cluster}} { +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + set metrics [list "key-count" "cpu-usec" "network-bytes-in" "network-bytes-out"] # SET keys for target hashslots, to encourage ordering. set hash_tags [list 0 1 2 3 4] @@ -456,32 +793,36 @@ start_cluster 1 0 {tags {external:skip cluster}} { } test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" { - set orderby "key-count" - assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC LIMIT -1} - set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC] - assert_slot_stats_monotonic_descent $slot_stats $orderby + foreach orderby $metrics { + set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC] + assert_slot_stats_monotonic_descent $slot_stats $orderby + } } test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" { - set orderby "key-count" - set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC] - assert_slot_stats_monotonic_ascent $slot_stats $orderby + foreach orderby $metrics { + set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC] + assert_slot_stats_monotonic_ascent $slot_stats $orderby + } } test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" { R 0 FLUSHALL SYNC + R 0 CONFIG RESETSTAT - set limit 5 - set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC] - set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC] - set slot_stats_desc_length [llength $slot_stats_desc] - set slot_stats_asc_length [llength $slot_stats_asc] - assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length} - - # The key count of all slots is 0, so we will order by slot in ascending order. - set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0] - assert_slot_visibility $slot_stats_desc $expected_slots - assert_slot_visibility $slot_stats_asc $expected_slots + foreach orderby $metrics { + set limit 5 + set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC] + set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC] + set slot_stats_desc_length [llength $slot_stats_desc] + set slot_stats_asc_length [llength $slot_stats_asc] + assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length} + + # All slot statistics have been reset to 0, so we will order by slot in ascending order. + set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0] + assert_slot_visibility $slot_stats_desc $expected_slots + assert_slot_visibility $slot_stats_asc $expected_slots + } } test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" { @@ -490,28 +831,39 @@ start_cluster 1 0 {tags {external:skip cluster}} { R 0 CLUSTER FLUSHSLOTS R 0 CLUSTER ADDSLOTS 100 101 - set num_assigned_slots 2 - set limit 5 - set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC] - set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC] - set slot_stats_desc_length [llength $slot_stats_desc] - set slot_stats_asc_length [llength $slot_stats_asc] - set expected_response_length [expr min($num_assigned_slots, $limit)] - assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length} - - set expected_slots [dict create 100 0 101 0] - assert_slot_visibility $slot_stats_desc $expected_slots - assert_slot_visibility $slot_stats_asc $expected_slots + foreach orderby $metrics { + set num_assigned_slots 2 + set limit 5 + set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC] + set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC] + set slot_stats_desc_length [llength $slot_stats_desc] + set slot_stats_asc_length [llength $slot_stats_asc] + set expected_response_length [expr min($num_assigned_slots, $limit)] + assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length} + + set expected_slots [dict create 100 0 101 0] + assert_slot_visibility $slot_stats_desc $expected_slots + assert_slot_visibility $slot_stats_asc $expected_slots + } } - test "CLUSTER SLOT-STATS ORDERBY unsupported sort metric." { - set orderby "non-existent-metric" - assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby} - + test "CLUSTER SLOT-STATS ORDERBY arg sanity check." { + # Non-existent argument. + assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg} + # Negative LIMIT. + assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1} + # Non-existent ORDERBY metric. + assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric} # When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics. + R 0 CONFIG SET cluster-slot-stats-enabled no set orderby "cpu-usec" assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby} + set orderby "network-bytes-in" + assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby} + set orderby "network-bytes-out" + assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby} } + } # ----------------------------------------------------------------------------- @@ -521,14 +873,23 @@ start_cluster 1 0 {tags {external:skip cluster}} { start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { # Define shared variables. - set key "FOO" + set key "key" set key_slot [R 0 CLUSTER KEYSLOT $key] - - # For replication, only those metrics that are deterministic upon replication are asserted. - # * key-count is asserted, as both the primary and its replica must hold the same number of keys. - # * cpu-usec is not asserted, as its micro-seconds command duration is not guaranteed to be exact - # between the primary and its replica. - set metrics_to_assert [list key-count] + set primary [Rn 0] + set replica [Rn 1] + + # For replication, assertions are split between deterministic and non-deterministic metrics. + # * For deterministic metrics, strict equality assertions are made. + # * For non-deterministic metrics, non-zeroness assertions are made. + # Non-zeroness as in, both primary and replica should either have some value, or no value at all. + # + # * key-count is deterministic between primary and its replica. + # * cpu-usec is non-deterministic between primary and its replica. + # * network-bytes-in is deterministic between primary and its replica. + # * network-bytes-out will remain empty in the replica, since primary client do not receive replies, unless for replicationSendAck(). + set deterministic_metrics [list key-count network-bytes-in] + set non_deterministic_metrics [list cpu-usec] + set empty_metrics [list network-bytes-out] # Setup replication. assert {[s -1 role] eq {slave}} @@ -539,39 +900,75 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en } R 1 readonly - test "CLUSTER SLOT-STATS key-count replication for new keys" { + test "CLUSTER SLOT-STATS metrics replication for new keys" { + # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. R 0 SET $key VALUE - set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - set expected_slots_key_count [dict create $key_slot 1] - assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count - wait_for_replica_key_exists $key 1 + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 1 network-bytes-in 33 + ] + ] + set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics + wait_for_condition 500 10 { + [string match {*calls=1,*} [cmdrstat set $replica]] + } else { + fail "Replica did not receive the command." + } set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert + assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics + assert_empty_slot_stats $slot_stats_replica $empty_metrics } + R 0 CONFIG RESETSTAT + R 1 CONFIG RESETSTAT - test "CLUSTER SLOT-STATS key-count replication for existing keys" { + test "CLUSTER SLOT-STATS metrics replication for existing keys" { + # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$13\r\nvalue_updated\r\n --> 42 bytes. R 0 SET $key VALUE_UPDATED - set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - set expected_slots_key_count [dict create $key_slot 1] - assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count - wait_for_replica_key_exists $key 1 + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 1 network-bytes-in 42 + ] + ] + set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics + wait_for_condition 500 10 { + [string match {*calls=1,*} [cmdrstat set $replica]] + } else { + fail "Replica did not receive the command." + } set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert + assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics + assert_empty_slot_stats $slot_stats_replica $empty_metrics } + R 0 CONFIG RESETSTAT + R 1 CONFIG RESETSTAT - test "CLUSTER SLOT-STATS key-count replication for deleting keys" { + test "CLUSTER SLOT-STATS metrics replication for deleting keys" { + # *2\r\n$3\r\ndel\r\n$3\r\nkey\r\n --> 22 bytes. R 0 DEL $key - set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - set expected_slots_key_count [dict create $key_slot 0] - assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count - wait_for_replica_key_exists $key 0 + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 0 network-bytes-in 22 + ] + ] + set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics + wait_for_condition 500 10 { + [string match {*calls=1,*} [cmdrstat del $replica]] + } else { + fail "Replica did not receive the command." + } set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] - assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert + assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics + assert_empty_slot_stats $slot_stats_replica $empty_metrics } + R 0 CONFIG RESETSTAT + R 1 CONFIG RESETSTAT } \ No newline at end of file