Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HCK-8210: extend logs #157

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions forward_engineering/helpers/tableHelper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const _ = require('lodash');
const { isEmpty, toUpper, trim } = require('lodash');
const { preSpace } = require('../utils/preSpace');

module.exports = app => {
Expand All @@ -14,20 +14,17 @@ module.exports = app => {
};

const getCopyOptions = copyOptions => {
if (_.isEmpty(copyOptions)) {
if (isEmpty(copyOptions)) {
return '';
}

return 'STAGE_COPY_OPTIONS = (\n' + tab(toOptions(copyOptions)) + '\n)';
};

const addOptions = (options, comment) => {
const allOptions = _.trim(
tab(options.filter(statement => Boolean(_.trim(statement, '\t\n '))).join('\n')),
'\t\n',
);
const allOptions = trim(tab(options.filter(statement => Boolean(trim(statement, '\t\n '))).join('\n')), '\t\n');

if (_.trim(comment)) {
if (trim(comment)) {
return allOptions + '\n\t' + comment;
}

Expand Down Expand Up @@ -127,7 +124,7 @@ module.exports = app => {
dynamic: preSpace(dynamic && 'DYNAMIC'),
catalogSync: catalogSync ? `CATALOG_SYNC = '${catalogSync}'\n` : '',
storageSerializationPolicy: storageSerializationPolicy
? `STORAGE_SERIALIZATION_POLICY = ${storageSerializationPolicy}\n`
? `STORAGE_SERIALIZATION_POLICY = ${toUpper(storageSerializationPolicy)}\n`
: '',
changeTracking: changeTracking ? 'CHANGE_TRACKING = TRUE\n' : '',
defaultDdlCollation: defaultDdlCollation ? `DEFAULT_DDL_COLLATION = '${defaultDdlCollation}'\n` : '',
Expand Down
2 changes: 1 addition & 1 deletion reverse_engineering/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const getDbCollectionsNames = async (connectionInfo, logger, cb) => {
await snowflakeHelper.connect(logger, connectionInfo);
const schemasInfo = await snowflakeHelper.getSchemasInfo();
logger.log('info', { schemas: schemasInfo }, 'Found schemas');
const namesBySchemas = await snowflakeHelper.getEntitiesNames();
const namesBySchemas = await snowflakeHelper.getEntitiesNames({ logger });

logger.log('info', { entities: namesBySchemas }, 'Found entities');

Expand Down
8 changes: 8 additions & 0 deletions reverse_engineering/helpers/errorMessages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Centralized user-facing error messages for the Snowflake connection /
// authentication flow. Consumers wrap these strings in `{ message: ... }`
// objects before rejecting (see the `noConnectionError` and Okta rejection
// sites in snowflakeHelper.js). These strings are shown to end users —
// do not change wording casually.
module.exports = {
	// Generic fallback used when no connection object is available.
	CONNECTION_ERROR: 'Connection error',
	// Raised when the Okta auth endpoint response lacks tokenUrl/ssoUrl.
	OKTA_SSO_ERROR: "Can't get SSO URL. Please, check the authenticator",
	// Raised when the Okta authn call fails — wrong credentials, or MFA is on
	// (native Okta auth here cannot complete an MFA challenge).
	OKTA_CREDENTIALS_ERROR:
		'Incorrect Okta username/password or MFA is enabled. Please, check credentials or use the "Identity Provider SSO (via external browser)" for MFA auth',
	// Raised when the Okta authn status starts with "MFA" — the user must
	// switch to the external-browser SSO auth type instead.
	OKTA_MFA_ERROR:
		'Native Okta auth doesn\'t support MFA. Please, use the "Identity Provider SSO (via external browser)" auth instead',
};
215 changes: 122 additions & 93 deletions reverse_engineering/helpers/snowflakeHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,12 @@ const snowflake = require('snowflake-sdk');
const axios = require('axios');
const uuid = require('uuid');
const BSON = require('bson');
const errorMessages = require('./errorMessages');

const ALREADY_CONNECTED_STATUS = 405502;
const CANT_REACH_SNOWFLAKE_ERROR_STATUS = 401001;
const CONNECTION_TIMED_OUT_CODE = 'CONNECTION_TIMED_OUT';

let connection;
let containers = {};

const noConnectionError = { message: 'Connection error' };
const oktaAuthenticatorError = { message: "Can't get SSO URL. Please, check the authenticator" };
const oktaCredentialsError = {
message:
'Incorrect Okta username/password or MFA is enabled. Please, check credentials or use the "Identity Provider SSO (via external browser)" for MFA auth',
};
const oktaMFAError = {
message:
'Native Okta auth doesn\'t support MFA. Please, use the "Identity Provider SSO (via external browser)" auth instead',
};

const DEFAULT_CLIENT_APP_ID = 'JavaScript';
const DEFAULT_CLIENT_APP_VERSION = '1.5.1';
const DEFAULT_WAREHOUSE = 'COMPUTE_WH';
Expand All @@ -33,6 +20,11 @@ const SECONDS_IN_DAY = 86400;
const SECONDS_IN_HOUR = 3600;
const SECONDS_IN_MINUTE = 60;

const noConnectionError = { message: errorMessages.CONNECTION_ERROR };

let connection;
let containers = {};

const connect = async (
logger,
{
Expand All @@ -56,13 +48,47 @@ const connect = async (

logger.log(
'info',
`Connection name: ${name}\nCloud platform: ${cloudPlatform}\nHost: ${host}\nAuth type: ${authType}\nUsername: ${username}\nWarehouse: ${warehouse}\nRole: ${role}`,
`Connection name: ${name}\n` +
`Cloud platform: ${cloudPlatform}\n` +
`Host: ${host}\n` +
`Auth type: ${authType}\n` +
`Username: ${username}\n` +
`Warehouse: ${warehouse}\n` +
`Role: ${role}`,
'Connection',
);

const connectionFallbackStrategy = err => {
if (err.code !== CANT_REACH_SNOWFLAKE_ERROR_STATUS || hasCloudPlatform(account)) {
throw err;
}

const message = _.isString(err) ? err : _.get(err, 'message', 'Reverse Engineering error');
logger.log(
'info',
`Can't reach Snowflake server. Trying to add cloudPlatformName. \nInitial error: ${message}`,
'Connection',
);

return connect(logger, {
host: `${host}.${_.toLower(cloudPlatform)}`,
username,
password,
authType,
authenticator,
proofKey,
token,
role,
warehouse,
name,
cloudPlatform,
queryRequestTimeout,
});
};

let authPromise;
if (authType === 'okta') {
authPromise = authByOkta(logger, {
return authByOkta(logger, {
account,
accessUrl,
username,
Expand All @@ -71,7 +97,7 @@ const connect = async (
role,
warehouse,
timeout,
});
}).catch(connectionFallbackStrategy);
}
if (authType === 'externalbrowser') {
authPromise = authByExternalBrowser(logger, {
Expand All @@ -89,39 +115,15 @@ const connect = async (
authPromise = authByCredentials({ account, username, password, role, warehouse, timeout });
}

return await authPromise.catch(err => {
if (err.code !== CANT_REACH_SNOWFLAKE_ERROR_STATUS || hasCloudPlatform(account)) {
throw err;
}

const message = _.isString(err) ? err : _.get(err, 'message', 'Reverse Engineering error');
logger.log(
'info',
`Can't reach Snowflake server. Trying to add cloudPlatformName. \nInitial error: ${message}`,
'Connection',
);

return connect(logger, {
host: `${host}.${_.toLower(cloudPlatform)}`,
username,
password,
authType,
authenticator,
proofKey,
token,
role,
warehouse,
name,
cloudPlatform,
queryRequestTimeout,
});
});
return authPromise.catch(connectionFallbackStrategy);
};

const authByOkta = async (
logger,
{ account, accessUrl, username, password, authenticator, role, timeout, warehouse = DEFAULT_WAREHOUSE },
) => {
const oktaCredentialsError = { message: errorMessages.OKTA_CREDENTIALS_ERROR };

logger.log('info', `Authenticator: ${authenticator}`, 'Connection');
const accountName = getAccountName(account);
const ssoUrlsData = await axios.post(
Expand All @@ -142,7 +144,7 @@ const authByOkta = async (
logger.log('info', `Token URL: ${tokenUrl}\nSSO URL: ${ssoUrl}`, 'Connection');

if (!tokenUrl || !ssoUrl) {
return Promise.reject(oktaAuthenticatorError);
return Promise.reject({ message: errorMessages.OKTA_SSO_ERROR });
}

const authNData = await axios
Expand All @@ -158,7 +160,7 @@ const authByOkta = async (
const status = _.get(authNData, 'data.status', 'SUCCESS');
const authToken = _.get(authNData, 'data.sessionToken', '');
if (status.startsWith('MFA')) {
return Promise.reject(oktaMFAError);
return Promise.reject({ message: errorMessages.OKTA_MFA_ERROR });
}

const identityProviderTokenData = await axios.post(tokenUrl, { username, password }).catch(err => {
Expand Down Expand Up @@ -346,10 +348,22 @@ const authByExternalBrowser = async (
);
logger.log('info', `Fallback to ${defaultWarehouse} warehouse`, 'Connection');

const logError = err => logger.log('info', `WAREHOUSE error: ${err}`, 'Connection');

const logAndReturnEmptyArray = err => {
logError(err);
return [];
};

execute(`USE WAREHOUSE "${removeQuotes(defaultWarehouse)}";`).then(resolve, async err => {
if (err) {
logError(err);
}

const currentInfo = await execute(
`select current_warehouse() as warehouse, current_role() as role;`,
).catch(err => []);
).catch(logAndReturnEmptyArray);

const infoRow = _.first(currentInfo);
const currentWarehouse = _.get(infoRow, 'WAREHOUSE', '');
const currentRole = _.get(infoRow, 'ROLE', '');
Expand Down Expand Up @@ -529,15 +543,35 @@ const getRowsByDatabases = entitiesRows => {
}, {});
};

const getEntitiesNames = async () => {
const databases = await showDatabases().catch(e => []);
const tablesRows = await showTablesByDatabases(databases).catch(e => []);
const externalTableRows = await showExternalTables().catch(e => []);
const viewsRows = await showViews().catch(e => []);
const materializedViewsRows = await showMaterializedViews().catch(e => []);
const getEntitiesNames = async ({ logger }) => {
const logErrorAndReturnEmptyArray = err => {
logger.log('error', err, 'SHOW command error');
return [];
};

const logTablesMeta = ({ tables = [] }) => {
if (!tables.length) {
return;
}
const getMeta = table =>
`${table.database_name}.${table.name}: rows=${table.rows}; is_dynamic=${table.is_dynamic}; is_external=${table.is_external}; is_iceberg=${table.is_iceberg};`;
const combinedMeta = tables.map(getMeta);

logger.log('info', combinedMeta, 'Tables metadata');
};

const databases = await showDatabases().catch(logErrorAndReturnEmptyArray);
const tablesRows = await showTablesByDatabases(databases).catch(logErrorAndReturnEmptyArray);
const flatTableRows = tablesRows.flatMap(row => row.value).filter(Boolean);

logTablesMeta({ tables: flatTableRows });

const externalTableRows = await showExternalTables().catch(logErrorAndReturnEmptyArray);
const viewsRows = await showViews().catch(logErrorAndReturnEmptyArray);
const materializedViewsRows = await showMaterializedViews().catch(logErrorAndReturnEmptyArray);

const entitiesRows = [
...tablesRows.flatMap(row => row.value).filter(Boolean),
...flatTableRows,
...externalTableRows,
...viewsRows.map(annotateView),
...materializedViewsRows.map(annotateView),
Expand Down Expand Up @@ -571,7 +605,7 @@ const getFullEntityName = (schemaName, tableName) => {
};

const addQuotes = string => {
if (/^\".*\"$/.test(string)) {
if (/^".*"$/.test(string)) {
return string;
}

Expand Down Expand Up @@ -885,7 +919,7 @@ const getJsonSchema = async (logger, limit, tableName) => {
};

const removeQuotes = str => {
return (str || '').replace(/^\"([\s\S]*)\"$/im, '$1');
return (str || '').replace(/^"([\s\S]*)"$/im, '$1');
};

const removeLinear = str => {
Expand All @@ -907,44 +941,43 @@ const handleClusteringKey = (fieldsNames, keysExpression) => {
const items = keysExpression.split(',');

return items.reduce((keys, item) => {
const arguments = item.split('(');
const args = item.split('(');
let expression = '';
const clusteringKeys = arguments
.map(argument => {
const rawName = _.get(_.trim(argument).match(/^([\S]+)/), 1);
if (!rawName) {
if (expression) {
expression += '(';
}
expression += argument;
return false;
}
const name = removeQuotes(_.last(getVariantName(_.trim(rawName)).split('.')));
const fieldName = fieldsNames.find(fieldName => _.toUpper(fieldName) === _.toUpper(name));
if (!fieldName) {
if (expression) {
expression += '(';
}
expression += argument;
return false;
}

if (name === _.trim(removeQuotes(item))) {
return {
name: fieldName,
};
const clusteringKeys = args.reduce((acc, arg) => {
const rawName = _.get(_.trim(arg).match(/^(\S+)/), 1);
if (!rawName) {
if (expression) {
expression += '(';
}

expression += arg;
return acc;
}
const name = removeQuotes(_.last(getVariantName(_.trim(rawName)).split('.')));
const fieldName = fieldsNames.find(fieldName => _.toUpper(fieldName) === _.toUpper(name));
if (!fieldName) {
if (expression) {
expression += '(';
}
expression += argument.replace(new RegExp(`^${name}`), '${name}');
expression += arg;
return acc;
}

return {
name: fieldName,
};
})
.filter(Boolean);
if (name === _.trim(removeQuotes(item))) {
acc.push({ name: fieldName });

return acc;
}

if (expression) {
expression += '(';
}
expression += arg.replace(new RegExp(`^${name}`), '${name}');

acc.push({ name: fieldName });

return acc;
}, []);

if (!_.isEmpty(clusteringKeys)) {
return [
Expand Down Expand Up @@ -1048,11 +1081,7 @@ const getEntityData = async (fullName, logger) => {
const checkExternalMetaFields = fields => {
const metaField = _.first(fields) || {};

if (metaField.name === 'VALUE' && metaField.type === 'VARIANT') {
return true;
}

return false;
return metaField.name === 'VALUE' && metaField.type === 'VARIANT';
};

const getFileFormatOptions = stageData => {
Expand Down Expand Up @@ -1317,7 +1346,7 @@ const getProcedures = async (dbName, schemaName) => {

return rows.map(row => {
const procedureArguments =
row['ARGUMENT_SIGNATURE'] === '()' ? '' : row['ARGUMENT_SIGNATURE'].replace(/[\(\)]/gm, '');
row['ARGUMENT_SIGNATURE'] === '()' ? '' : row['ARGUMENT_SIGNATURE'].replace(/[()]/gm, '');

return {
name: row['PROCEDURE_NAME'],
Expand Down