Skip to content

Commit

Permalink
SNOW-502598: Add async query execution (#672)
Browse files Browse the repository at this point in the history
* SNOW-502598: Add async query execution

* SNOW-502598: Add tests

* SNOW-502598: Add tests

* SNOW-502598: Remove redundant requestAsync() function

* SNOW-502598: Replace for loop with while loop

* SNOW-502598: Edit comment for query status retry

* SNOW-502598: Remove test comment

* SNOW-502598: Parse data back into JSON

* SNOW-502598: Revert normalizing response

* SNOW-502598: Use async for before/after tests

* SNOW-502598: Extract running and error statuses

* SNOW-502598: Refactor error into the correct type

* SNOW-502598: Keep asyncExec option for execute only

* SNOW-502598: Format unused params in tests

* SNOW-502598: Refactor getQueryStatusThrowIfError
  • Loading branch information
sfc-gh-ext-simba-lf authored Nov 6, 2023
1 parent 6cc2fda commit d7c33ea
Show file tree
Hide file tree
Showing 10 changed files with 913 additions and 24 deletions.
168 changes: 168 additions & 0 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { v4: uuidv4 } = require('uuid');
const Url = require('url');
const QueryString = require('querystring');
const GSErrors = require('../constants/gs_errors')
const QueryStatus = require('../constants/query_status');

var Util = require('../util');
var Errors = require('../errors');
Expand Down Expand Up @@ -37,6 +38,15 @@ function Connection(context)
// generate an id for the connection
var id = uuidv4();

// async max retry and retry pattern from python connector
const asyncNoDataMaxRetry = 24;
const asyncRetryPattern = [1, 1, 2, 3, 4, 8, 10];
const asyncRetryInMilliseconds = 500;

// Custom regex based on uuid validate
// Unable to directly use uuid validate because the queryId returned from the server doesn't match the regex
const queryIdRegex = new RegExp(/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000)$/i);

//Make session tokens available for testing
this.getTokens = function ()
{
Expand Down Expand Up @@ -411,6 +421,164 @@ function Connection(context)
return this;
};

/**
* Gets the response containing the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {Object} the query response
*/
async function getQueryResponse(queryId) {
// Check if queryId exists and is valid uuid
Errors.checkArgumentExists(Util.exists(queryId),
ErrorCodes.ERR_CONN_FETCH_RESULT_MISSING_QUERY_ID);
Errors.checkArgumentValid(queryIdRegex.test(queryId),
ErrorCodes.ERR_GET_RESPONSE_QUERY_INVALID_UUID, queryId);

// Form the request options
const options =
{
method: 'GET',
url: Url.format(
{
pathname: `/monitoring/queries/${queryId}`
}),
};

// Get the response containing the query status
const response = await services.sf.requestAsync(options);

return response['data'];
}

/**
* Extracts the status of the query from the query response.
*
* @param {Object} queryResponse
*
* @returns {String} the query status.
*/
function extractQueryStatus(queryResponse) {
const queries = queryResponse['data']['queries'];
let status = QueryStatus.code.NO_DATA; // default status
if (queries.length > 0) {
status = queries[0]['status'];
}

return status;
}

/**
* Gets the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatus = async function (queryId) {
return extractQueryStatus(await getQueryResponse(queryId));
};

/**
* Gets the status of the query based on queryId and throws if there's an error.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatusThrowIfError = async function (queryId) {
const status = this.getQueryStatus(queryId);

let message, code, sqlState = null;

if (this.isAnError(status)) {
const response = await getQueryResponse(queryId);
message = response['message'] || '';
code = response['code'] || -1;

if (response['data']) {
message += response['data']['queries'].length > 0 ? response['data']['queries'][0]['errorMessage'] : '';
sqlState = response['data']['sqlState'];
}

throw Errors.createOperationFailedError(
code, response, message, sqlState);
}

return status;
};

/**
* Gets the results from a previously ran query based on queryId
*
* @param {Object} options
*
* @returns {Object}
*/
this.getResultsFromQueryId = async function (options) {
const queryId = options.queryId;
let status, noDataCounter = 0, retryPatternPos = 0;

// Wait until query has finished executing
let queryStillExecuting = true;
while (queryStillExecuting) {
// Check if query is still running and trigger exception if it failed
status = await this.getQueryStatusThrowIfError(queryId);
queryStillExecuting = this.isStillRunning(status);
if (!queryStillExecuting) {
break;
}

// Timeout based on query status retry rules
await new Promise((resolve) => {
setTimeout(() => resolve(), asyncRetryInMilliseconds * asyncRetryPattern[retryPatternPos]);
});

// If no data, increment the no data counter
if (QueryStatus.code[status] == QueryStatus.code.NO_DATA) {
noDataCounter++;
// Check if retry for no data is exceeded
if (noDataCounter > asyncNoDataMaxRetry) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NO_DATA, true, queryId);
}
}

if (retryPatternPos < asyncRetryPattern.length - 1) {
retryPatternPos++;
}
}

if (QueryStatus.code[status] != QueryStatus.code.SUCCESS) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS, true, queryId, status);
}

return this.fetchResult(options);
};

/**
* Checks whether the given status is currently running.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isStillRunning = function (status) {
return QueryStatus.runningStatuses.includes(QueryStatus.code[status]);
};

/**
* Checks whether the given status means that there has been an error.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isAnError = function (status) {
return QueryStatus.errorStatuses.includes(QueryStatus.code[status]);
};

/**
* Returns a serialized version of this connection.
*
Expand Down
40 changes: 36 additions & 4 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var statementTypes =
FILE_POST_EXEC: 'FILE_POST_EXEC'
};

const queryCodes = {
QUERY_IN_PROGRESS: '333333', // GS code: the query is in progress
QUERY_IN_PROGRESS_ASYNC: '333334' // GS code: the query is detached
};

exports.createContext = function (
options, services, connectionConfig)
{
Expand Down Expand Up @@ -364,6 +369,12 @@ function createContextPreExec(
RowMode.checkRowModeValid(rowMode);
}

// if an asyncExec flag is specified, make sure it's boolean
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),
ErrorCodes.ERR_CONN_EXEC_STMT_INVALID_ASYNC_EXEC);
}

// create a statement context
var statementContext = createStatementContext();

Expand All @@ -374,6 +385,7 @@ function createContextPreExec(
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// if a binds array is specified, add it to the statement context
if (Util.exists(statementOptions.binds))
Expand Down Expand Up @@ -682,10 +694,13 @@ function invokeStatementComplete(statement, context)
streamResult = context.connectionConfig.getStreamResult();
}

// if the result will be streamed later,
// if the result will be streamed later or in asyncExec mode,
// invoke the complete callback right away
if (streamResult) {
context.complete(Errors.externalize(context.resultError), statement);
} else if (context.asyncExec) {
// return the result object with the query ID inside.
context.complete(null, statement, context.result);
} else {
process.nextTick(function () {
// aggregate all the rows into an array and pass this
Expand Down Expand Up @@ -779,6 +794,12 @@ function createOnStatementRequestSuccRow(statement, context)
// if we don't already have a result
if (!context.result)
{
if (body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC) {
context.result = {
queryId: body.data.queryId
};
return;
}
if (body.data.resultIds != undefined && body.data.resultIds.length > 0)
{
//multi statements
Expand Down Expand Up @@ -1330,6 +1351,11 @@ function sendRequestPreExec(statementContext, onResultAvailable)
json.queryContextDTO = statementContext.services.sf.getQueryContextDTO();
}

// include the asyncExec flag if a value was specified
if (Util.exists(statementContext.asyncExec)) {
json.asyncExec = statementContext.asyncExec;
}

// use the snowflake service to issue the request
sendSfRequest(statementContext,
{
Expand Down Expand Up @@ -1645,9 +1671,15 @@ function buildResultRequestCallback(
statementContext.queryId = body.data.queryId;

// if the result is not ready yet, extract the result url from the response
// and issue a GET request to try to fetch the result again
if (body && (body.code === '333333' || body.code === '333334'))
{
// and issue a GET request to try to fetch the result again unless asyncExec is enabled.
if (body && (body.code === queryCodes.QUERY_IN_PROGRESS
|| body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC)) {

if (statementContext.asyncExec) {
await onResultAvailable.call(null, err, body);
return;
}

// extract the result url from the response and try to get the result
// again
sendSfRequest(statementContext,
Expand Down
10 changes: 8 additions & 2 deletions lib/constants/error_messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ exports[409010] = 'Invalid streamResult flag. The specified value must be a bool
exports[409011] = 'Invalid fetchAsString value. The specified value must be an Array.';
exports[409012] = 'Invalid fetchAsString type: %s. The supported types are: String, Boolean, Number, Date, Buffer, and JSON.';
exports[409013] = 'Invalid requestId. The specified value must be a string.';
exports[409014] = 'Invalid asyncExec. The specified value must be a boolean.'

// 410001
exports[410001] = 'Fetch-result options must be specified.';
exports[410002] = 'Invalid options. The specified value must be an object.';
exports[410003] = 'A statement id must be specified.';
exports[410004] = 'Invalid statement id. The specified value must be a string.';
exports[410003] = 'A query id/statement id must be specified.';
exports[410004] = 'Invalid query id/statement id. The specified value must be a string.';
exports[410005] = 'Invalid complete callback. The specified value must be a function.';
exports[410006] = 'Invalid streamResult flag. The specified value must be a boolean.';
exports[410007] = 'Invalid fetchAsString value. The specified value must be an Array.';
Expand Down Expand Up @@ -152,3 +153,8 @@ exports[450004] = 'Invalid each() callback. The specified value must be a functi
exports[450005] = 'An end() callback must be specified.';
exports[450006] = 'Invalid end() callback. The specified value must be a function.';
exports[450007] = 'Operation failed because the statement is still in progress.';

// 460001
exports[460001] = 'Invalid queryId: %s';
exports[460002] = 'Cannot retrieve data. No information returned from server for query %s';
exports[460003] = 'Status of query %s is %s, results are unavailable';
45 changes: 45 additions & 0 deletions lib/constants/query_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

const code = {};

code.RUNNING = 'RUNNING';
code.ABORTING = 'ABORTING';
code.SUCCESS = 'SUCCESS';
code.FAILED_WITH_ERROR = 'FAILED_WITH_ERROR';
code.ABORTED = 'ABORTED';
code.QUEUED = 'QUEUED';
code.FAILED_WITH_INCIDENT = 'FAILED_WITH_INCIDENT';
code.DISCONNECTED = 'DISCONNECTED';
code.RESUMING_WAREHOUSE = 'RESUMING_WAREHOUSE';
// purposeful typo.Is present in QueryDTO.java
code.QUEUED_REPARING_WAREHOUSE = 'QUEUED_REPARING_WAREHOUSE';
code.RESTARTED = 'RESTARTED';
code.BLOCKED = 'BLOCKED';
code.NO_DATA = 'NO_DATA';

// All running query statuses
const runningStatuses =
[
code.RUNNING,
code.RESUMING_WAREHOUSE,
code.QUEUED,
code.QUEUED_REPARING_WAREHOUSE,
code.NO_DATA,
];

// All error query statuses
const errorStatuses =
[
code.ABORTING,
code.FAILED_WITH_ERROR,
code.ABORTED,
code.FAILED_WITH_INCIDENT,
code.DISCONNECTED,
code.BLOCKED,
];

exports.code = code;
exports.runningStatuses = runningStatuses;
exports.errorStatuses = errorStatuses;
6 changes: 6 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ codes.ERR_CONN_EXEC_STMT_INVALID_STREAM_RESULT = 409010;
codes.ERR_CONN_EXEC_STMT_INVALID_FETCH_AS_STRING = 409011;
codes.ERR_CONN_EXEC_STMT_INVALID_FETCH_AS_STRING_VALUES = 409012;
codes.ERR_CONN_EXEC_STMT_INVALID_REQUEST_ID = 409013;
codes.ERR_CONN_EXEC_STMT_INVALID_ASYNC_EXEC = 409014;

// 410001
codes.ERR_CONN_FETCH_RESULT_MISSING_OPTIONS = 410001;
Expand Down Expand Up @@ -159,6 +160,11 @@ codes.ERR_STMT_FETCH_ROWS_MISSING_END = 450005;
codes.ERR_STMT_FETCH_ROWS_INVALID_END = 450006;
codes.ERR_STMT_FETCH_ROWS_FETCHING_RESULT = 450007;

// 460001
codes.ERR_GET_RESPONSE_QUERY_INVALID_UUID = 460001;
codes.ERR_GET_RESULTS_QUERY_ID_NO_DATA = 460002;
codes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS = 460003;

exports.codes = codes;

/**
Expand Down
9 changes: 8 additions & 1 deletion lib/http/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ HttpClient.prototype.requestAsync = async function (options)
{
const requestOptions = prepareRequestOptions.call(this, options);

return axios.request(requestOptions);
let response = await axios.request(requestOptions);

if (Util.isString(response['data']) &&
response['headers']['content-type'] === 'application/json') {
response['data'] = JSON.parse(response['data']);
}

return response;
};

/**
Expand Down
Loading

0 comments on commit d7c33ea

Please sign in to comment.