Skip to content

Commit

Permalink
SNOW-502598: Add async query execution
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-lf committed Oct 20, 2023
1 parent 2e711fd commit f0abbf9
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 23 deletions.
185 changes: 185 additions & 0 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { v4: uuidv4 } = require('uuid');
const Url = require('url');
const QueryString = require('querystring');
const GSErrors = require('../constants/gs_errors')
const QueryStatus = require('../constants/query_status').code;

var Util = require('../util');
var Errors = require('../errors');
Expand Down Expand Up @@ -37,6 +38,15 @@ function Connection(context)
// generate an id for the connection
var id = uuidv4();

// async max retry and retry pattern from python connector
const asyncNoDataMaxRetry = 24;
const asyncRetryPattern = [1, 1, 2, 3, 4, 8, 10];
const asyncRetryInMilliseconds = 500;

// Custom regex based on uuid validate
// Unable to directly use uuid validate because the queryId returned from the server doesn't match the regex
const queryIdRegex = new RegExp(/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000)$/i);

//Make session tokens available for testing
this.getTokens = function ()
{
Expand Down Expand Up @@ -411,6 +421,181 @@ function Connection(context)
return this;
};

/**
* Gets the response containing the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {Object} the query response
*/
async function getQueryResponse(queryId) {
// Check if queryId exists and is valid uuid
Errors.checkArgumentExists(Util.exists(queryId),
ErrorCodes.ERR_CONN_FETCH_RESULT_MISSING_QUERY_ID);
Errors.checkArgumentValid(queryIdRegex.test(queryId),
ErrorCodes.ERR_GET_RESPONSE_QUERY_INVALID_UUID, queryId);

// Form the request options
const options =
{
method: 'GET',
url: Url.format(
{
pathname: `/monitoring/queries/${queryId}`
}),
};

// Get the response containing the query status
const response = await services.sf.requestAsync(options);

return response['body'];
}

/**
* Extracts the status of the query from the query response.
*
* @param {Object} queryResponse
*
* @returns {String} the query status.
*/
function extractQueryStatus(queryResponse) {
const queries = queryResponse['data']['queries'];
let status = QueryStatus.NO_DATA; // default status
if (queries.length > 0) {
status = queries[0]['status'];
}

return status;
}

/**
* Gets the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatus = async function (queryId) {
return extractQueryStatus(await getQueryResponse(queryId));
};

/**
* Gets the status of the query based on queryId and throws if there's an error.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatusThrowIfError = async function (queryId) {
const response = await getQueryResponse(queryId);
const queries = response['data']['queries'];
const status = extractQueryStatus(response);
let message, code, sqlState = null;

if (this.isAnError(status)) {
message = response['message'] || '';
code = response['code'] || -1;

if (response['data']) {
message += queries.length > 0 ? queries[0]['errorMessage'] : '';
sqlState = response['data']['sqlState'];
}

throw Errors.createOperationFailedError(
code, response, message, sqlState);
}

return status;
};

/**
* Gets the results from a previously ran query based on queryId
*
* @param {Object} options
*
* @returns {Object}
*/
this.getResultsFromQueryId = async function (options) {
const queryId = options.queryId;
let status, noDataCounter = 0, retryPatternPos = 0;

// Wait until query has finished executing
for (;;) {
// Check if query is still running and trigger exception if it failed
status = await this.getQueryStatusThrowIfError(queryId);
if (!this.isStillRunning(status)) {
break;
}

// Timeout based on python connector
await new Promise((resolve) => {
setTimeout(() => resolve(), asyncRetryInMilliseconds * asyncRetryPattern[retryPatternPos]);
});

// If no data, increment the no data counter
if (QueryStatus[status] == QueryStatus.NO_DATA) {
noDataCounter++;
// Check if retry for no data is exceeded
if (noDataCounter > asyncNoDataMaxRetry) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NO_DATA, true, queryId);
}
}

if (retryPatternPos < asyncRetryPattern.length - 1) {
retryPatternPos++;
}
}

if (QueryStatus[status] != QueryStatus.SUCCESS) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS, true, queryId, status);
}

return this.fetchResult(options);
};

/**
* Checks whether the given status is currently running.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isStillRunning = function (status) {
const stillRunning =
[
QueryStatus.RUNNING,
QueryStatus.RESUMING_WAREHOUSE,
QueryStatus.QUEUED,
QueryStatus.QUEUED_REPARING_WAREHOUSE,
QueryStatus.NO_DATA,
].includes(QueryStatus[status]);

return stillRunning;
};

/**
* Checks whether the given status means that there has been an error.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isAnError = function (status) {
const isAnError =
[
QueryStatus.ABORTING,
QueryStatus.FAILED_WITH_ERROR,
QueryStatus.ABORTED,
QueryStatus.FAILED_WITH_INCIDENT,
QueryStatus.DISCONNECTED,
QueryStatus.BLOCKED,
].includes(QueryStatus[status]);

return isAnError;
};

/**
* Returns a serialized version of this connection.
*
Expand Down
47 changes: 43 additions & 4 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var statementTypes =
FILE_POST_EXEC: 'FILE_POST_EXEC'
};

const queryCodes = {
QUERY_IN_PROGRESS: '333333', // GS code: the query is in progress
QUERY_IN_PROGRESS_ASYNC: '333334' // GS code: the query is detached
};

exports.createContext = function (
options, services, connectionConfig)
{
Expand Down Expand Up @@ -187,6 +192,12 @@ exports.createStatementPostExec = function (
JSON.stringify(fetchAsString[invalidValueIndex]));
}

// check for invalid asyncExec
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),
ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC);
}

const rowMode = statementOptions.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
Expand All @@ -206,6 +217,7 @@ exports.createStatementPostExec = function (
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// set the statement type
statementContext.type = (statementContext.type == statementTypes.ROW_PRE_EXEC) ? statementTypes.ROW_POST_EXEC : statementTypes.FILE_POST_EXEC;
Expand Down Expand Up @@ -364,6 +376,12 @@ function createContextPreExec(
RowMode.checkRowModeValid(rowMode);
}

// if an asyncExec flag is specified, make sure it's boolean
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),
ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC);
}

// create a statement context
var statementContext = createStatementContext();

Expand All @@ -374,6 +392,7 @@ function createContextPreExec(
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// if a binds array is specified, add it to the statement context
if (Util.exists(statementOptions.binds))
Expand Down Expand Up @@ -682,10 +701,13 @@ function invokeStatementComplete(statement, context)
streamResult = context.connectionConfig.getStreamResult();
}

// if the result will be streamed later,
// if the result will be streamed later or in asyncExec mode,
// invoke the complete callback right away
if (streamResult) {
context.complete(Errors.externalize(context.resultError), statement);
} else if (context.asyncExec) {
// return the result object with the query ID inside.
context.complete(null, statement, context.result);
} else {
process.nextTick(function () {
// aggregate all the rows into an array and pass this
Expand Down Expand Up @@ -779,6 +801,12 @@ function createOnStatementRequestSuccRow(statement, context)
// if we don't already have a result
if (!context.result)
{
if (body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC) {
context.result = {
queryId: body.data.queryId
};
return;
}
if (body.data.resultIds != undefined && body.data.resultIds.length > 0)
{
//multi statements
Expand Down Expand Up @@ -1330,6 +1358,11 @@ function sendRequestPreExec(statementContext, onResultAvailable)
json.queryContextDTO = statementContext.services.sf.getQueryContextDTO();
}

// include the asyncExec flag if a value was specified
if (Util.exists(statementContext.asyncExec)) {
json.asyncExec = statementContext.asyncExec;
}

// use the snowflake service to issue the request
sendSfRequest(statementContext,
{
Expand Down Expand Up @@ -1645,9 +1678,15 @@ function buildResultRequestCallback(
statementContext.queryId = body.data.queryId;

// if the result is not ready yet, extract the result url from the response
// and issue a GET request to try to fetch the result again
if (body && (body.code === '333333' || body.code === '333334'))
{
// and issue a GET request to try to fetch the result again unless asyncExec is enabled.
if (body && (body.code === queryCodes.QUERY_IN_PROGRESS
|| body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC)) {

if (statementContext.asyncExec) {
await onResultAvailable.call(null, err, body);
return;
}

// extract the result url from the response and try to get the result
// again
sendSfRequest(statementContext,
Expand Down
10 changes: 8 additions & 2 deletions lib/constants/error_messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ exports[404040] = 'Invalid browser timeout value. The specified value must be a
exports[404041] = 'Invalid disablQueryContextCache. The specified value must be a boolean.';
exports[404042] = 'Invalid includeRetryReason. The specified value must be a boolean.'
exports[404043] = 'Invalid clientConfigFile value. The specified value must be a string.';
exports[404044] = 'Invalid asyncExec. The specified value must be a boolean.'

// 405001
exports[405001] = 'Invalid callback. The specified value must be a function.';
Expand Down Expand Up @@ -113,8 +114,8 @@ exports[409013] = 'Invalid requestId. The specified value must be a string.';
// 410001
exports[410001] = 'Fetch-result options must be specified.';
exports[410002] = 'Invalid options. The specified value must be an object.';
exports[410003] = 'A statement id must be specified.';
exports[410004] = 'Invalid statement id. The specified value must be a string.';
exports[410003] = 'A query id/statement id must be specified.';
exports[410004] = 'Invalid query id/statement id. The specified value must be a string.';
exports[410005] = 'Invalid complete callback. The specified value must be a function.';
exports[410006] = 'Invalid streamResult flag. The specified value must be a boolean.';
exports[410007] = 'Invalid fetchAsString value. The specified value must be an Array.';
Expand Down Expand Up @@ -151,3 +152,8 @@ exports[450004] = 'Invalid each() callback. The specified value must be a functi
exports[450005] = 'An end() callback must be specified.';
exports[450006] = 'Invalid end() callback. The specified value must be a function.';
exports[450007] = 'Operation failed because the statement is still in progress.';

// 460001
exports[460001] = 'Invalid queryId: %s';
exports[460002] = 'Cannot retrieve data. No information returned from server for query %s';
exports[460003] = 'Status of query %s is %s, results are unavailable';
22 changes: 22 additions & 0 deletions lib/constants/query_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

const code = {};

code.RUNNING = 'RUNNING';
code.ABORTING = 'ABORTING';
code.SUCCESS = 'SUCCESS';
code.FAILED_WITH_ERROR = 'FAILED_WITH_ERROR';
code.ABORTED = 'ABORTED';
code.QUEUED = 'QUEUED';
code.FAILED_WITH_INCIDENT = 'FAILED_WITH_INCIDENT';
code.DISCONNECTED = 'DISCONNECTED';
code.RESUMING_WAREHOUSE = 'RESUMING_WAREHOUSE';
// purposeful typo.Is present in QueryDTO.java
code.QUEUED_REPARING_WAREHOUSE = 'QUEUED_REPARING_WAREHOUSE';
code.RESTARTED = 'RESTARTED';
code.BLOCKED = 'BLOCKED';
code.NO_DATA = 'NO_DATA';

exports.code = code;
6 changes: 6 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ codes.ERR_CONN_CREATE_INVALID_BROWSER_TIMEOUT = 404040;
codes.ERR_CONN_CREATE_INVALID_DISABLED_QUERY_CONTEXT_CACHE = 404041
codes.ERR_CONN_CREATE_INVALID_INCLUDE_RETRY_REASON =404042
codes.ERR_CONN_CREATE_INVALID_CLIENT_CONFIG_FILE = 404043;
codes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC = 404044

// 405001
codes.ERR_CONN_CONNECT_INVALID_CALLBACK = 405001;
Expand Down Expand Up @@ -158,6 +159,11 @@ codes.ERR_STMT_FETCH_ROWS_MISSING_END = 450005;
codes.ERR_STMT_FETCH_ROWS_INVALID_END = 450006;
codes.ERR_STMT_FETCH_ROWS_FETCHING_RESULT = 450007;

// 460001
codes.ERR_GET_RESPONSE_QUERY_INVALID_UUID = 460001;
codes.ERR_GET_RESULTS_QUERY_ID_NO_DATA = 460002;
codes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS = 460003;

exports.codes = codes;

/**
Expand Down
Loading

0 comments on commit f0abbf9

Please sign in to comment.