From 76ac1f18a5a308f37f803488abf1c2a1fde9fec2 Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 4 Dec 2024 09:17:27 -0800 Subject: [PATCH] Add more test cases, fix async fetching bugs --- include/snowflake/platform.h | 3 + lib/client.c | 217 ++++++++++++++++++----------------- lib/connection.c | 3 +- lib/http_perform.c | 18 +-- lib/platform.c | 9 ++ tests/test_async.c | 106 ++++++++++++++++- 6 files changed, 231 insertions(+), 125 deletions(-) diff --git a/include/snowflake/platform.h b/include/snowflake/platform.h index 807d6904f0..bc75f85843 100755 --- a/include/snowflake/platform.h +++ b/include/snowflake/platform.h @@ -142,6 +142,9 @@ void STDCALL sf_memory_error_handler(); // this should be called by application before any calls of sfclient void STDCALL sf_exception_on_memory_failure(); +void STDCALL sf_sleep_ms(int sleep_ms); + + #ifdef __cplusplus } #endif diff --git a/lib/client.c b/lib/client.c index a0654d759e..41fcf9f8eb 100644 --- a/lib/client.c +++ b/lib/client.c @@ -133,7 +133,7 @@ char *get_query_metadata(SF_STMT* sfstmt) { return metadata; } SF_FREE(status_query); - log_error("Error getting query metadata. Query id: %s", sfstmt->sfqid); + log_info("No query metadata found. 
Query id: %s", sfstmt->sfqid); return NULL; } @@ -149,22 +149,8 @@ SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt) { char* queryStatus = snowflake_cJSON_GetStringValue(status); ret = get_status_from_string(queryStatus); } - else { - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_GENERAL, - "Error retrieving the status from the metadata.", - NULL, - sfstmt->sfqid); - } snowflake_cJSON_Delete(metadataJson); } - else { - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_GENERAL, - "Error retrieving query metadata.", - NULL, - sfstmt->sfqid); - } return ret; } @@ -185,7 +171,7 @@ sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { * Get the results of the async query * @param sfstmt The SF_STMT context */ -void get_real_results(SF_STMT *sfstmt) { +sf_bool get_real_results(SF_STMT *sfstmt) { //Get status until query is complete or timed out SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); int retry = 0; @@ -197,7 +183,7 @@ void get_real_results(SF_STMT *sfstmt) { if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]); - return; + break; } if (query_status == SF_QUERY_STATUS_NO_DATA) { no_data_retry++; @@ -210,22 +196,19 @@ void get_real_results(SF_STMT *sfstmt) { "Cannot retrieve data on the status of this query.", NULL, sfstmt->sfqid); - return; + break; } } int sleep_time = retry_pattern[retry] * 500; -#ifdef _WIN32 - Sleep(sleep_time); -#else - usleep(sleep_time * 1000); -#endif + sf_sleep_ms(sleep_time); if (retry < max_retries) { retry++; } else { log_error( "Cannot retrieve data on the status of this query. 
Max retries hit with queryID=%s", sfstmt->sfqid); + break; } query_status = snowflake_get_query_status(sfstmt); } @@ -237,6 +220,7 @@ void get_real_results(SF_STMT *sfstmt) { SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query)); if (ret != SF_STATUS_SUCCESS) { snowflake_propagate_error(sfstmt->connection, sfstmt); + return SF_BOOLEAN_FALSE; } // Get query stats @@ -251,6 +235,7 @@ void get_real_results(SF_STMT *sfstmt) { sfstmt->stats = set_stats(stats); } } + return SF_BOOLEAN_TRUE; } #define _SF_STMT_TYPE_DML 0x3000 @@ -1962,8 +1947,12 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return SF_STATUS_ERROR_GENERAL; + } } clear_snowflake_error(&sfstmt->error); @@ -2177,6 +2166,9 @@ SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) { } SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) { + if (!sfstmt->is_async) { + sfstmt->is_async = SF_BOOLEAN_TRUE; + } return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE); } @@ -2463,124 +2455,127 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, // Determine query result format and detach rowset object from data. 
cJSON * qrf = snowflake_cJSON_GetObjectItem(data, "queryResultFormat"); - char * qrf_str = snowflake_cJSON_GetStringValue(qrf); - sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); - cJSON * rowset = NULL; + if (qrf) { + char* qrf_str = snowflake_cJSON_GetStringValue(qrf); + sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); + cJSON* rowset = NULL; - if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { + if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { #ifdef SF_WIN32 SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT, - "Query results were fetched using Arrow, " - "but the client library does not yet support decoding Arrow results", "", - sfstmt->sfqid); + "Query results were fetched using Arrow, " + "but the client library does not yet support decoding Arrow results", "", + sfstmt->sfqid); return SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT; #endif - *((QueryResultFormat_t *) sfstmt->qrf) = ARROW_FORMAT; + * ((QueryResultFormat_t*)sfstmt->qrf) = ARROW_FORMAT; rowset = snowflake_cJSON_DetachItemFromObject(data, "rowsetBase64"); if (!rowset) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. 
No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else if (strcmp(qrf_str, "json") == 0) { - *((QueryResultFormat_t *) sfstmt->qrf) = JSON_FORMAT; - if (json_detach_array_from_object((cJSON **)(&rowset), data, "rowset")) + } + else if (strcmp(qrf_str, "json") == 0) { + *((QueryResultFormat_t*)sfstmt->qrf) = JSON_FORMAT; + if (json_detach_array_from_object((cJSON**)(&rowset), data, "rowset")) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else { + } + else { log_error("Unsupported query result format: %s", qrf_str); - } + } - // Index starts at 0 and incremented each fetch - sfstmt->total_row_index = 0; + // Index starts at 0 and incremented each fetch + sfstmt->total_row_index = 0; - // When the result set is sufficient large, the server response will contain - // an empty "rowset" object. Instead, it will have a "chunks" object that contains, - // among other fields, a URL from which the result set can be downloaded in chunks. - // In this case, we initialize the chunk downloader, which will download in the - // background as calls to snowflake_fetch() are made. - if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { + // When the result set is sufficient large, the server response will contain + // an empty "rowset" object. Instead, it will have a "chunks" object that contains, + // among other fields, a URL from which the result set can be downloaded in chunks. 
+ // In this case, we initialize the chunk downloader, which will download in the + // background as calls to snowflake_fetch() are made. + if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { // We don't care if there is no qrmk, so ignore return code json_copy_string(&qrmk, data, "qrmk"); chunk_headers = snowflake_cJSON_GetObjectItem(data, "chunkHeaders"); NON_JSON_RESP* (*callback_create_resp)(void) = NULL; - if (ARROW_FORMAT == *((QueryResultFormat_t *)sfstmt->qrf)) { - callback_create_resp = callback_create_arrow_resp; + if (ARROW_FORMAT == *((QueryResultFormat_t*)sfstmt->qrf)) { + callback_create_resp = callback_create_arrow_resp; } sfstmt->chunk_downloader = chunk_downloader_init( - qrmk, - chunk_headers, - chunks, - 2, // thread count - 4, // fetch slot - &sfstmt->error, - sfstmt->connection->insecure_mode, - sfstmt->connection->ocsp_fail_open, - callback_create_resp, - sfstmt->connection->proxy, - sfstmt->connection->no_proxy, - get_retry_timeout(sfstmt->connection), - sfstmt->connection->retry_count); + qrmk, + chunk_headers, + chunks, + 2, // thread count + 4, // fetch slot + &sfstmt->error, + sfstmt->connection->insecure_mode, + sfstmt->connection->ocsp_fail_open, + callback_create_resp, + sfstmt->connection->proxy, + sfstmt->connection->no_proxy, + get_retry_timeout(sfstmt->connection), + sfstmt->connection->retry_count); if (!sfstmt->chunk_downloader) { - // Unable to create chunk downloader. - // Error is set in chunk_downloader_init function. - goto cleanup; + // Unable to create chunk downloader. + // Error is set in chunk_downloader_init function. + goto cleanup; } // Even when the result set is split into chunks, JSON format will still // response with the first chunk in "rowset", so be sure to include it. 
sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *)sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } - } else { + } + else { // Create a result set object and update the total rowcount. sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *) sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. 
Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } + } } } } else if (json_error != SF_JSON_ERROR_NONE) { @@ -2660,8 +2655,12 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } } return sfstmt->total_rowcount; @@ -2673,8 +2672,12 @@ int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } } return sfstmt->total_fieldcount; @@ -2687,8 +2690,12 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return 0; + } } ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params; diff --git a/lib/connection.c b/lib/connection.c index d6588eb655..1b728230ae 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -413,8 +413,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, break; } - while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 || - strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) { + while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0) { // Remove old result URL and query code if this isn't our first rodeo SF_FREE(result_url); memset(query_code, 0, QUERYCODE_LEN); diff --git a/lib/http_perform.c b/lib/http_perform.c index b7291e2034..6bf671e349 100644 --- a/lib/http_perform.c +++ b/lib/http_perform.c @@ -36,8 +36,6 @@ dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, static 
int my_trace(CURL *handle, curl_infotype type, char *data, size_t size, void *userp); -static void my_sleep_ms(uint32 sleepMs); - static void dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, @@ -128,16 +126,6 @@ int my_trace(CURL *handle, curl_infotype type, return 0; } -static -void my_sleep_ms(uint32 sleepMs) -{ -#ifdef _WIN32 - Sleep(sleepMs); -#else - usleep(sleepMs * 1000); // usleep takes sleep time in us (1 millionth of a second) -#endif -} - sf_bool STDCALL http_perform(CURL *curl, SF_REQUEST_TYPE request_type, char *url, @@ -371,7 +359,7 @@ sf_bool STDCALL http_perform(CURL *curl, if ((renew_injection) && (renew_timeout > 0) && elapsed_time && (*elapsed_time <= 0)) { - my_sleep_ms(renew_timeout * 1000); + sf_sleep_ms(renew_timeout * 1000); res = CURLE_OPERATION_TIMEDOUT; } @@ -387,7 +375,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d second", curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs*1000); + sf_sleep_ms(next_sleep_in_secs*1000); } else if ((res == CURLE_OPERATION_TIMEDOUT) && (renew_timeout > 0)) { retry = SF_BOOLEAN_TRUE; } else { @@ -436,7 +424,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d seconds", http_code, curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs * 1000); + sf_sleep_ms(next_sleep_in_secs * 1000); } else { char msg[1024]; diff --git a/lib/platform.c b/lib/platform.c index baf5a0ead3..8c66157016 100755 --- a/lib/platform.c +++ b/lib/platform.c @@ -236,6 +236,15 @@ struct tm* sfchrono_localtime(const time_t *timep, struct tm *tm) } #endif +void STDCALL sf_sleep_ms(int sleep_ms) +{ +#ifdef _WIN32 + Sleep(sleep_ms); +#else + usleep(sleep_ms * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +} + struct tm *STDCALL sf_gmtime(const time_t *timep, struct tm *result) { #ifdef _WIN32 return sfchrono_gmtime(timep, result); diff --git a/tests/test_async.c b/tests/test_async.c index 
4cbfd999b2..11eb7652d6 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -44,6 +44,94 @@ void test_select(void **unused) { snowflake_term(sf); } +/** + * Test normal getting query status + */ +void test_query_status(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + assert_int_equal(query_status, SF_QUERY_STATUS_RUNNING); + + int retries = 0; + while (query_status != SF_QUERY_STATUS_SUCCESS && retries < 5) { + query_status = snowflake_get_query_status(sfstmt); + sf_sleep_ms(2000); + retries++; + } + + /* get results */ + char *out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +/** + * Test premature fetch + */ +void test_premature_fetch(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select 
system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + char* out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + /** * Test async with new connection */ @@ -85,11 +173,9 @@ void test_new_connection(void** unused) { int64 out = 0; assert_int_equal(snowflake_num_rows(async_sfstmt), 1); - int counter = 0; while ((status = snowflake_fetch(async_sfstmt)) == SF_STATUS_SUCCESS) { snowflake_column_as_int64(async_sfstmt, 1, &out); assert_int_equal(out, 1); - ++counter; } if (status != SF_STATUS_EOF) { dump_error(&(async_sfstmt->error)); @@ -113,11 +199,23 @@ void test_fake_table(void** unused) { status = snowflake_prepare(sfstmt, "select * from fake_table;", 0); assert_int_equal(status, SF_STATUS_SUCCESS); status = snowflake_async_execute(sfstmt); + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + assert_int_equal(query_status, SF_QUERY_STATUS_FAILED_WITH_ERROR); + + /* get results */ + char* out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + status = snowflake_fetch(sfstmt); assert_int_equal(status, SF_STATUS_ERROR_GENERAL); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); } /** - * Test async query with fake table + * Test async query with invalid query id */ void test_invalid_query_id(void** unused) { SF_CONNECT* sf = setup_snowflake_connection(); @@ -140,6 +238,8 
@@ int main(void) { initialize_test(SF_BOOLEAN_FALSE); const struct CMUnitTest tests[] = { cmocka_unit_test(test_select), + cmocka_unit_test(test_query_status), + cmocka_unit_test(test_premature_fetch), cmocka_unit_test(test_new_connection), cmocka_unit_test(test_fake_table), cmocka_unit_test(test_invalid_query_id),