Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
3 changes: 2 additions & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ struct _clusterNode {
/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
uint64_t network_bytes_in;
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
Expand Down Expand Up @@ -385,5 +387,4 @@ struct clusterState {
slotStat slot_stats[CLUSTER_SLOTS];
};


#endif // CLUSTER_LEGACY_H
107 changes: 96 additions & 11 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatType;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand Down Expand Up @@ -38,13 +38,15 @@ static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_s
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
switch (stat_type) {
case KEY_COUNT: slot_stat = countKeysInSlot(slot); break;
case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break;
case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break;
case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break;
default: // SLOT_STAT_COUNT, INVALID
madolson marked this conversation as resolved.
Show resolved Hide resolved
serverPanic("Invalid slot stat type %d was found.", stat_type);
}
return slot_stat;
}
Expand All @@ -69,7 +71,7 @@ static int slotStatForSortDescCmp(const void *a, const void *b) {
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
Expand All @@ -96,6 +98,10 @@ static void addReplySlotStat(client *c, int slot) {
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

Expand All @@ -119,9 +125,56 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
Expand Down Expand Up @@ -167,8 +220,35 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
ctx->original_client->slot = -1;
}

static int canAddNetworkBytesIn(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
* */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
if (!canAddNetworkBytesIn(c)) return;

if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
c->net_input_bytes_curr_cmd += 15;
}

server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
Expand All @@ -192,11 +272,16 @@ void clusterSlotStatsCommand(client *c) {

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
int desc = 1;
slotStatType order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
Expand Down
13 changes: 13 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,20 @@
#include "script.h"
#include "cluster_legacy.h"

/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);

/* network-bytes-in metric. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
6 changes: 6 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
},
"cpu-usec": {
"type": "integer"
},
"network-bytes-in": {
madolson marked this conversation as resolved.
Show resolved Hide resolved
"type": "integer"
},
"network-bytes-out": {
"type": "integer"
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,19 @@ int getKeySlot(sds key) {
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
return calculateKeySlot(key);
int slot = calculateKeySlot(key);
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
if (server.current_client && server.current_client->flag.primary) {
server.current_client->slot = slot;
}
return slot;
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

/* This is a special version of dbAdd() that is used only when loading
Expand Down
1 change: 1 addition & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*/

#include "server.h"
#include "cluster_slot_stats.h"
madolson marked this conversation as resolved.
Show resolved Hide resolved

/* ================================ MULTI/EXEC ============================== */

Expand Down
72 changes: 69 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -231,7 +232,9 @@ client *createClient(connection *conn) {
if (conn) linkClient(c);
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -449,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2479,10 +2484,12 @@ void resetClient(client *c) {
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
c->net_input_bytes_curr_cmd = 0;
c->bulklen = -1;
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2625,6 +2632,21 @@ void processInlineBuffer(client *c) {
c->argv_len_sum += sdslen(argv[j]);
}
zfree(argv);

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For inline buffer, every whitespace is of length 1,
* with the exception of the trailing '\r\n' being length 2.
*
* For example;
* Command) SET key value
* Inline) SET key value\r\n
* */
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
}

Expand Down Expand Up @@ -2696,7 +2718,8 @@ void processMultibulkBuffer(client *c) {
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll);
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll);
if (!ok || ll > INT_MAX) {
c->read_flags |= READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN;
return;
Expand All @@ -2719,6 +2742,39 @@ void processMultibulkBuffer(client *c) {
c->argv_len = min(c->multibulklen, 1024);
c->argv = zmalloc(sizeof(robj *) * c->argv_len);
c->argv_len_sum = 0;

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
*
* 1) multibulklen_slen + 1
* Cumulative string length (and not the value of) of multibulklen,
* including +1 from RESP first byte.
* 2) bulklen_slen + c->argc
* Cumulative string length (and not the value of) of bulklen,
* including +1 from RESP first byte per argument count.
* 3) c->argv_len_sum
* Cumulative string length of all argument vectors.
* 4) c->argc * 4 + 2
* Cumulative string length of all white-spaces, for which there exists a total of
* 4 bytes per argument, plus 2 bytes from the leading '\r\n' from multibulklen.
*
* For example;
* Command) SET key value
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
* 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1).
* 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc).
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
* 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2).
*
* The 1st component is calculated within the below line.
* */
c->net_input_bytes_curr_cmd += (multibulklen_slen + 1);
}

serverAssertWithInfo(c, NULL, c->multibulklen > 0);
Expand All @@ -2742,7 +2798,8 @@ void processMultibulkBuffer(client *c) {
return;
}

ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll);
if (!ok || ll < 0 || (!(is_primary) && ll > server.proto_max_bulk_len)) {
c->read_flags |= READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN;
return;
Expand Down Expand Up @@ -2782,6 +2839,9 @@ void processMultibulkBuffer(client *c) {
}
}
c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component.
* c->argc portion is deferred, as it may not have been fully populated at this point. */
c->net_input_bytes_curr_cmd += bulklen_slen;
}

/* Read bulk argument */
Expand Down Expand Up @@ -2818,7 +2878,12 @@ void processMultibulkBuffer(client *c) {
}

/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components.
* Here, the deferred c->argc from 2nd component is added, resulting in c->argc * 5 instead of * 4. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 5 + 2));
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
}
}

/* Perform necessary tasks after a command was executed:
Expand All @@ -2837,6 +2902,7 @@ void commandProcessed(client *c) {
if (c->flag.blocked) return;

reqresAppendResponse(c);
clusterSlotStatsAddNetworkBytesInForUserClient(c);
resetClient(c);

long long prev_offset = c->reploff;
Expand Down
Loading
Loading