Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-502598: Add async query execution #672

Merged
merged 20 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f0abbf9
SNOW-502598: Add async query execution
sfc-gh-ext-simba-lf Oct 20, 2023
903a8d8
SNOW-502598: Add tests
sfc-gh-ext-simba-lf Oct 20, 2023
922acf6
SNOW-502598: Add tests
sfc-gh-ext-simba-lf Oct 20, 2023
a2e92cb
SNOW-502598: Remove redundant requestAsync() function
sfc-gh-ext-simba-lf Oct 23, 2023
1ee54ed
SNOW-502598: Replace for loop with while loop
sfc-gh-ext-simba-lf Oct 23, 2023
b79a595
SNOW-502598: Edit comment for query status retry
sfc-gh-ext-simba-lf Oct 23, 2023
0a7b7c8
SNOW-502598: Remove test comment
sfc-gh-ext-simba-lf Oct 23, 2023
8d8bd87
SNOW-502598: Parse data back into JSON
sfc-gh-ext-simba-lf Oct 23, 2023
a479942
SNOW-502598: Revert normalizing response
sfc-gh-ext-simba-lf Oct 23, 2023
a5b0fd3
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 24, 2023
038f781
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 26, 2023
72751cb
SNOW-502598: Use async for before/after tests
sfc-gh-ext-simba-lf Oct 27, 2023
5ecda9b
SNOW-502598: Extract running and error statuses
sfc-gh-ext-simba-lf Oct 27, 2023
e2240a0
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 27, 2023
3624ae7
SNOW-502598: Refactor error into the correct type
sfc-gh-ext-simba-lf Oct 27, 2023
924052c
SNOW-502598: Keep asyncExec option for execute only
sfc-gh-ext-simba-lf Oct 27, 2023
6c00000
SNOW-502598: Format unused params in tests
sfc-gh-ext-simba-lf Oct 28, 2023
357359a
SNOW-502598: Refactor getQueryStatusThrowIfError
sfc-gh-ext-simba-lf Oct 31, 2023
46748bb
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 31, 2023
6944e99
Merge branch 'master' into SNOW-502598-async-query-exec
sfc-gh-ext-simba-lf Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const Url = require('url');
const QueryString = require('querystring');
const GSErrors = require('../constants/gs_errors')
const QueryStatus = require('../constants/query_status');

var Util = require('../util');
var Errors = require('../errors');
Expand Down Expand Up @@ -37,6 +38,15 @@
// generate an id for the connection
var id = uuidv4();

// async max retry and retry pattern from python connector
const asyncNoDataMaxRetry = 24;
const asyncRetryPattern = [1, 1, 2, 3, 4, 8, 10];
const asyncRetryInMilliseconds = 500;

// Custom regex based on uuid validate
// Unable to directly use uuid validate because the queryId returned from the server doesn't match the regex
const queryIdRegex = new RegExp(/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000)$/i);

//Make session tokens available for testing
this.getTokens = function ()
{
Expand Down Expand Up @@ -411,6 +421,164 @@
return this;
};

/**
* Gets the response containing the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {Object} the query response
*/
async function getQueryResponse(queryId) {
// Check if queryId exists and is valid uuid
Errors.checkArgumentExists(Util.exists(queryId),
ErrorCodes.ERR_CONN_FETCH_RESULT_MISSING_QUERY_ID);
Errors.checkArgumentValid(queryIdRegex.test(queryId),
ErrorCodes.ERR_GET_RESPONSE_QUERY_INVALID_UUID, queryId);

// Form the request options
const options =
{
method: 'GET',
url: Url.format(
{
pathname: `/monitoring/queries/${queryId}`
}),
};

// Get the response containing the query status
const response = await services.sf.requestAsync(options);

return response['data'];
}

/**
* Extracts the status of the query from the query response.
*
* @param {Object} queryResponse
*
* @returns {String} the query status.
*/
function extractQueryStatus(queryResponse) {
const queries = queryResponse['data']['queries'];
let status = QueryStatus.code.NO_DATA; // default status
if (queries.length > 0) {
status = queries[0]['status'];
}

return status;
}

/**
* Gets the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatus = async function (queryId) {
return extractQueryStatus(await getQueryResponse(queryId));
};

/**
* Gets the status of the query based on queryId and throws if there's an error.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatusThrowIfError = async function (queryId) {
sfc-gh-pmotacki marked this conversation as resolved.
Show resolved Hide resolved
const status = this.getQueryStatus(queryId);

let message, code, sqlState = null;

if (this.isAnError(status)) {
const response = await getQueryResponse(queryId);
message = response['message'] || '';
code = response['code'] || -1;

Check warning on line 497 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L495-L497

Added lines #L495 - L497 were not covered by tests

if (response['data']) {
message += response['data']['queries'].length > 0 ? response['data']['queries'][0]['errorMessage'] : '';
sqlState = response['data']['sqlState'];

Check warning on line 501 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L499-L501

Added lines #L499 - L501 were not covered by tests
}

throw Errors.createOperationFailedError(

Check warning on line 504 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L504

Added line #L504 was not covered by tests
code, response, message, sqlState);
}

return status;
};

/**
* Gets the results from a previously ran query based on queryId
*
* @param {Object} options
*
* @returns {Object}
*/
this.getResultsFromQueryId = async function (options) {
const queryId = options.queryId;
let status, noDataCounter = 0, retryPatternPos = 0;

// Wait until query has finished executing
let queryStillExecuting = true;
while (queryStillExecuting) {
// Check if query is still running and trigger exception if it failed
status = await this.getQueryStatusThrowIfError(queryId);
queryStillExecuting = this.isStillRunning(status);
if (!queryStillExecuting) {
break;
}

// Timeout based on query status retry rules
await new Promise((resolve) => {
setTimeout(() => resolve(), asyncRetryInMilliseconds * asyncRetryPattern[retryPatternPos]);
});

// If no data, increment the no data counter
if (QueryStatus.code[status] == QueryStatus.code.NO_DATA) {
noDataCounter++;

Check warning on line 539 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L539

Added line #L539 was not covered by tests
// Check if retry for no data is exceeded
if (noDataCounter > asyncNoDataMaxRetry) {
throw Errors.createClientError(

Check warning on line 542 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L541-L542

Added lines #L541 - L542 were not covered by tests
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NO_DATA, true, queryId);
}
}

if (retryPatternPos < asyncRetryPattern.length - 1) {
retryPatternPos++;
}
}

if (QueryStatus.code[status] != QueryStatus.code.SUCCESS) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS, true, queryId, status);
}

return this.fetchResult(options);
};

/**
* Checks whether the given status is currently running.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isStillRunning = function (status) {
return QueryStatus.runningStatuses.includes(QueryStatus.code[status]);
};

/**
* Checks whether the given status means that there has been an error.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isAnError = function (status) {
return QueryStatus.errorStatuses.includes(QueryStatus.code[status]);
};

/**
* Returns a serialized version of this connection.
*
Expand Down
40 changes: 36 additions & 4 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var statementTypes =
FILE_POST_EXEC: 'FILE_POST_EXEC'
};

const queryCodes = {
QUERY_IN_PROGRESS: '333333', // GS code: the query is in progress
QUERY_IN_PROGRESS_ASYNC: '333334' // GS code: the query is detached
};

exports.createContext = function (
options, services, connectionConfig)
{
Expand Down Expand Up @@ -364,6 +369,12 @@ function createContextPreExec(
RowMode.checkRowModeValid(rowMode);
}

// if an asyncExec flag is specified, make sure it's boolean
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),
ErrorCodes.ERR_CONN_EXEC_STMT_INVALID_ASYNC_EXEC);
}

// create a statement context
var statementContext = createStatementContext();

Expand All @@ -374,6 +385,7 @@ function createContextPreExec(
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// if a binds array is specified, add it to the statement context
if (Util.exists(statementOptions.binds))
Expand Down Expand Up @@ -682,10 +694,13 @@ function invokeStatementComplete(statement, context)
streamResult = context.connectionConfig.getStreamResult();
}

// if the result will be streamed later,
// if the result will be streamed later or in asyncExec mode,
// invoke the complete callback right away
if (streamResult) {
context.complete(Errors.externalize(context.resultError), statement);
} else if (context.asyncExec) {
// return the result object with the query ID inside.
context.complete(null, statement, context.result);
} else {
process.nextTick(function () {
// aggregate all the rows into an array and pass this
Expand Down Expand Up @@ -779,6 +794,12 @@ function createOnStatementRequestSuccRow(statement, context)
// if we don't already have a result
if (!context.result)
{
if (body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC) {
context.result = {
queryId: body.data.queryId
};
return;
}
if (body.data.resultIds != undefined && body.data.resultIds.length > 0)
{
//multi statements
Expand Down Expand Up @@ -1330,6 +1351,11 @@ function sendRequestPreExec(statementContext, onResultAvailable)
json.queryContextDTO = statementContext.services.sf.getQueryContextDTO();
}

// include the asyncExec flag if a value was specified
if (Util.exists(statementContext.asyncExec)) {
json.asyncExec = statementContext.asyncExec;
}

// use the snowflake service to issue the request
sendSfRequest(statementContext,
{
Expand Down Expand Up @@ -1645,9 +1671,15 @@ function buildResultRequestCallback(
statementContext.queryId = body.data.queryId;

// if the result is not ready yet, extract the result url from the response
// and issue a GET request to try to fetch the result again
if (body && (body.code === '333333' || body.code === '333334'))
{
// and issue a GET request to try to fetch the result again unless asyncExec is enabled.
if (body && (body.code === queryCodes.QUERY_IN_PROGRESS
|| body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC)) {

if (statementContext.asyncExec) {
await onResultAvailable.call(null, err, body);
return;
}

// extract the result url from the response and try to get the result
// again
sendSfRequest(statementContext,
Expand Down
10 changes: 8 additions & 2 deletions lib/constants/error_messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ exports[409010] = 'Invalid streamResult flag. The specified value must be a bool
exports[409011] = 'Invalid fetchAsString value. The specified value must be an Array.';
exports[409012] = 'Invalid fetchAsString type: %s. The supported types are: String, Boolean, Number, Date, Buffer, and JSON.';
exports[409013] = 'Invalid requestId. The specified value must be a string.';
exports[409014] = 'Invalid asyncExec. The specified value must be a boolean.'

// 410001
exports[410001] = 'Fetch-result options must be specified.';
exports[410002] = 'Invalid options. The specified value must be an object.';
exports[410003] = 'A statement id must be specified.';
exports[410004] = 'Invalid statement id. The specified value must be a string.';
exports[410003] = 'A query id/statement id must be specified.';
exports[410004] = 'Invalid query id/statement id. The specified value must be a string.';
exports[410005] = 'Invalid complete callback. The specified value must be a function.';
exports[410006] = 'Invalid streamResult flag. The specified value must be a boolean.';
exports[410007] = 'Invalid fetchAsString value. The specified value must be an Array.';
Expand Down Expand Up @@ -152,3 +153,8 @@ exports[450004] = 'Invalid each() callback. The specified value must be a functi
exports[450005] = 'An end() callback must be specified.';
exports[450006] = 'Invalid end() callback. The specified value must be a function.';
exports[450007] = 'Operation failed because the statement is still in progress.';

// 460001
exports[460001] = 'Invalid queryId: %s';
exports[460002] = 'Cannot retrieve data. No information returned from server for query %s';
exports[460003] = 'Status of query %s is %s, results are unavailable';
45 changes: 45 additions & 0 deletions lib/constants/query_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

const code = {};

code.RUNNING = 'RUNNING';
code.ABORTING = 'ABORTING';
code.SUCCESS = 'SUCCESS';
code.FAILED_WITH_ERROR = 'FAILED_WITH_ERROR';
code.ABORTED = 'ABORTED';
code.QUEUED = 'QUEUED';
code.FAILED_WITH_INCIDENT = 'FAILED_WITH_INCIDENT';
code.DISCONNECTED = 'DISCONNECTED';
code.RESUMING_WAREHOUSE = 'RESUMING_WAREHOUSE';
// purposeful typo.Is present in QueryDTO.java
code.QUEUED_REPARING_WAREHOUSE = 'QUEUED_REPARING_WAREHOUSE';
code.RESTARTED = 'RESTARTED';
code.BLOCKED = 'BLOCKED';
code.NO_DATA = 'NO_DATA';

// All running query statuses
const runningStatuses =
[
code.RUNNING,
code.RESUMING_WAREHOUSE,
code.QUEUED,
code.QUEUED_REPARING_WAREHOUSE,
code.NO_DATA,
];

// All error query statuses
const errorStatuses =
[
code.ABORTING,
code.FAILED_WITH_ERROR,
code.ABORTED,
code.FAILED_WITH_INCIDENT,
code.DISCONNECTED,
code.BLOCKED,
];

exports.code = code;
exports.runningStatuses = runningStatuses;
exports.errorStatuses = errorStatuses;
6 changes: 6 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ codes.ERR_CONN_EXEC_STMT_INVALID_STREAM_RESULT = 409010;
codes.ERR_CONN_EXEC_STMT_INVALID_FETCH_AS_STRING = 409011;
codes.ERR_CONN_EXEC_STMT_INVALID_FETCH_AS_STRING_VALUES = 409012;
codes.ERR_CONN_EXEC_STMT_INVALID_REQUEST_ID = 409013;
codes.ERR_CONN_EXEC_STMT_INVALID_ASYNC_EXEC = 409014;

// 410001
codes.ERR_CONN_FETCH_RESULT_MISSING_OPTIONS = 410001;
Expand Down Expand Up @@ -159,6 +160,11 @@ codes.ERR_STMT_FETCH_ROWS_MISSING_END = 450005;
codes.ERR_STMT_FETCH_ROWS_INVALID_END = 450006;
codes.ERR_STMT_FETCH_ROWS_FETCHING_RESULT = 450007;

// 460001
codes.ERR_GET_RESPONSE_QUERY_INVALID_UUID = 460001;
codes.ERR_GET_RESULTS_QUERY_ID_NO_DATA = 460002;
codes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS = 460003;

exports.codes = codes;

/**
Expand Down
9 changes: 8 additions & 1 deletion lib/http/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ HttpClient.prototype.requestAsync = async function (options)
{
const requestOptions = prepareRequestOptions.call(this, options);

return axios.request(requestOptions);
let response = await axios.request(requestOptions);

if (Util.isString(response['data']) &&
Copy link
Contributor

@sfc-gh-fpawlowski sfc-gh-fpawlowski Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @sfc-gh-ext-simba-lf - we have been refactoring and some questions arose from this part of code.
Specifically - did You ever noticed such case, that response['data'] is string, but 'content-type' headers are set to 'application/json' at the same time?

We expected axios to parse such data automatically (ad. e.g. https://masteringjs.io/tutorials/axios/json).
Could You provide some reproduction for such scenario?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, the tests added with the PR can help repro that scenario. Or a simpler repro would be something like:

connection.execute({
	sqlText: "select 1;",
	asyncExec: true,
	complete: async function (err, stmt) {
		queryId = stmt.getQueryId();
		await connection.getQueryStatus(queryId);
	}
});

The getQueryStatus function receives a response in string that is parsed into JSON

response['headers']['content-type'] === 'application/json') {
response['data'] = JSON.parse(response['data']);
}

return response;
};

/**
Expand Down
Loading
Loading