HCK-8233: iceberg meta (#160)
* feat: improved logs for iceberg tables

* fix: use proper describe statement
chulanovskyi-bs authored Oct 3, 2024
1 parent 8d8cc53 commit 79904f1
Showing 2 changed files with 27 additions and 16 deletions.
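
The "improved logs" half of this commit threads the name of the failing query into the shared error handler, so a failed SHOW or DESCRIBE is reported by query name rather than with the generic "SHOW command error" message. A minimal sketch of that curried pattern as it appears in the snowflakeHelper.js diff below (the wrapping demo function and its arguments are illustrative stand-ins, not part of the commit):

// Curried handler from this commit: capture the logger and a query label up front,
// then log the labelled error and fall back to an empty result when a query fails.
const logErrorAndReturnEmptyArray =
    ({ logger, query = '' }) =>
    err => {
        logger.log('error', err, `"${query}" query execution error`);
        return [];
    };

// Illustrative usage: a failed SHOW ICEBERG TABLES logs a named error and yields [].
const demo = async ({ logger, showIcebergTables }) =>
    showIcebergTables().catch(logErrorAndReturnEmptyArray({ logger, query: 'SHOW ICEBERG TABLES' }));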
3 changes: 2 additions & 1 deletion reverse_engineering/api.js
@@ -111,7 +111,8 @@ const getDbCollectionsData = async (data, logger, cb) => {
getSampleDocSize(quantity, data.recordSamplingSettings),
fullTableName,
);
-const entityData = await snowflakeHelper.getEntityData(fullTableName, logger);
+
+const entityData = await snowflakeHelper.getEntityData({ fullTableName, logger });

logger.progress({ message: `Schema inference`, containerName: schema, entityName: table });
logger.log(
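
The api.js change is a call-site update only: getEntityData (and, further down, getDynamicTableData) now take a single options object instead of positional arguments, presumably to keep the helper signatures consistent and avoid argument-order mistakes. Roughly, with identifiers taken from the hunk above:

// Before this commit: positional arguments.
// const entityData = await snowflakeHelper.getEntityData(fullTableName, logger);

// After this commit: one named-options object, matching the helper's new signature.
const entityData = await snowflakeHelper.getEntityData({ fullTableName, logger });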
40 changes: 25 additions & 15 deletions reverse_engineering/helpers/snowflakeHelper.js
@@ -495,6 +495,8 @@ const showMaterializedViews = () => execute('SHOW MATERIALIZED VIEWS;');

const showIcebergTables = ({ options = '' } = {}) => execute(`SHOW ICEBERG TABLES${options};`);

+const describeTable = ({ tableType = '', params = '' } = {}) => execute(`DESCRIBE${tableType} TABLE${params};`);
+
const annotateView = row => ({ ...row, name: `${row.name} (v)` });

const splitEntityNames = names => {
@@ -546,9 +548,9 @@ const getRowsByDatabases = entitiesRows => {
};

const logErrorAndReturnEmptyArray =
-({ logger }) =>
+({ logger, query = '' }) =>
err => {
-logger.log('error', err, 'SHOW command error');
+logger.log('error', err, `"${query}" query execution error`);
return [];
};

@@ -557,7 +559,7 @@ const logTablesMeta = async ({ logger, tables = [], icebergTables = [] }) => {
return;
}

-const logError = logErrorAndReturnEmptyArray({ logger });
+const logError = logErrorAndReturnEmptyArray({ logger, query: 'SHOW ICEBERG TABLES' });

const getMeta = async table => {
const { database_name, name, rows, is_dynamic, is_external, is_iceberg } = table;
@@ -586,7 +588,7 @@ const logTablesMeta = async ({ logger, tables = [], icebergTables = [] }) => {
};

const getEntitiesNames = async ({ logger }) => {
-const logError = logErrorAndReturnEmptyArray({ logger });
+const logError = logErrorAndReturnEmptyArray({ logger, query: 'SHOW' });

const databases = await showDatabases().catch(logError);
const tablesRows = await showTablesByDatabases(databases).catch(logError);
@@ -1032,19 +1034,24 @@ const handleClusteringKey = (fieldsNames, keysExpression) => {
}, []);
};

-const getEntityData = async (fullName, logger) => {
-const [dbName, schemaName, tableName] = fullName.split('.');
+const getEntityData = async ({ fullTableName, logger }) => {
+const [dbName, schemaName, tableName] = fullTableName.split('.');

try {
let entityLevelData = {};
const rows = await execute(
`select * from "${removeQuotes(dbName)}".information_schema.tables where TABLE_NAME='${removeQuotes(tableName)}' AND TABLE_SCHEMA='${removeQuotes(schemaName)}'`,
);
const data = _.first(rows);
-const fields = await execute(`DESC TABLE ${fullName};`).catch(e => []);
+
+const fields = await describeTable({ params: fullTableName }).catch(
+logErrorAndReturnEmptyArray({ logger, query: 'DESCRIBE TABLE' }),
+);
+
const fieldsNames = fields.map(field => field.name);
const clusteringKey = handleClusteringKey(fieldsNames, _.get(data, 'CLUSTERING_KEY', ''));
-const stageData = await execute(`DESCRIBE TABLE ${fullName} type = stage;`);
+const stageData = await describeTable({ params: `${fullTableName} type = stage` });
+
const fileFormat = _.toUpper(
_.get(
stageData.find(item => item.property === 'TYPE'),
@@ -1053,11 +1060,10 @@ const getEntityData = async (fullName, logger) => {
),
);
const isDynamic = _.toUpper(_.get(data, 'IS_DYNAMIC', '')) === 'YES';
+const isIceberg = _.toUpper(_.get(data, 'IS_ICEBERG', '')) === 'YES';

if (isDynamic) {
-const dynamicTableData = await getDynamicTableData({ fullName, logger });
-const isIceberg = _.toUpper(_.get(data, 'IS_ICEBERG', '')) === 'YES';
-
+const dynamicTableData = await getDynamicTableData({ fullTableName, logger });
entityLevelData = { ...data, ...dynamicTableData, iceberg: isIceberg };
}

@@ -1075,7 +1081,7 @@ const getEntityData = async (fullName, logger) => {
external = true;
}
if (external) {
-const externalTableData = await getExternalTableData(fullName);
+const externalTableData = await getExternalTableData(fullTableName);
entityLevelData = { ...data, ...externalTableData };
}
if (!fileFormat) {
@@ -1275,8 +1281,8 @@ function getTargetLag(targetLag) {
return getTargetLagStringValue(targetLag);
}

-const getDynamicTableData = async ({ fullName, logger }) => {
-const [dbName, schemaName, tableName] = fullName.split('.');
+const getDynamicTableData = async ({ fullTableName, logger }) => {
+const [dbName, schemaName, tableName] = fullTableName.split('.');

try {
const rows = await execute(
@@ -1321,7 +1327,11 @@ const getDynamicTableData = async ({ fullName, logger }) => {
MAX_DATA_EXTENSION_TIME_IN_DAYS,
};
} catch (error) {
-logger.log('error', { error }, `Reverse Engineering error while retrieving schema data from table ${fullName}`);
+logger.log(
+'error',
+{ error },
+`Reverse Engineering error while retrieving schema data from table ${fullTableName}`,
+);

return {};
}
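The "proper describe statement" half replaces the hand-built DESC TABLE strings inside getEntityData with the new describeTable helper, and routes a failed field description through the same query-labelled handler instead of a silent .catch(e => []). A condensed view of how the pieces compose after this commit (collected from the hunks above; execute, logger and fullTableName are assumed to be in scope as in the original module):

// Helper added in this commit: builds and runs a DESCRIBE statement from options.
const describeTable = ({ tableType = '', params = '' } = {}) => execute(`DESCRIBE${tableType} TABLE${params};`);

// Inside getEntityData: a failed DESCRIBE is logged by query name and degrades to [].
const fields = await describeTable({ params: fullTableName }).catch(
    logErrorAndReturnEmptyArray({ logger, query: 'DESCRIBE TABLE' }),
);

// The stage description reuses the same helper with extra params.
const stageData = await describeTable({ params: `${fullTableName} type = stage` });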
