Skip to content

Commit

Permalink
wip: Prevent a slow moving sql query to block replication
Browse files Browse the repository at this point in the history
Signed-off-by: Akshat Sikarwar <[email protected]>
  • Loading branch information
akshatsikarwar committed Sep 13, 2024
1 parent d628301 commit 589ca8f
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 12 deletions.
3 changes: 3 additions & 0 deletions db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -3682,6 +3682,9 @@ extern int gbl_sql_release_locks_on_slow_reader;
extern int gbl_fail_client_write_lock;
extern int gbl_server_admin_mode;

extern int gbl_epoch_time;
extern int gbl_watchdog_disable_at_start;

void csc2_free_all(void);

/* hack to temporary allow bools on production stage */
Expand Down
2 changes: 1 addition & 1 deletion db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ extern int gbl_altersc_latency_inc;
extern int gbl_sc_history_max_rows;
extern int gbl_sc_status_max_rows;
extern int gbl_rep_process_pstack_time;
extern int gbl_sql_recover_ddlk_duration;

extern void set_snapshot_impl(snap_impl_enum impl);
extern const char *snap_impl_str(snap_impl_enum impl);
Expand Down Expand Up @@ -1890,4 +1891,3 @@ const char *tunable_error(comdb2_tunable_err code)
}
return "????";
}

2 changes: 2 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2434,4 +2434,6 @@ REGISTER_TUNABLE("sc_status_max_rows", "Max number of rows returned in comdb2_sc
TUNABLE_INTEGER, &gbl_sc_status_max_rows, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("rep_process_pstack_time", "pstack the server if rep_process runs longer than time specified in secs (Default: 30s)",
TUNABLE_INTEGER, &gbl_rep_process_pstack_time, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sql_recover_ddlk_duration", "Run recover_deadlock after specified duration, if an SQL statement has waiters. To disable, set to 0 (Default: 60s)",
TUNABLE_INTEGER, &gbl_sql_recover_ddlk_duration, 0, NULL, NULL, NULL, NULL);
#endif /* _DB_TUNABLES_H */
1 change: 0 additions & 1 deletion db/process_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,6 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
dbenv->txns_committed, dbenv->txns_aborted, txns_applied,
n_retries, gbl_verify_tran_replays, rep_retry, max_retries);

extern int gbl_epoch_time;
extern int gbl_starttime;
logmsg(LOGMSG_USER, "uptime %ds\n",
gbl_epoch_time - gbl_starttime);
Expand Down
1 change: 1 addition & 0 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ struct sqlclntstate {

pthread_t debug_sqlclntstate;
int last_check_time;
int last_recover_ddlk;
int query_timeout;
int statement_timedout;
struct conninfo conn;
Expand Down
23 changes: 19 additions & 4 deletions db/sqlglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@
#include <portmuxapi.h>
#include "cdb2_constants.h"
#include <translistener.h>
#include <sqlwriter.h>

int gbl_sql_recover_ddlk_duration = 60;
int gbl_delay_sql_lock_release_sec = 5;

unsigned long long get_id(bdb_state_type *);
Expand Down Expand Up @@ -611,8 +613,6 @@ static int is_sqlite_db_init(BtCursor *pCur)

int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
{
extern int gbl_epoch_time;
extern int gbl_watchdog_disable_at_start;
if (gbl_watchdog_disable_at_start)
return 0;
if (gbl_epoch_time && (gbl_epoch_time - clnt->last_check_time > 5)) {
Expand All @@ -625,6 +625,7 @@ int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
}
return 0;
}

/*
This is called every time the db does something (find/next/etc. on a cursor).
The query is aborted if this returns non-zero.
Expand All @@ -634,7 +635,6 @@ int gbl_debug_sleep_in_analyze;
static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
{
int rc;
extern int gbl_epoch_time;

if (thd == NULL)
return 0;
Expand Down Expand Up @@ -698,6 +698,21 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
goto done;
}

if (no_recover_deadlock ||
clnt->last_recover_ddlk == 0 ||
gbl_sql_recover_ddlk_duration == 0 ||
gbl_epoch_time - clnt->last_recover_ddlk < gbl_sql_recover_ddlk_duration
){
goto done;
}

rc = recover_deadlock_evbuffer(clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: recover_deadlock failed sql:\"%.32s%s\"\n",
__func__, clnt->sql, strlen(clnt->sql) > 32 ? "..." : "");
}
clnt->last_recover_ddlk = gbl_epoch_time;

done:
Pthread_mutex_unlock(&clnt->sql_tick_lk);
return rc;
Expand Down Expand Up @@ -7315,7 +7330,7 @@ int get_data(BtCursor *pCur, struct schema *sc, uint8_t *in, int fnum, Mem *m,

break;
default:
logmsg(LOGMSG_ERROR, "get_data_int: unhandled type %d\n", f->type);
logmsg(LOGMSG_ERROR, "%s: unhandled type %d query:%s\n", __func__, f->type, pCur->clnt->sql);
break;
}

Expand Down
1 change: 1 addition & 0 deletions db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -3651,6 +3651,7 @@ void run_stmt_setup(struct sqlclntstate *clnt, sqlite3_stmt *stmt)
} else {
clnt->has_recording = v->recording;
}
clnt->last_recover_ddlk = gbl_epoch_time;
clnt->nsteps = 0;
comdb2_set_sqlite_vdbe_tzname_int(v, clnt);
comdb2_set_sqlite_vdbe_dtprec_int(v, clnt);
Expand Down
1 change: 0 additions & 1 deletion lua/sp.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ extern int gbl_lua_new_trans_model;
extern int gbl_max_lua_instructions;
extern int gbl_lua_version;
extern int gbl_notimeouts;
extern int gbl_epoch_time;
extern int gbl_allow_lua_print;
extern int gbl_allow_lua_dynamic_libs;
extern int gbl_lua_prepare_max_retries;
Expand Down
11 changes: 6 additions & 5 deletions sqlite/src/func.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,8 @@ static void sleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[])
int i;
for(i = 0; i < n; i++) {
sleep(1);
int rc = comdb2_sql_tick();
if( rc ) {
sqlite3_result_error_code(context, rc);
if( comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ERROR);
return;
}
}
Expand All @@ -473,8 +472,10 @@ static void usleepFunc(sqlite3_context *context, int argc, sqlite3_value *argv[]
us = ( remain > 1000000 ) ? 1000000 : remain;
remain -= us;
usleep(us);
if( comdb2_sql_tick() )
break;
if( comdb2_sql_tick() ){
sqlite3_result_error_code(context, SQLITE_ERROR);
return;
}
}
sqlite3_result_int(context, (total - remain));
}
Expand Down
1 change: 1 addition & 0 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@
(name='sql_optimize_shadows', description='', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_queueing_critical_trace', description='Produce trace when SQL request queue is this deep.', type='INTEGER', value='100', read_only='N')
(name='sql_queueing_disable_trace', description='Disable trace when SQL requests are starting to queue.', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_recover_ddlk_duration', description='Run recover_deadlock after specified duration, if an SQL statement has waiters. To disable, set to 0 (Default: 60s)', type='INTEGER', value='60', read_only='N')
(name='sql_release_locks_in_update_shadows', description='Release sql locks in update_shadows on lockwait', type='BOOLEAN', value='ON', read_only='N')
(name='sql_release_locks_on_emit_row_lockwait', description='Release sql locks when we are about to emit a row', type='BOOLEAN', value='OFF', read_only='N')
(name='sql_release_locks_on_si_lockwait', description='Release sql locks from si if the rep thread is waiting', type='BOOLEAN', value='ON', read_only='N')
Expand Down

0 comments on commit 589ca8f

Please sign in to comment.