Skip to content

Commit

Permalink
Major revision.
Browse files Browse the repository at this point in the history
- Added config guard, "cluster-slot-stats-enabled", with default value true.
- Added more test-cases.
- Added network-bytes-in accumulation for replicated commands.
- Added network-bytes-in accumulation for sharded pub/sub.
- Added network-bytes-in accumulation for MULTI's RESP.
- Moved slot_stats array under clusterState.
- Fixed network-bytes-in accumulation for blocking commands.
- Fixed c->argc accumulation by deferring its calculation.

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 9, 2024
1 parent 018d698 commit b0f1009
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 75 deletions.
3 changes: 2 additions & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
clusterSlotStatsReset();
clusterSlotStatResetAll();
}

void clusterInitLast(void) {
Expand Down Expand Up @@ -3247,6 +3247,7 @@ int clusterProcessPacket(clusterLink *link) {
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
clusterSlotStatsAddNetworkBytesInForShardedPubSub(channel, message);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
Expand Down
8 changes: 7 additions & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ struct _clusterNode {
Update with updateAndCountChangedNodeHealth(). */
};

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t network_bytes_in;
} slotStat;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
Expand Down Expand Up @@ -362,7 +367,8 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
slotStat slot_stats[CLUSTER_SLOTS];
};


#endif // CLUSTER_LEGACY_H
46 changes: 28 additions & 18 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t network_bytes_in;
} slotStat;

/* Struct used for storing slot statistics, for all slots owned by the current shard. */
struct slotStat cluster_slot_stats[CLUSTER_SLOTS];

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -56,7 +48,7 @@ static uint64_t getSlotStat(int slot, int stat_type) {
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = cluster_slot_stats[slot].network_bytes_in;
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
}
return slot_stat;
}
Expand Down Expand Up @@ -102,7 +94,7 @@ static void addReplySlotStat(client *c, int slot) {
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, cluster_slot_stats[slot].network_bytes_in);
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -137,17 +129,19 @@ static int canAddNetworkBytes(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */
return server.cluster_enabled && c->slot != -1 && !(c->flag.blocked);
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked);
}

/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through countKeysInSlot(). */
cluster_slot_stats[slot].network_bytes_in = 0;
server.cluster->slot_stats[slot].network_bytes_in = 0;
}

void clusterSlotStatsReset(void) {
memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats));
void clusterSlotStatResetAll(void) {
if (server.cluster == NULL) return;

memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
}

/* Adds network ingress bytes of the current command in execution,
Expand All @@ -159,15 +153,32 @@ void clusterSlotStatsReset(void) {
void clusterSlotStatsAddNetworkBytesIn(client *c) {
if (!canAddNetworkBytes(c)) return;

cluster_slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
c->net_input_bytes_curr_cmd += 15;
}

server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

/* Adds network ingress bytes from sharded pubsub subscription.
* Since sharded pubsub targets a specific slot, we are able to aggregate its ingress bytes under per-slot context. */
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(robj *channel, robj *message) {
int slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
server.cluster->slot_stats[slot].network_bytes_in += (sdslen(channel->ptr) + sdslen(message->ptr));
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

if (!server.cluster_slot_stats_enabled) {
addReplyError(c, "Slot usage statistics configuration is disabled");
return;
}

/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
Expand All @@ -193,8 +204,7 @@ void clusterSlotStatsCommand(client *c) {
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in")) {
order_by = NETWORK_BYTES_IN;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported "
"metrics are: key-count and cpu-usec.");
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
Expand Down
4 changes: 3 additions & 1 deletion src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

void clusterSlotStatReset(int slot);
void clusterSlotStatsReset(void);
void clusterSlotStatResetAll(void);
void clusterSlotStatsAddNetworkBytesIn(client *c);
void clusterSlotStatsAddNetworkBytesInForShardedPubSub(robj *channel, robj *message);
2 changes: 1 addition & 1 deletion src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"key-count": {
"type": "integer"
},
"memory-bytes-in": {
"network-bytes-in": {
"type": "integer"
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3069,6 +3069,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 1, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
9 changes: 8 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,14 @@ int getKeySlot(sds key) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
return calculateKeySlot(key);
int slot = calculateKeySlot(key);
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
if (server.current_client && server.current_client->flag.primary) {
server.current_client->slot = slot;
}
return slot;
}

/* This is a special version of dbAdd() that is used only when loading
Expand Down
10 changes: 6 additions & 4 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2483,8 +2483,9 @@ int processMultibulkBuffer(client *c) {
}
}
c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component. */
c->net_input_bytes_curr_cmd += (bulklen_slen + c->argc);
/* Per-slot network bytes-in calculation, 2nd component.
* c->argc portion is deferred, as it may not have been fully populated at this point. */
c->net_input_bytes_curr_cmd += bulklen_slen;
}

/* Read bulk argument */
Expand Down Expand Up @@ -2522,8 +2523,9 @@ int processMultibulkBuffer(client *c) {

/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 4 + 2));
/* Per-slot network bytes-in calculation, 3rd and 4th components.
* Here, the deferred c->argc from 2nd component is added, resulting in c->argc * 5 instead of * 4. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 5 + 2));
return C_OK;
}

Expand Down
1 change: 1 addition & 0 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

/* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */
Expand Down
9 changes: 5 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2500,7 +2500,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatsReset();
clusterSlotStatResetAll();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down Expand Up @@ -3871,9 +3871,6 @@ int processCommand(client *c) {
}
}

/* Now that c->slot has been parsed, accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
(is_write_command || (is_read_command && !c->flag.readonly))) {
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
Expand Down Expand Up @@ -4048,6 +4045,10 @@ int processCommand(client *c) {
call(c, flags);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys();
}

/* Now that c->slot has been parsed, and command has been executed,
* accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);
return C_OK;
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,7 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
int cluster_slot_stats_enabled; /* Cluster wide slot usage statistics tracking enabled. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX][4]; /* Align to RESP3 */
Expand Down
Loading

0 comments on commit b0f1009

Please sign in to comment.