Skip to content

Commit

Permalink
SNOW-878098 retry strategy fix to match the design doc (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Harry Xi authored Nov 3, 2023
1 parent 892baf4 commit cd072c3
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 93 deletions.
17 changes: 13 additions & 4 deletions include/snowflake/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ extern "C" {
/**
* Login timeout in seconds
*/
// make the login timetout defaults to 300 to be inline with retry timeout
// while customer can reduce it as needed
#define SF_LOGIN_TIMEOUT 300

/**
* network timeout other than login requests
* retry timeout in seconds
*/
#define SF_NETWORK_TIMEOUT 120
#define SF_RETRY_TIMEOUT 300

/**
* max retry number for login reuests (login/authenticator/token)
* max retry number
*/
#define SF_LOGIN_MAX_RETRY 7
#define SF_MAX_RETRY 7

/**
* Default JWT timeout in seconds
Expand Down Expand Up @@ -244,6 +246,8 @@ typedef enum SF_ATTRIBUTE {
SF_CON_NO_PROXY,
SF_CON_DISABLE_QUERY_CONTEXT_CACHE,
SF_CON_INCLUDE_RETRY_REASON,
SF_CON_RETRY_TIMEOUT,
SF_CON_MAX_RETRY,
SF_DIR_QUERY_URL,
SF_DIR_QUERY_URL_PARAM,
SF_DIR_QUERY_TOKEN,
Expand Down Expand Up @@ -347,6 +351,8 @@ typedef struct SF_CONNECT {

int64 login_timeout;
int64 network_timeout;
// retry timeout for new retry strategy
int64 retry_timeout;

// Session specific fields
int64 sequence_counter;
Expand All @@ -363,6 +369,9 @@ typedef struct SF_CONNECT {

int8 retry_on_connect_count;

// max retry number for new retry strategy
int8 retry_count;

// Error
SF_ERROR_STRUCT error;
} SF_CONNECT;
Expand Down
49 changes: 34 additions & 15 deletions lib/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ _snowflake_check_connection_parameters(SF_CONNECT *sf) {
log_debug("timezone: %s", sf->timezone);
log_debug("login_timeout: %d", sf->login_timeout);
log_debug("network_timeout: %d", sf->network_timeout);
log_debug("retry_timeout: %d", sf->retry_timeout);
log_debug("retry_count: %d", sf->retry_count);
log_debug("qcc_disable: %s", sf->qcc_disable ? "true" : "false");
log_debug("include_retry_reason: %s", sf->include_retry_reason ? "true" : "false");

Expand Down Expand Up @@ -645,6 +647,8 @@ SF_CONNECT *STDCALL snowflake_init() {

// Make sure memory was actually allocated
if (sf) {
// seed the rand
srand(time(NULL));
// Initialize object with default values
sf->host = NULL;
sf->port = NULL;
Expand Down Expand Up @@ -677,7 +681,8 @@ SF_CONNECT *STDCALL snowflake_init() {
sf->token = NULL;
sf->master_token = NULL;
sf->login_timeout = SF_LOGIN_TIMEOUT;
sf->network_timeout = SF_NETWORK_TIMEOUT;
sf->network_timeout = 0;
sf->retry_timeout = SF_RETRY_TIMEOUT;
sf->sequence_counter = 0;
_mutex_init(&sf->mutex_sequence_counter);
sf->request_id[0] = '\0';
Expand All @@ -695,7 +700,8 @@ SF_CONNECT *STDCALL snowflake_init() {
sf->directURL = NULL;
sf->direct_query_token = NULL;
sf->retry_on_curle_couldnt_connect_count = 0;
sf->retry_on_connect_count = SF_LOGIN_MAX_RETRY;
sf->retry_on_connect_count = 0;
sf->retry_count = SF_MAX_RETRY;

sf->qcc_capacity = QCC_CAPACITY_DEF;
sf->qcc_disable = SF_BOOLEAN_FALSE;
Expand All @@ -722,7 +728,7 @@ SF_STATUS STDCALL snowflake_term(SF_CONNECT *sf) {
if (request(sf, &resp, DELETE_SESSION_URL, url_params,
sizeof(url_params) / sizeof(URL_KEY_VALUE), NULL, NULL,
POST_REQUEST_TYPE, &sf->error, SF_BOOLEAN_FALSE,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
0, sf->retry_count, get_retry_timeout(sf), NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
s_resp = snowflake_cJSON_Print(resp);
log_trace("JSON response:\n%s", s_resp);
/* Even if the session deletion fails, it will be cleaned after 7 days.
Expand Down Expand Up @@ -874,7 +880,7 @@ SF_STATUS STDCALL snowflake_connect(SF_CONNECT *sf) {
if (request(sf, &resp, SESSION_URL, url_params,
sizeof(url_params) / sizeof(URL_KEY_VALUE), s_body, NULL,
POST_REQUEST_TYPE, &sf->error, SF_BOOLEAN_FALSE,
renew_timeout, sf->retry_on_connect_count, &elapsed_time,
renew_timeout, get_login_retry_count(sf), get_login_timeout(sf), &elapsed_time,
&retried_count, &is_renew, renew_injection)) {
s_resp = snowflake_cJSON_Print(resp);
log_trace("Here is JSON response:\n%s", s_resp);
Expand Down Expand Up @@ -1054,14 +1060,17 @@ SF_STATUS STDCALL snowflake_set_attribute(
break;
case SF_CON_LOGIN_TIMEOUT:
sf->login_timeout = value ? *((int64 *) value) : SF_LOGIN_TIMEOUT;
if (sf->login_timeout < SF_LOGIN_TIMEOUT)
{
sf->login_timeout = SF_LOGIN_TIMEOUT;
}
break;
case SF_CON_NETWORK_TIMEOUT:
sf->network_timeout = value ? *((int64 *) value) : SF_NETWORK_TIMEOUT;
sf->network_timeout = value ? *((int64 *) value) : SF_LOGIN_TIMEOUT;
break;
case SF_CON_RETRY_TIMEOUT:
sf->retry_timeout = value ? *((int64 *)value) : SF_RETRY_TIMEOUT;
if ((sf->retry_timeout < SF_RETRY_TIMEOUT) && (sf->retry_timeout != 0))
{
sf->retry_timeout = SF_RETRY_TIMEOUT;
}
break;
case SF_CON_AUTOCOMMIT:
sf->autocommit = value ? *((sf_bool *) value) : SF_BOOLEAN_TRUE;
break;
Expand Down Expand Up @@ -1093,12 +1102,15 @@ SF_STATUS STDCALL snowflake_set_attribute(
sf->jwt_cnxn_wait_time = value ? *((int64 *)value) : SF_JWT_CNXN_WAIT_TIME;
break;
case SF_CON_MAX_CON_RETRY:
sf->retry_on_connect_count = value ? *((int8 *)value) : SF_LOGIN_MAX_RETRY;
if (sf->retry_on_connect_count < SF_LOGIN_MAX_RETRY)
{
sf->retry_on_connect_count = SF_LOGIN_MAX_RETRY;
}
sf->retry_on_connect_count = value ? *((int8 *)value) : SF_MAX_RETRY;
break;
case SF_CON_MAX_RETRY:
sf->retry_count = value ? *((int8 *)value) : SF_MAX_RETRY;
if ((sf->retry_count < SF_MAX_RETRY) && (sf->retry_count != 0))
{
sf->retry_count = SF_MAX_RETRY;
}
break;
case SF_CON_PROXY:
alloc_buffer_and_copy(&sf->proxy, value);
break;
Expand Down Expand Up @@ -1188,6 +1200,9 @@ SF_STATUS STDCALL snowflake_get_attribute(
case SF_CON_NETWORK_TIMEOUT:
*value = &sf->network_timeout;
break;
case SF_CON_RETRY_TIMEOUT:
*value = &sf->retry_timeout;
break;
case SF_CON_AUTOCOMMIT:
*value = &sf->autocommit;
break;
Expand Down Expand Up @@ -1227,6 +1242,9 @@ SF_STATUS STDCALL snowflake_get_attribute(
case SF_CON_MAX_CON_RETRY:
*value = &sf->retry_on_connect_count;
break;
case SF_CON_MAX_RETRY:
*value = &sf->retry_count;
break;
case SF_CON_PROXY:
*value = sf->proxy;
break;
Expand Down Expand Up @@ -2022,7 +2040,8 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,
if (request(sfstmt->connection, &resp, queryURL, url_params,
url_paramSize , s_body, NULL,
POST_REQUEST_TYPE, &sfstmt->error, is_put_get_command,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
0, sfstmt->connection->retry_count, get_retry_timeout(sfstmt->connection),
NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
// s_resp will be freed by snowflake_query_result_capture_term
s_resp = snowflake_cJSON_Print(resp);
log_trace("Here is JSON response:\n%s", s_resp);
Expand Down
108 changes: 77 additions & 31 deletions lib/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
SF_ERROR_STRUCT *error,
int64 renew_timeout,
int8 retry_max_count,
int64 retry_timeout,
int64 *elapsed_time,
int8 *retried_count,
sf_bool *is_renew,
Expand All @@ -330,28 +331,23 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
// Set to 0
memset(query_code, 0, QUERYCODE_LEN);

int64 timeout;
sf_bool is_login_request = is_login_url(url);
if (SF_BOOLEAN_TRUE == is_login_request)
sf_bool is_new_strategy_url = is_new_retry_strategy_url(url);
if (SF_BOOLEAN_TRUE == is_new_strategy_url)
{
timeout = sf->login_timeout;
if (!add_appinfo_header(sf, header, error)) {
return ret;
}
}
else
{
timeout = sf->network_timeout;
}

do {
if (!http_perform(curl, POST_REQUEST_TYPE, url, header, body, json, NULL,
timeout, SF_BOOLEAN_FALSE, error,
retry_timeout, SF_BOOLEAN_FALSE, error,
sf->insecure_mode,
sf->retry_on_curle_couldnt_connect_count,
renew_timeout, retry_max_count, elapsed_time,
retried_count, is_renew, renew_injection,
sf->proxy, sf->no_proxy, sf->include_retry_reason,
is_login_request) ||
is_new_strategy_url) ||
!*json) {
// Error is set in the perform function
break;
Expand Down Expand Up @@ -385,7 +381,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
break;
}
if (!curl_post_call(sf, curl, url, new_header, body, json,
error, renew_timeout, retry_max_count,
error, renew_timeout, retry_max_count, retry_timeout,
elapsed_time, retried_count, is_renew, renew_injection)) {
// Error is set in curl call
break;
Expand Down Expand Up @@ -421,7 +417,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
log_trace("ping pong starting...");
if (!request(sf, json, result_url, NULL, 0, NULL, header,
GET_REQUEST_TYPE, error, SF_BOOLEAN_FALSE,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
0, retry_max_count, retry_timeout, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) {
// Error came from request up, just break
stop = SF_BOOLEAN_TRUE;
break;
Expand Down Expand Up @@ -472,10 +468,10 @@ sf_bool STDCALL curl_get_call(SF_CONNECT *sf,

do {
if (!http_perform(curl, GET_REQUEST_TYPE, url, header, NULL, json, NULL,
sf->network_timeout, SF_BOOLEAN_FALSE, error,
get_retry_timeout(sf), SF_BOOLEAN_FALSE, error,
sf->insecure_mode,
sf->retry_on_curle_couldnt_connect_count,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
0, sf->retry_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE) ||
!*json) {
// Error is set in the perform function
Expand Down Expand Up @@ -548,17 +544,22 @@ STDCALL decorrelate_jitter_init(uint32 base, uint32 cap) {
}

uint32
get_next_sleep_with_jitter(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep) {
sleep = uimin(sleep, djb->cap);
// Prevents division by 0 when sleep = 1
// and if sleep == 2 the value of sleep time returned never changes.
if(sleep <= 2)
{
sleep = 4;
}
// (sleep/2) + (random from 0 to sleep) = random from sleep/2 to sleep + sleep/2
// = sleep +-50%
return ((uint32)(sleep/2) + (uint32) (rand() % (sleep + 1)));
get_next_sleep_with_jitter(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep, uint64 retry_count) {
float cur_wait_time = sleep;
cur_wait_time = choose_random(cur_wait_time + get_jitter(sleep),
pow(2, retry_count) + get_jitter(sleep));
// no cap for new retry strategy while keep the existing cap for other requests
if ((djb->cap != SF_NEW_STRATEGY_BACKOFF_CAP) && (cur_wait_time > djb->cap))
{
cur_wait_time = djb->cap;
}
// at least wait for 1 seconds
if (cur_wait_time < 1)
{
cur_wait_time = 1;
}

return (uint32)cur_wait_time;
}

char * STDCALL encode_url(CURL *curl,
Expand Down Expand Up @@ -905,6 +906,7 @@ sf_bool STDCALL request(SF_CONNECT *sf,
sf_bool use_application_json_accept_type,
int64 renew_timeout,
int8 retry_max_count,
int64 retry_timeout,
int64 *elapsed_time,
int8 *retried_count,
sf_bool *is_renew,
Expand Down Expand Up @@ -938,7 +940,7 @@ sf_bool STDCALL request(SF_CONNECT *sf,
// Execute request and set return value to result
if (request_type == POST_REQUEST_TYPE) {
ret = curl_post_call(sf, curl, encoded_url, my_header, body, json,
error, renew_timeout, retry_max_count,
error, renew_timeout, retry_max_count, retry_timeout,
elapsed_time, retried_count, is_renew,
renew_injection);
} else if (request_type == GET_REQUEST_TYPE) {
Expand Down Expand Up @@ -1022,7 +1024,7 @@ sf_bool STDCALL renew_session(CURL *curl, SF_CONNECT *sf, SF_ERROR_STRUCT *error
// Successful call, non-null json, successful success code, data object and session token must all be present
// otherwise set an error
if (!curl_post_call(sf, curl, encoded_url, header, s_body, &json, error,
0, sf->retry_on_connect_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE) ||
0, sf->retry_count, get_retry_timeout(sf), NULL, NULL, NULL, SF_BOOLEAN_FALSE) ||
!json) {
// Do nothing, let error propogate up from post call
log_error("Curl call failed during renew session");
Expand Down Expand Up @@ -1080,9 +1082,10 @@ void STDCALL retry_ctx_free(RETRY_CONTEXT *retry_ctx) {
}

uint32 STDCALL retry_ctx_next_sleep(RETRY_CONTEXT *retry_ctx) {
uint32 jittered_sleep = get_next_sleep_with_jitter(retry_ctx->djb, retry_ctx->sleep_time);
retry_ctx->sleep_time = retry_ctx->sleep_time * 2;
uint32 cur_wait_time = retry_ctx->sleep_time;
++retry_ctx->retry_count;
// caculate next sleep time
retry_ctx->sleep_time = get_next_sleep_with_jitter(retry_ctx->djb, retry_ctx->sleep_time, retry_ctx->retry_count);

// limit the sleep time within retry timeout
uint32 time_elapsed = time(NULL) - retry_ctx->start_time;
Expand All @@ -1093,7 +1096,7 @@ uint32 STDCALL retry_ctx_next_sleep(RETRY_CONTEXT *retry_ctx) {
// can be caught right after
return 1;
}
return uimin(jittered_sleep, (uint32)(retry_ctx->retry_timeout - time_elapsed));
return uimin(cur_wait_time, (uint32)(retry_ctx->retry_timeout - time_elapsed));
}

sf_bool STDCALL retry_ctx_update_url(RETRY_CONTEXT *retry_ctx,
Expand Down Expand Up @@ -1215,7 +1218,7 @@ void STDCALL sf_header_destroy(SF_HEADER *sf_header) {
SF_FREE(sf_header);
}

sf_bool is_login_url(const char * url)
sf_bool is_new_retry_strategy_url(const char * url)
{
if (!url)
{
Expand Down Expand Up @@ -1279,3 +1282,46 @@ sf_bool add_appinfo_header(SF_CONNECT *sf, SF_HEADER *header, SF_ERROR_STRUCT *e
error:
return ret;
}

float choose_random(float min, float max)
{
// get a number between 0 ~ 1
float scale = rand() / (float)RAND_MAX;
// scale to min ~ max
return min + scale * (max - min);
}

float get_jitter(int cur_wait_time) {
float multiplication_factor = choose_random(-1, 1);
float jitter_amount = 0.5 * cur_wait_time * multiplication_factor;
return jitter_amount;
}

// utility function to get the less one, take 0 as special value for unlimited
int64 get_less_one(int64 a, int64 b)
{
if (a == 0)
{
return b;
}
if (b == 0)
{
return a;
}
return a < b ? a : b;
}

int64 get_login_timeout(SF_CONNECT *sf)
{
return get_less_one(sf->login_timeout, sf->retry_timeout);
}

int64 get_retry_timeout(SF_CONNECT *sf)
{
return get_less_one(sf->network_timeout, sf->retry_timeout);
}

int8 get_login_retry_count(SF_CONNECT *sf)
{
return (int8)get_less_one(sf->retry_on_connect_count, sf->retry_count);
}
Loading

0 comments on commit cd072c3

Please sign in to comment.