Skip to content

Commit

Permalink
Add more test cases, fix async fetching bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-nl committed Dec 4, 2024
1 parent 43c5653 commit 76ac1f1
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 125 deletions.
3 changes: 3 additions & 0 deletions include/snowflake/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ void STDCALL sf_memory_error_handler();
// this should be called by application before any calls of sfclient
void STDCALL sf_exception_on_memory_failure();

// Presumably suspends the calling thread for sleep_ms milliseconds; in this
// change it replaces the direct Sleep()/usleep() calls in lib/client.c.
// TODO(review): confirm units and behavior against the implementation.
void STDCALL sf_sleep_ms(int sleep_ms);


#ifdef __cplusplus
}
#endif
Expand Down
217 changes: 112 additions & 105 deletions lib/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ char *get_query_metadata(SF_STMT* sfstmt) {
return metadata;
}
SF_FREE(status_query);
log_error("Error getting query metadata. Query id: %s", sfstmt->sfqid);
log_info("No query metadata found. Query id: %s", sfstmt->sfqid);
return NULL;
}

Expand All @@ -149,22 +149,8 @@ SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt) {
char* queryStatus = snowflake_cJSON_GetStringValue(status);
ret = get_status_from_string(queryStatus);
}
else {
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_GENERAL,
"Error retrieving the status from the metadata.",
NULL,
sfstmt->sfqid);
}
snowflake_cJSON_Delete(metadataJson);
}
else {
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_GENERAL,
"Error retrieving query metadata.",
NULL,
sfstmt->sfqid);
}

return ret;
}
Expand All @@ -185,7 +171,7 @@ sf_bool is_query_still_running(SF_QUERY_STATUS query_status) {
* Get the results of the async query
* @param sfstmt The SF_STMT context
*/
void get_real_results(SF_STMT *sfstmt) {
sf_bool get_real_results(SF_STMT *sfstmt) {
//Get status until query is complete or timed out
SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt);
int retry = 0;
Expand All @@ -197,7 +183,7 @@ void get_real_results(SF_STMT *sfstmt) {
if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) {
log_error("Query status is done running and did not succeed. Status is %s",
query_status_names[query_status]);
return;
break;
}
if (query_status == SF_QUERY_STATUS_NO_DATA) {
no_data_retry++;
Expand All @@ -210,22 +196,19 @@ void get_real_results(SF_STMT *sfstmt) {
"Cannot retrieve data on the status of this query.",
NULL,
sfstmt->sfqid);
return;
break;
}
}

int sleep_time = retry_pattern[retry] * 500;
#ifdef _WIN32
Sleep(sleep_time);
#else
usleep(sleep_time * 1000);
#endif
sf_sleep_ms(sleep_time);
if (retry < max_retries) {
retry++;
}
else {
log_error(
"Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid);
break;
}
query_status = snowflake_get_query_status(sfstmt);
}
Expand All @@ -237,6 +220,7 @@ void get_real_results(SF_STMT *sfstmt) {
SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query));
if (ret != SF_STATUS_SUCCESS) {
snowflake_propagate_error(sfstmt->connection, sfstmt);
return SF_BOOLEAN_FALSE;
}

// Get query stats
Expand All @@ -251,6 +235,7 @@ void get_real_results(SF_STMT *sfstmt) {
sfstmt->stats = set_stats(stats);
}
}
return SF_BOOLEAN_TRUE;
}

#define _SF_STMT_TYPE_DML 0x3000
Expand Down Expand Up @@ -1962,8 +1947,12 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) {
}

if (sfstmt->is_async && !sfstmt->is_async_initialized) {
get_real_results(sfstmt);
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
if (get_real_results(sfstmt)) {
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
}
else {
return SF_STATUS_ERROR_GENERAL;
}
}

clear_snowflake_error(&sfstmt->error);
Expand Down Expand Up @@ -2177,6 +2166,9 @@ SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) {
}

/**
 * Executes the statement's SQL text through the asynchronous execution path.
 *
 * Ensures the statement's is_async flag is set before handing off to
 * _snowflake_execute_ex().
 *
 * @param sfstmt The SF_STMT context; dereferenced without a NULL check.
 * @return The status reported by _snowflake_execute_ex().
 */
SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) {
    SF_STATUS status;

    // Only touch the flag when it is not already set.
    if (!sfstmt->is_async) {
        sfstmt->is_async = SF_BOOLEAN_TRUE;
    }

    status = _snowflake_execute_ex(
        sfstmt,
        _is_put_get_command(sfstmt->sql_text),
        NULL,
        SF_BOOLEAN_FALSE,
        SF_BOOLEAN_TRUE);
    return status;
}

Expand Down Expand Up @@ -2463,124 +2455,127 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt,

// Determine query result format and detach rowset object from data.
cJSON * qrf = snowflake_cJSON_GetObjectItem(data, "queryResultFormat");
char * qrf_str = snowflake_cJSON_GetStringValue(qrf);
sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t));
cJSON * rowset = NULL;
if (qrf) {
char* qrf_str = snowflake_cJSON_GetStringValue(qrf);
sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t));
cJSON* rowset = NULL;

if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) {
if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) {
#ifdef SF_WIN32
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT,
"Query results were fetched using Arrow, "
"but the client library does not yet support decoding Arrow results", "",
sfstmt->sfqid);
"Query results were fetched using Arrow, "
"but the client library does not yet support decoding Arrow results", "",
sfstmt->sfqid);

return SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT;
#endif
*((QueryResultFormat_t *) sfstmt->qrf) = ARROW_FORMAT;
* ((QueryResultFormat_t*)sfstmt->qrf) = ARROW_FORMAT;
rowset = snowflake_cJSON_DetachItemFromObject(data, "rowsetBase64");
if (!rowset)
{
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
}
}
else if (strcmp(qrf_str, "json") == 0) {
*((QueryResultFormat_t *) sfstmt->qrf) = JSON_FORMAT;
if (json_detach_array_from_object((cJSON **)(&rowset), data, "rowset"))
}
else if (strcmp(qrf_str, "json") == 0) {
*((QueryResultFormat_t*)sfstmt->qrf) = JSON_FORMAT;
if (json_detach_array_from_object((cJSON**)(&rowset), data, "rowset"))
{
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
log_error("No valid rowset found in response");
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error,
SF_STATUS_ERROR_BAD_JSON,
"Missing rowset from response. No results found.",
SF_SQLSTATE_APP_REJECT_CONNECTION,
sfstmt->sfqid);
goto cleanup;
}
}
else {
}
else {
log_error("Unsupported query result format: %s", qrf_str);
}
}

// Index starts at 0 and incremented each fetch
sfstmt->total_row_index = 0;
// Index starts at 0 and incremented each fetch
sfstmt->total_row_index = 0;

// When the result set is sufficiently large, the server response will contain
// an empty "rowset" object. Instead, it will have a "chunks" object that contains,
// among other fields, a URL from which the result set can be downloaded in chunks.
// In this case, we initialize the chunk downloader, which will download in the
// background as calls to snowflake_fetch() are made.
if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) {
// When the result set is sufficiently large, the server response will contain
// an empty "rowset" object. Instead, it will have a "chunks" object that contains,
// among other fields, a URL from which the result set can be downloaded in chunks.
// In this case, we initialize the chunk downloader, which will download in the
// background as calls to snowflake_fetch() are made.
if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) {
// We don't care if there is no qrmk, so ignore return code
json_copy_string(&qrmk, data, "qrmk");
chunk_headers = snowflake_cJSON_GetObjectItem(data, "chunkHeaders");
NON_JSON_RESP* (*callback_create_resp)(void) = NULL;
if (ARROW_FORMAT == *((QueryResultFormat_t *)sfstmt->qrf)) {
callback_create_resp = callback_create_arrow_resp;
if (ARROW_FORMAT == *((QueryResultFormat_t*)sfstmt->qrf)) {
callback_create_resp = callback_create_arrow_resp;
}

sfstmt->chunk_downloader = chunk_downloader_init(
qrmk,
chunk_headers,
chunks,
2, // thread count
4, // fetch slot
&sfstmt->error,
sfstmt->connection->insecure_mode,
sfstmt->connection->ocsp_fail_open,
callback_create_resp,
sfstmt->connection->proxy,
sfstmt->connection->no_proxy,
get_retry_timeout(sfstmt->connection),
sfstmt->connection->retry_count);
qrmk,
chunk_headers,
chunks,
2, // thread count
4, // fetch slot
&sfstmt->error,
sfstmt->connection->insecure_mode,
sfstmt->connection->ocsp_fail_open,
callback_create_resp,
sfstmt->connection->proxy,
sfstmt->connection->no_proxy,
get_retry_timeout(sfstmt->connection),
sfstmt->connection->retry_count);
if (!sfstmt->chunk_downloader) {
// Unable to create chunk downloader.
// Error is set in chunk_downloader_init function.
goto cleanup;
// Unable to create chunk downloader.
// Error is set in chunk_downloader_init function.
goto cleanup;
}

// Even when the result set is split into chunks, JSON format will still
// respond with the first chunk in "rowset", so be sure to include it.
sfstmt->result_set = rs_create_with_json_result(
rowset,
sfstmt->desc,
(QueryResultFormat_t *)sfstmt->qrf,
sfstmt->connection->timezone);
rowset,
sfstmt->desc,
(QueryResultFormat_t*)sfstmt->qrf,
sfstmt->connection->timezone);

// Update chunk row count. Controls the chunk downloader.
sfstmt->chunk_rowcount = rs_get_row_count_in_chunk(
sfstmt->result_set,
(QueryResultFormat_t *) sfstmt->qrf);
sfstmt->result_set,
(QueryResultFormat_t*)sfstmt->qrf);

// Update total row count. Used in snowflake_num_rows().
if (json_copy_int(&sfstmt->total_rowcount, data, "total")) {
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
}
} else {
}
else {
// Create a result set object and update the total rowcount.
sfstmt->result_set = rs_create_with_json_result(
rowset,
sfstmt->desc,
(QueryResultFormat_t *) sfstmt->qrf,
sfstmt->connection->timezone);
rowset,
sfstmt->desc,
(QueryResultFormat_t*)sfstmt->qrf,
sfstmt->connection->timezone);

// Update chunk row count. Controls the chunk downloader.
sfstmt->chunk_rowcount = rs_get_row_count_in_chunk(
sfstmt->result_set,
(QueryResultFormat_t *) sfstmt->qrf);
sfstmt->result_set,
(QueryResultFormat_t*)sfstmt->qrf);

// Update total row count. Used in snowflake_num_rows().
if (json_copy_int(&sfstmt->total_rowcount, data, "total")) {
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
log_warn(
"No total count found in response. Reverting to using array size of results");
sfstmt->total_rowcount = sfstmt->chunk_rowcount;
}
}
}
}
} else if (json_error != SF_JSON_ERROR_NONE) {
Expand Down Expand Up @@ -2660,8 +2655,12 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) {
}

if (sfstmt->is_async && !sfstmt->is_async_initialized) {
get_real_results(sfstmt);
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
if (get_real_results(sfstmt)) {
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
}
else {
return -1;
}
}

return sfstmt->total_rowcount;
Expand All @@ -2673,8 +2672,12 @@ int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) {
}

if (sfstmt->is_async && !sfstmt->is_async_initialized) {
get_real_results(sfstmt);
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
if (get_real_results(sfstmt)) {
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
}
else {
return -1;
}
}

return sfstmt->total_fieldcount;
Expand All @@ -2687,8 +2690,12 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) {
}

if (sfstmt->is_async && !sfstmt->is_async_initialized) {
get_real_results(sfstmt);
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
if (get_real_results(sfstmt)) {
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE;
}
else {
return 0;
}
}

ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params;
Expand Down
3 changes: 1 addition & 2 deletions lib/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
break;
}

while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 ||
strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) {
while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0) {
// Remove old result URL and query code if this isn't our first rodeo
SF_FREE(result_url);
memset(query_code, 0, QUERYCODE_LEN);
Expand Down
Loading

0 comments on commit 76ac1f1

Please sign in to comment.