Skip to content

Commit

Permalink
Add network-bytes-out metric support for CLUSTER SLOT-STATS command (v…
Browse files Browse the repository at this point in the history
…alkey-io#20).

The metric tracks network egress bytes under per-slot context,
by hooking onto COB buffer mutations.

The metric can be viewed by calling the CLUSTER SLOT-STATS command,
with sample response attached below;

```
127.0.0.1:6379> cluster slot-stats slotsrange 0 0
1) 1) (integer) 0
    2) 1) "key-count"
       2) (integer) 0
       3) "cpu-usec"
       4) (integer) 0
       5) "network-bytes-in"
       6) (integer) 0
       7) "network-bytes-out"
       8) (integer) 0
```

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 24, 2024
1 parent cee6c74 commit b90f537
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -4000,6 +4000,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(msgblock->totlen);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ struct _clusterNode {
typedef struct slotStat {
uint64_t cpu_usec;
uint64_t network_bytes_in;
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
Expand Down
78 changes: 71 additions & 7 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand Down Expand Up @@ -106,6 +106,8 @@ static void addReplySlotStat(client *c, int slot) {
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

Expand All @@ -129,6 +131,63 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should be kept idempotent, regardless of the function's early return condition. */
c->slot = _slot;
return;
}

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the external propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += len;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -177,21 +236,24 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
ctx->original_client->slot = -1;
}

static int canAddNetworkBytes(client *c) {
static int canAddNetworkBytesIn(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked);
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytes() check failure.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
* */
void clusterSlotStatsAddNetworkBytesIn(client *c) {
if (!canAddNetworkBytes(c)) return;
if (!canAddNetworkBytesIn(c)) return;

if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
Expand All @@ -205,7 +267,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len) {
pubsub_state.len = len;
}

void clusterSlotStatsResetClusterMsgLength() {
void clusterSlotStatsResetClusterMsgLength(void) {
pubsub_state.len = 0;
}

Expand Down Expand Up @@ -249,6 +311,8 @@ void clusterSlotStatsCommand(client *c) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
Expand Down
5 changes: 5 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ void clusterSlotStatsAddNetworkBytesIn(client *c);
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot);
void clusterSlotStatsAddNetworkBytesOut(client *c);
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len);
3 changes: 3 additions & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*/

#include "server.h"
#include "cluster_slot_stats.h"

/* ================================ MULTI/EXEC ============================== */

Expand Down Expand Up @@ -91,6 +92,8 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;

clusterSlotStatsAddNetworkBytesIn(c);
}

void discardTransaction(client *c) {
Expand Down
6 changes: 5 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ client *createClient(connection *conn) {
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -451,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2486,6 +2489,7 @@ void resetClient(client *c) {
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2631,7 +2635,7 @@ void processInlineBuffer(client *c) {

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
Expand Down
5 changes: 3 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,21 +476,22 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
int receivers = 0;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
int slot = -1;

/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
clusterSlotStatsAddNetworkBytesInForShardedPubSub(slot);
}
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
dictIterator *iter = dictGetIterator(clients);
while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
updateClientMemUsageAndBucket(c);
receivers++;
}
Expand Down
8 changes: 8 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "bio.h"
#include "functions.h"
#include "connection.h"
Expand Down Expand Up @@ -415,6 +416,8 @@ void feedReplicationBuffer(char *s, size_t len) {

if (server.repl_backlog == NULL) return;

clusterSlotStatsAddNetworkBytesOutForReplication(len);

while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
Expand Down Expand Up @@ -568,6 +571,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {

feedReplicationBufferWithObject(selectcmd);

/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);

server.replicas_eldb = dictid;
Expand Down
7 changes: 3 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,6 +3694,9 @@ void afterCommand(client *c) {
/* Flush pending tracking invalidations. */
trackingHandlePendingKeyInvalidations();

clusterSlotStatsAddNetworkBytesIn(c);
clusterSlotStatsAddNetworkBytesOutForUserClient(c);

/* Flush other pending push messages. only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages);
Expand Down Expand Up @@ -4074,10 +4077,6 @@ int processCommand(client *c) {
call(c, flags);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys();
}

/* Now that c->slot has been parsed, and command has been executed,
* accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);
return C_OK;
}

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ typedef struct client {
* execution of this client's current command. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
} client;

/* ACL information */
Expand Down
Loading

0 comments on commit b90f537

Please sign in to comment.