Skip to content

Commit

Permalink
Feature: Use Snowflake Temporary tables where possible (#249)
Browse files Browse the repository at this point in the history
* Introduce Temporary Table DB Feature
  • Loading branch information
hustic authored Jan 16, 2024
1 parent 8e4dae7 commit b897984
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 9 deletions.
8 changes: 7 additions & 1 deletion sayn/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ def create_table(
db=None,
select=None,
replace=False,
temporary=False,
**ddl,
):
db_name = db or ""
Expand Down Expand Up @@ -823,7 +824,12 @@ def merge_query(
tmp_db = db

create_or_replace = self.create_table(
tmp_table, tmp_schema, tmp_db, select=select, replace=True
tmp_table,
tmp_schema,
tmp_db,
select=select,
replace=True,
temporary=True,
)

merge = self.merge_tables(
Expand Down
52 changes: 51 additions & 1 deletion sayn/database/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

class Snowflake(Database):
def feature(self, feature):
return feature in ("TABLE RENAME CHANGES SCHEMA",)
return feature in ("TABLE RENAME CHANGES SCHEMA")

def create_engine(self, settings):
from snowflake.sqlalchemy import URL
Expand Down Expand Up @@ -80,3 +80,53 @@ def _load_data_batch(self, table, data, schema, db):
temp_file_name=fname,
)
)

def create_table(
self,
table,
schema=None,
db=None,
select=None,
replace=False,
temporary=False,
**ddl,
):
db_name = db or ""
schema_name = schema or ""
full_name = fully_qualify(table, schema, db)
if (
db_name in self._requested_objects
and schema_name in self._requested_objects[db_name]
and table in self._requested_objects[db_name][schema_name]
):
db_info = self._requested_objects[db_name][schema_name][table]
object_type = db_info.get("type")
table_exists = bool(object_type == "table")
view_exists = bool(object_type == "view")
else:
db_info = dict()
table_exists = True
view_exists = True

template = self._jinja_env.get_template("snowflake_create_table.sql")

return template.render(
table_name=table,
full_name=full_name,
view_exists=view_exists,
table_exists=table_exists,
select=select,
replace=True,
temporary=temporary,
can_replace_table=self.feature("CAN REPLACE TABLE"),
needs_cascade=self.feature("NEEDS CASCADE"),
cannot_specify_ddl_select=self.feature("CANNOT SPECIFY DDL IN SELECT"),
all_columns_have_type=len(
[c for c in ddl.get("columns", dict()) if c.get("type") is not None]
),
**ddl,
)


def fully_qualify(name, schema=None, db=None):
return f"{db+'.' if db is not None else ''}{schema+'.' if schema is not None else ''}{name}"
8 changes: 4 additions & 4 deletions sayn/database/templates/create_table.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{%- if not replace %}
CREATE TABLE IF NOT EXISTS {{ full_name }}
{%- elif replace and can_replace_table %}
{%- if table_exists %}
{%- if table_exists %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- elif view_exists %}
{%- elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
CREATE TABLE {{ full_name }}
{%- else %}
{%- else %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- endif %}
{%- endif %}
{%- elif replace and not can_replace_table %}
{% if table_exists %}
DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
Expand Down
83 changes: 83 additions & 0 deletions sayn/database/templates/snowflake_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{%- if temporary %}
CREATE TEMP TABLE {{ full_name }}
{%- elif not replace %}
CREATE TABLE IF NOT EXISTS {{ full_name }}
{%- elif replace and can_replace_table %}
{%- if table_exists %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
CREATE TABLE {{ full_name }}
{%- else %}
CREATE OR REPLACE TABLE {{ full_name }}
{%- endif %}
{%- elif replace and not can_replace_table %}
{% if table_exists %}
DROP TABLE IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
{% elif view_exists %}
DROP VIEW IF EXISTS {{ full_name }}{{ ' CASCADE' if needs_cascade else ''}};
{% endif %}

CREATE TABLE {{ full_name }}
{%- endif %}

{%- if select is undefined or select is none %}
{%- if columns is defined and columns|length > 0 and all_columns_have_type %}
(

{%- for col_def in columns %}
{{ col_def['name'] }} {{ col_def['type'] }}
{{- ' UNIQUE' if col_def.get('unique')}}
{{- ' NOT NULL' if col_def.get('not_null')}}
{{- ',' if not loop.last else ''}}
{%- endfor %}
)
{%- endif %}
{%- endif %}

{%- block table_attributes %}
{%- if partition is defined and partition is not none %}
PARTITION BY {{ partition }}
{% endif %}

{%- if cluster is defined and cluster is not none %}
CLUSTER BY {{ cluster|join(', ') }}
{% endif %}

{%- if distribution is defined and distribution is not none %}
DISTSTYLE {{ distribution['type'] }}
{% if distribution['type'] == 'KEY' %}DISTKEY({{ distribution['column'] }}){% endif %}
{% endif %}

{%- if sorting is defined and sorting is not none %}
{{ sorting['type']+' ' if sorting['type'] else '' }}SORTKEY({{ sorting['columns']|join(', ') }})
{% endif %}
{% endblock -%}

{%- if temporary %}
DATA_RETENTION_TIME_IN_DAYS = 0
{% endif -%}

{%- if select is defined and select is not none %}

AS
{{ select }}

{% endif -%}
;

{% block indexes %}
{% if indexes is defined %}
{% for name, idx_def in indexes.items() %}
CREATE INDEX {{ table_name }}_{{ name }} ON {{ full_name }}({{ ', '.join(idx_def['columns']) }});
{% endfor %}
{% endif %}
{% endblock %}

{% block permissions %}
{% if permissions is defined %}
{% for role, priv in permissions.items() %}
GRANT {{ priv }} ON {{ full_name }} TO {{ role }};
{% endfor %}
{% endif %}
{% endblock %}
4 changes: 2 additions & 2 deletions sayn/database/templates/snowflake_load_batch.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% set file_format = full_table_name + '_csv_format' %}
{% set stage = full_table_name + '_csv_stage' %}

CREATE OR REPLACE FILE FORMAT {{ file_format }}
CREATE OR REPLACE TEMP FILE FORMAT {{ file_format }}
TYPE = 'CSV'
FIELD_DELIMITER = '\t'
SKIP_HEADER = 1
Expand All @@ -11,7 +11,7 @@ CREATE OR REPLACE FILE FORMAT {{ file_format }}
ESCAPE_UNENCLOSED_FIELD = '\\'
;

CREATE OR REPLACE stage {{ stage }}
CREATE OR REPLACE TEMP stage {{ stage }}
file_format = {{ file_format }};

PUT file://{{ temp_file_directory }}/{{ temp_file_name }} @{{ stage }} auto_compress=true;
Expand Down
10 changes: 9 additions & 1 deletion sayn/tasks/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,15 @@ def execute(self, execute, debug, is_full_load, limit=None):
load_schema = self.tmp_schema
if is_full_load or self.mode == "full":
steps.append("Move Table")
is_temporary = False
else:
steps.append("Merge Tables")
is_temporary = True
else:
load_db = self.database
load_table = self.table
load_schema = self.schema
is_temporary = False

self.set_run_steps(steps)

Expand All @@ -458,7 +461,12 @@ def execute(self, execute, debug, is_full_load, limit=None):
create_ddl["columns"] = [c for c in self.columns["columns"]]

query = self.target_db.create_table(
load_table, schema=load_schema, db=load_db, replace=True, **create_ddl
load_table,
schema=load_schema,
db=load_db,
replace=True,
temporary=is_temporary,
**create_ddl,
)
if debug:
self.write_compilation_output(query, "create_table")
Expand Down

0 comments on commit b897984

Please sign in to comment.