Skip to content

Commit

Permalink
HCK-7642: Add support for Snowflake dynamic tables (#134)
Browse files Browse the repository at this point in the history
* HCK-7642: Create script generation

* HCK-7642: add alter support

* HCK-7642: update alter script generation

* HCK-7642: small improvements

* HCK-7642: fix code smells

* HCK-7642: fix second comment

* HCK-7642: change type to block
  • Loading branch information
Nightlngale authored Aug 21, 2024
1 parent 913cd4e commit e71b784
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 43 deletions.
34 changes: 32 additions & 2 deletions forward_engineering/configs/templates.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,36 @@ module.exports = {
'\t\t${column_definitions}' +
'${out_of_line_constraints}\n' +
'\t)${tableOptions};\n',
createDynamicTable:
'CREATE OR REPLACE${transient} DYNAMIC TABLE\n' +
'\t${name}\n' +
'${column_definitions}' +
'${targetLag}' +
'${refreshMode}' +
'${initialize}' +
'${clusterKeys}' +
'${dataRetentionTime}' +
'${maxDataExtensionTime}' +
'${comment}' +
'${tagsStatement}' +
'${selectStatement};\n',
createDynamicIcebergTable:
'CREATE DYNAMIC ICEBERG${transient} TABLE\n' +
'\t${name}\n' +
'${column_definitions}' +
'${targetLag}' +
'${warehouse}' +
'${externalVolume}' +
'${catalog}' +
'${baseLocation}' +
'${refreshMode}' +
'${initialize}' +
'${clusterKeys}' +
'${dataRetentionTime}' +
'${comment}' +
'${copyGrants}' +
'${tagsStatement}' +
'${selectStatement};\n',
createLikeTable: 'CREATE TABLE IF NOT EXISTS ${name} LIKE ${source_table}${tableOptions};\n',
createCloneTable: 'CREATE TABLE IF NOT EXISTS ${name} CLONE ${source_table}${tableOptions};\n',
createAsSelect: 'CREATE TABLE IF NOT EXISTS ${name} AS ${selectStatement}${tableOptions};\n',
Expand Down Expand Up @@ -37,11 +67,11 @@ module.exports = {
'CREATE${temporary} STAGE IF NOT EXISTS ${name} ${url}${storageIntegration}${credentials}${encryption};\n',

alterSchema: 'ALTER SCHEMA IF EXISTS ${name} ${operation} ${options};',
alterTable: 'ALTER TABLE IF EXISTS ${name} ${action};',
alterTable: 'ALTER${dynamic} TABLE IF EXISTS ${name} ${action};',
alterView: 'ALTER${materialized} VIEW IF EXISTS ${name} ${action};',

alterSchemaScript: 'ALTER SCHEMA IF EXISTS ${name} ',
alterTableScript: 'ALTER TABLE IF EXISTS ${name} ',
alterTableScript: 'ALTER${dynamic} TABLE IF EXISTS ${name} ',
alterEntityRename: 'RENAME TO ${name};\n',
setPropertySchema: 'SET ${property};\n',
unsetPropertySchema: 'UNSET ${property};\n',
Expand Down
100 changes: 70 additions & 30 deletions forward_engineering/ddlProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module.exports = (baseProvider, options, app) => {
const scriptFormat = options?.targetScriptOptions?.keyword || FORMATS.SNOWSIGHT;

const keyHelper = require('./helpers/keyHelper')(app);
const { getFileFormat, getCopyOptions, addOptions, getAtOrBefore, mergeKeys } =
const { getFileFormat, getCopyOptions, addOptions, getAtOrBefore, mergeKeys, getDynamicTableProps } =
require('./helpers/tableHelper')(app);
const getFormatTypeOptions = require('./helpers/getFormatTypeOptions')(app);
const { getStageCopyOptions } = require('./helpers/getStageCopyOptions')(app);
Expand Down Expand Up @@ -78,18 +78,15 @@ module.exports = (baseProvider, options, app) => {
});

const getOutOfLineConstraints = (
foreignKeyConstraints,
primaryKeyConstraints,
uniqueKeyConstraints,
isParentActivated,
foreignKeyConstraints = [],
primaryKeyConstraints = [],
uniqueKeyConstraints = [],
) => {
const constraints = []
.concat(foreignKeyConstraints || [])
.concat(primaryKeyConstraints)
.concat(uniqueKeyConstraints)
.map(constraint =>
const constraints = [...foreignKeyConstraints, ...primaryKeyConstraints, ...uniqueKeyConstraints].map(
constraint =>
isParentActivated ? commentIfDeactivated(constraint.statement, constraint) : constraint.statement,
);
);

return !_.isEmpty(constraints) ? ',\n\t\t' + constraints.join(',\n\t\t') : '';
};
Expand Down Expand Up @@ -328,7 +325,27 @@ module.exports = (baseProvider, options, app) => {
isCaseSensitive: tableData.isCaseSensitive,
});

if (tableData.selectStatement) {
if (tableData.dynamic) {
const dynamicTableOptions = getDynamicTableProps({
tableData,
tagsStatement,
clusterKeys,
comment,
dataRetentionTime,
copyGrants,
columnDefinitions,
});

const template = tableData.dynamicTableProps.iceberg
? templates.createDynamicIcebergTable
: templates.createDynamicTable;

return assignTemplates(template, {
name: tableData.fullName,
transient,
...dynamicTableOptions,
});
} else if (tableData.selectStatement) {
return assignTemplates(templates.createAsSelect, {
name: tableData.fullName,
selectStatement: tableData.selectStatement,
Expand Down Expand Up @@ -378,31 +395,30 @@ module.exports = (baseProvider, options, app) => {

column_definitions: columnDefinitions,
out_of_line_constraints: getOutOfLineConstraints(
tableData.foreignKeyConstraints,
tableData.compositePrimaryKeys,
tableData.compositeUniqueKeys,
isActivated,
),
});
} else {
return assignTemplates(templates.createTable, {
name: tableData.fullName,
temporary: temporary,
transient: transient,
tableOptions: addOptions(
[clusterKeys, stageFileFormat, copyOptions, dataRetentionTime, copyGrants, tagsStatement],
comment,
),

column_definitions: columnDefinitions,
out_of_line_constraints: getOutOfLineConstraints(
tableData.foreignKeyConstraints,
tableData.compositePrimaryKeys,
tableData.compositeUniqueKeys,
isActivated,
),
});
}

return assignTemplates(templates.createTable, {
name: tableData.fullName,
temporary,
transient,
tableOptions: addOptions(
[clusterKeys, stageFileFormat, copyOptions, dataRetentionTime, copyGrants, tagsStatement],
comment,
),
column_definitions: columnDefinitions,
out_of_line_constraints: getOutOfLineConstraints(
isActivated,
tableData.foreignKeyConstraints,
tableData.compositePrimaryKeys,
tableData.compositeUniqueKeys,
),
});
},

convertColumnDefinition(columnDefinition) {
Expand Down Expand Up @@ -803,6 +819,21 @@ module.exports = (baseProvider, options, app) => {
temporary: firstTab.temporary,
transient: firstTab.transient,
external: firstTab.external,
dynamic: firstTab.dynamic,
dynamicTableProps: {
iceberg: firstTab.iceberg,
warehouse: firstTab.warehouse,
targetLag: firstTab.targetLag,
refreshMode: firstTab.refreshMode,
initialize: firstTab.initialize,
query: firstTab.query,
externalVolume: firstTab.externalVolume,
catalog: firstTab.catalog,
baseLocation: firstTab.baseLocation,
maxDataExtensionTime: !isNaN(firstTab.MAX_DATA_EXTENSION_TIME_IN_DAYS)
? firstTab.MAX_DATA_EXTENSION_TIME_IN_DAYS
: '',
},
selectStatement: firstTab.selectStatement,
isCaseSensitive: firstTab.isCaseSensitive,
clusteringKey: Array.isArray(firstTab.clusteringKey)
Expand Down Expand Up @@ -908,7 +939,14 @@ module.exports = (baseProvider, options, app) => {
},

alterTable(data) {
const alterTableScript = getAlterEntityScript(templates.alterTableScript, data.nameData);
if (data.iceberg) {
return '// Dynamic Iceberg tables are currently only supported for CREATE statements. Specifying DYNAMIC ICEBERG in any other command (for example, ALTER DYNAMIC ICEBERG TABLE <name>) results in an error.';
}

const alterTableScript = getAlterEntityScript(templates.alterTableScript, {
dynamic: data.dynamic,
...data.nameData,
});
const { script } = _.flow(
getAlterEntityRename(templates.alterTableScript, templates.alterEntityRename),
getSetCollectionProperty(alterTableScript),
Expand Down Expand Up @@ -938,6 +976,8 @@ module.exports = (baseProvider, options, app) => {

return {
...data,
dynamic: collection.role.dynamic,
iceberg: collection.compMod?.iceberg?.old || collection.compMod?.iceberg?.new,
formatTypeOptions: {
...data.formatTypeOptions,
typeOptions: formatTypeOptions,
Expand Down
14 changes: 11 additions & 3 deletions forward_engineering/helpers/alterScriptHelpers/common.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
const _ = require('lodash');
const { getDiffCopyOptionsByDefault } = require('./tableCopyOptionsHelper');

const POSSIBLE_CHANGE_CONTAINER_DATA = ['DATA_RETENTION_TIME_IN_DAYS', 'description'];
const POSSIBLE_UNSET_PROPERTIES = ['description', 'DATA_RETENTION_TIME_IN_DAYS', 'MAX_DATA_EXTENSION_TIME_IN_DAYS'];
const POSSIBLE_SET_PROPERTIES = [
'targetLag',
'warehouse',
'DATA_RETENTION_TIME_IN_DAYS',
'MAX_DATA_EXTENSION_TIME_IN_DAYS',
'description',
];
const POSSIBLE_CHANGE_CONTAINER_DATA = _.uniq([...POSSIBLE_UNSET_PROPERTIES, ...POSSIBLE_SET_PROPERTIES]);
const REDUNDANT_OPTIONS = ['id'];

const checkFieldPropertiesChanged = (compMod, propertiesToCheck) => {
Expand Down Expand Up @@ -107,15 +115,15 @@ const prepareAlterSetUnsetData = ({ collection, data }) => {
return acc;
}

if (newData) {
if (newData && POSSIBLE_SET_PROPERTIES.includes(property)) {
return {
...acc,
set: {
...acc.set,
[property]: newData,
},
};
} else if (oldData) {
} else if (oldData && POSSIBLE_UNSET_PROPERTIES.includes(property)) {
return {
...acc,
unset: [...acc.unset, property],
Expand Down
35 changes: 27 additions & 8 deletions forward_engineering/helpers/alterScriptHelpers/commonScript.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ module.exports = ({ getName, getFullName, templates, assignTemplates, tab }) =>
return assignTemplates(templates.alterSchemaScript, { name: schemaFullName });
};

const getAlterEntityScript = (template, { database, isCaseSensitive, newName, schemaName } = {}) => {
const getAlterEntityScript = (template, { dynamic, database, isCaseSensitive, newName, schemaName } = {}) => {
const schemaFullName = getSchemaFullName(database, schemaName, isCaseSensitive);
const tableName = getName(isCaseSensitive, newName);
const tableFullName = getFullName(schemaFullName, tableName);

return assignTemplates(template, { name: tableFullName });
return assignTemplates(template, { dynamic: dynamic ? ' DYNAMIC' : '', name: tableFullName });
};

const getAlterSchemaName = ({ script, data }) => {
Expand All @@ -41,6 +41,28 @@ module.exports = ({ getName, getFullName, templates, assignTemplates, tab }) =>
return { script, data };
};

const mapKeyToKeyword = {
description: 'COMMENT',
targetLag: 'TARGET_LAG',
warehouse: 'WAREHOUSE',
DATA_RETENTION_TIME_IN_DAYS: 'DATA_RETENTION_TIME_IN_DAYS',
MAX_DATA_EXTENSION_TIME_IN_DAYS: 'MAX_DATA_EXTENSION_TIME_IN_DAYS',
};

const getValue = ({ key, propValue, data }, operation) => {
if (key === 'description') {
const scriptFormat = _.get(data, 'options.targetScriptOptions.keyword');

return escapeString(scriptFormat, propValue);
} else if (key === 'targetLag') {
const { targetLagAmount, targetLagType, targetLagDownstream } = data[operation].targetLag ?? {};

return targetLagDownstream ? 'DOWNSTREAM' : `'${targetLagAmount} ${targetLagType}'`;
}

return propValue;
};

const getSetCollectionProperty =
alterScript =>
({ script, data }) => {
Expand All @@ -51,11 +73,8 @@ module.exports = ({ getName, getFullName, templates, assignTemplates, tab }) =>

const setPropertyData = Object.keys(setProperty).map((key, index) => {
const propValue = setProperty[key];
const scriptFormat = _.get(data, 'options.targetScriptOptions.keyword');

const value = key === 'description' ? escapeString(scriptFormat, propValue) : propValue;
key = key === 'description' ? 'COMMENT' : key;
const statement = `${key} = ${value}`;
const value = getValue({ key, data, propValue }, 'setProperty');
const statement = `${mapKeyToKeyword[key]} = ${value}`;

return Boolean(index) ? tab(statement) : statement;
});
Expand All @@ -79,7 +98,7 @@ module.exports = ({ getName, getFullName, templates, assignTemplates, tab }) =>
}

const unsetPropertyData = unsetProperty.map((key, index) => {
key = key === 'description' ? 'COMMENT' : key;
key = mapKeyToKeyword[key];
return Boolean(index) ? `${key}` : `${key}`;
});

Expand Down
59 changes: 59 additions & 0 deletions forward_engineering/helpers/tableHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,70 @@ module.exports = app => {

const mergeKeys = keys => keys.map(key => `"${key.name}"`).join(', ');

function getTargetLag({ targetLagType, targetLagAmount, targetLagDownstream }) {
return `TARGET_LAG = ${targetLagDownstream ? 'DOWNSTREAM' : `'${targetLagAmount} ${targetLagType}'`}\n`;
}

function getSelectStatement(selectStatement) {
const mapStatement = (statement, index, statements) =>
index === statements.length - 1 ? `\t${statement}` : `\t${statement}\n`;

return `AS\n${selectStatement.split('\n').map(mapStatement).join('')}`;
}

const getDynamicTableProps = ({
tableData,
transient,
tagsStatement,
clusterKeys,
comment,
dataRetentionTime,
copyGrants,
columnDefinitions,
}) => {
if (!tableData.dynamicTableProps) {
return {};
}

const { selectStatement } = tableData;
const {
targetLag,
warehouse,
refreshMode,
initialize,
maxDataExtensionTime,
externalVolume,
catalog,
baseLocation,
} = tableData.dynamicTableProps;

return {
targetLag: targetLag ? getTargetLag(targetLag) : '',
warehouse: warehouse ? `WAREHOUSE = ${warehouse}\n` : '',
selectStatement: selectStatement ? getSelectStatement(selectStatement) : '',
externalVolume: externalVolume ? `EXTERNAL_VOLUME = ${externalVolume}\n` : '',
catalog: catalog ? `CATALOG = ${catalog}\n` : '',
baseLocation: baseLocation ? `BASE_LOCATION = ${baseLocation}\n` : '',
column_definitions: columnDefinitions ? `\t(\n\t\t${columnDefinitions}\n\t)\n` : '',
refreshMode: refreshMode ? `REFRESH_MODE = ${refreshMode}\n` : '',
initialize: initialize ? `INITIALIZE = ${initialize}\n` : '',
clusterKeys,
dataRetentionTime: dataRetentionTime ? `${dataRetentionTime.trim()}\n` : '',
maxDataExtensionTime: maxDataExtensionTime
? `MAX_DATA_EXTENSION_TIME_IN_DAYS = ${maxDataExtensionTime}\n`
: '',
copyGrants: copyGrants ? `${copyGrants.trim()}\n` : '',
comment: comment ? `${comment.trim()}\n` : '',
tagsStatement: tagsStatement ? `${tagsStatement.trim()}\n` : '',
};
};

return {
getFileFormat,
getCopyOptions,
addOptions,
getAtOrBefore,
mergeKeys,
getDynamicTableProps,
};
};
Loading

0 comments on commit e71b784

Please sign in to comment.