Skip to content

Commit

Permalink
Align rejected unblocked commands to update the correct error statistic.
Browse files Browse the repository at this point in the history
Currently, in case a blocked command is unblocked externally (eg. due
to the relevant slot being migrated or the CLIENT UNBLOCK command was
issued, the command statistics will always update the failed_calls error
statistic. This leads to missalignment with valkey-io@90b9f08
as well as some inconsistencies. For example when a key is migrated
during cluster slot migration, clients blocked on XREADGROUP will be
unblocked and update the rejected_calls stat, while clients blocked on
BLPOP will get unblocked updating the failed_calls stat.

In this PR we add explicit indication in updateStatsOnUnblock thet
indicates if the command was rejected or failed.

Signed-off-by: ranshid <[email protected]>
  • Loading branch information
ranshid committed May 30, 2024
1 parent 6bab2d7 commit 0d29d9b
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
17 changes: 12 additions & 5 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,21 @@ void blockClient(client *c, int btype) {
* he will attempt to reprocess the command which will update the statistics.
* However in case the client was timed out or in case of module blocked client is being unblocked
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slowlog and monitors.*/
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
* This function will make updates to the commandstats, slowlog and monitors.
* The failed_or_rejected parameter is an indication that the blocked command was either failed internally or
* rejected/aborted externally. In case the command was rejected the value ERROR_COMMAND_REJECTED should be passed.
* In case the command failed internally, ERROR_COMMAND_FAILED should be passed.
* A value of zero indicate no error was reported after the command was unblocked */
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_or_rejected) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
if (had_errors) c->lastcmd->failed_calls++;
debugServerAssertWithInfo(c, NULL, failed_or_rejected >= 0 && failed_or_rejected < ERROR_COMMAND_FAILED);
if (failed_or_rejected & ERROR_COMMAND_FAILED) c->lastcmd->failed_calls++;
else if (failed_or_rejected & ERROR_COMMAND_REJECTED)
c->lastcmd->rejected_calls++;
if (server.latency_tracking_enabled)
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration * 1000);
/* Log the command into the Slow log if needed. */
Expand Down Expand Up @@ -683,7 +690,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
elapsedStart(&replyTimer);

if (moduleTryServeClientBlockedOnKey(c, key)) {
updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), ((server.stat_total_error_replies != prev_error_replies) ? ERROR_COMMAND_FAILED : 0));
moduleUnblockClient(c);
}
/* We need to call afterCommand even if the client was not unblocked
Expand Down Expand Up @@ -712,7 +719,7 @@ void unblockClientOnTimeout(client *c) {
* If err_str is provided it will be used to reply to the blocked client */
void unblockClientOnError(client *c, const char *err_str) {
if (err_str) addReplyError(c, err_str);
updateStatsOnUnblock(c, 0, 0, 1);
updateStatsOnUnblock(c, 0, 0, ERROR_COMMAND_REJECTED);
if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c, 1);
}
Expand Down
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -8281,7 +8281,7 @@ void moduleHandleBlockedClients(void) {
if (c && !clientHasModuleAuthInProgress(c)) {
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors)
: (server.stat_total_error_replies != prev_error_replies);
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors);
updateStatsOnUnblock(c, bc->background_duration, reply_us, (had_errors ? ERROR_COMMAND_FAILED : 0));
}

if (c != NULL) {
Expand Down Expand Up @@ -8368,7 +8368,7 @@ void moduleBlockedClientTimedOut(client *c, int from_module) {
moduleFreeContext(&ctx);

if (!from_module)
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
updateStatsOnUnblock(c, bc->background_duration, 0, ((server.stat_total_error_replies != prev_error_replies) ? ERROR_COMMAND_FAILED : 0));

/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3503,7 +3503,7 @@ void blockForPreReplication(client *c, mstime_t timeout, long long offset, long
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_or_rejected);
void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with);
void totalNumberOfStatefulKeys(unsigned long *blocking_keys,
unsigned long *blocking_keys_on_nokey,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ start_server {tags {"repl external:skip"}} {
} else {
fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]"
}
assert_match {*calls=1,*,rejected_calls=0,failed_calls=1*} [cmdrstat blpop $B]
assert_match {*calls=1,*,rejected_calls=1*,failed_calls=0} [cmdrstat blpop $B]
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/info.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ start_server {tags {"info" "external:skip"}} {
r client unblock $rd_id error
assert_error {UNBLOCKED*} {$rd read}
assert_match {*count=1*} [errorstat UNBLOCKED]
assert_match {*calls=1,*,rejected_calls=0,failed_calls=1} [cmdstat blpop]
assert_match {*calls=1,*,rejected_calls=1,failed_calls=0} [cmdstat blpop]
assert_equal [s total_error_replies] 1
$rd close
}
Expand Down

0 comments on commit 0d29d9b

Please sign in to comment.