Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
2 changes: 0 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3297,9 +3297,7 @@ int clusterProcessPacket(clusterLink *link) {
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
clusterSlotStatsSetClusterMsgLength(ntohl(hdr->totlen));
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
clusterSlotStatsResetClusterMsgLength();
decrRefCount(channel);
decrRefCount(message);
}
Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ struct _clusterNode {
typedef struct slotStat {
uint64_t cpu_usec;
uint64_t network_bytes_in;
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
Expand Down
113 changes: 73 additions & 40 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatType;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand All @@ -20,12 +20,6 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

typedef struct {
uint32_t len;
} pubsubState;

static pubsubState pubsub_state;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -44,15 +38,15 @@ static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_s
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, int stat_type) {
serverAssert(stat_type != INVALID);
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
switch (stat_type) {
case KEY_COUNT: slot_stat = countKeysInSlot(slot); break;
case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break;
case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break;
case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break;
default: // SLOT_STAT_COUNT, INVALID
madolson marked this conversation as resolved.
Show resolved Hide resolved
serverPanic("Invalid slot stat type %d was found.", stat_type);
}
return slot_stat;
}
Expand All @@ -77,7 +71,7 @@ static int slotStatForSortDescCmp(const void *a, const void *b) {
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
Expand Down Expand Up @@ -106,6 +100,8 @@ static void addReplySlotStat(client *c, int slot) {
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

Expand All @@ -129,9 +125,56 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
Expand Down Expand Up @@ -177,21 +220,24 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
ctx->original_client->slot = -1;
}

static int canAddNetworkBytes(client *c) {
static int canAddNetworkBytesIn(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked);
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytes() check failure.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
* */
void clusterSlotStatsAddNetworkBytesIn(client *c) {
if (!canAddNetworkBytes(c)) return;
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
if (!canAddNetworkBytesIn(c)) return;

if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
Expand All @@ -201,22 +247,6 @@ void clusterSlotStatsAddNetworkBytesIn(client *c) {
server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

void clusterSlotStatsSetClusterMsgLength(uint32_t len) {
pubsub_state.len = len;
}

void clusterSlotStatsResetClusterMsgLength() {
pubsub_state.len = 0;
}

/* Adds network ingress bytes from sharded pubsub subscription.
* Since sharded pubsub targets a specific slot, we are able to aggregate its ingress bytes under per-slot context. */
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot) {
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);

server.cluster->slot_stats[slot].network_bytes_in += pubsub_state.len;
}

void clusterSlotStatsCommand(client *c) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
Expand All @@ -242,13 +272,16 @@ void clusterSlotStatsCommand(client *c) {

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1, order_by = INVALID;
int desc = 1;
slotStatType order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
Expand Down
13 changes: 11 additions & 2 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
#include "script.h"
#include "cluster_legacy.h"

/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);
void clusterSlotStatsAddNetworkBytesIn(client *c);

/* network-bytes-in metric. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
},
"network-bytes-in": {
madolson marked this conversation as resolved.
Show resolved Hide resolved
"type": "integer"
},
"network-bytes-out": {
"type": "integer"
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ int getKeySlot(sds key) {
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
Expand Down
1 change: 1 addition & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*/

#include "server.h"
#include "cluster_slot_stats.h"
madolson marked this conversation as resolved.
Show resolved Hide resolved

/* ================================ MULTI/EXEC ============================== */

Expand Down
7 changes: 6 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ client *createClient(connection *conn) {
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -451,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2486,6 +2489,7 @@ void resetClient(client *c) {
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2631,7 +2635,7 @@ void processInlineBuffer(client *c) {

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
Expand Down Expand Up @@ -2898,6 +2902,7 @@ void commandProcessed(client *c) {
if (c->flag.blocked) return;

reqresAppendResponse(c);
clusterSlotStatsAddNetworkBytesInForUserClient(c);
resetClient(c);

long long prev_offset = c->reploff;
Expand Down
6 changes: 3 additions & 3 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,21 +476,21 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
int receivers = 0;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
int slot = -1;

/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
clusterSlotStatsAddNetworkBytesInForShardedPubSub(slot);
}
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
dictIterator *iter = dictGetIterator(clients);
while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
updateClientMemUsageAndBucket(c);
receivers++;
}
Expand Down
12 changes: 12 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "bio.h"
#include "functions.h"
#include "connection.h"
Expand Down Expand Up @@ -415,6 +416,8 @@ void feedReplicationBuffer(char *s, size_t len) {

if (server.repl_backlog == NULL) return;

clusterSlotStatsAddNetworkBytesOutForReplication(len);

while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
Expand Down Expand Up @@ -568,6 +571,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {

feedReplicationBufferWithObject(selectcmd);

/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);

server.replicas_eldb = dictid;
Expand Down Expand Up @@ -3967,6 +3975,10 @@ void replicationSendAck(void) {
addReplyBulkLongLong(c, server.fsynced_reploff);
}
c->flag.primary_force_reply = 0;

/* Accumulation from above replies must be reset back to 0 manually,
* as this subroutine does not invoke resetClient(). */
c->net_output_bytes_curr_cmd = 0;
madolson marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,6 +3694,8 @@ void afterCommand(client *c) {
/* Flush pending tracking invalidations. */
trackingHandlePendingKeyInvalidations();

clusterSlotStatsAddNetworkBytesOutForUserClient(c);

/* Flush other pending push messages. only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages);
Expand Down Expand Up @@ -4074,10 +4076,6 @@ int processCommand(client *c) {
call(c, flags);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys();
}

/* Now that c->slot has been parsed, and command has been executed,
* accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);
return C_OK;
}

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ typedef struct client {
* execution of this client's current command. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
} client;

/* ACL information */
Expand Down
Loading
Loading