Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Jan 7, 2025
1 parent b21dfbe commit 4fd599e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class SnowflakeAdapter(SQLAdapter):

AdapterSpecificConfigs = SnowflakeConfig
CatalogIntegrations = {
CatalogIntegrationType.iceberg_managed: SnowflakeManagedIcebergCatalogIntegration,
CatalogIntegrationType.managed: SnowflakeManagedIcebergCatalogIntegration,
CatalogIntegrationType.glue: SnowflakeGlueCatalogIntegration,
}

Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any

return config_dict

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
return cls.from_dict(cls.parse_relation_config(relation_config))

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
Expand Down
12 changes: 8 additions & 4 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,21 @@ def default(cls) -> Self:


def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]:
if isinstance(catalog_info, Dict):
breakpoint()
if not catalog_info:
return None
elif isinstance(catalog_info, dict):
catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info)
else:
catalog_config = SnowflakeCatalogConfig.parse_relation_config(catalog_info) # type: ignore
catalog_config = SnowflakeCatalogConfig.from_relation_config(catalog_info)

if catalog_config.table_format != TableFormat.default():
catalog_name = "snowflake_managed"
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_config.name,
table_format=catalog_config.table_format,
catalog_type=CatalogIntegrationType.managed,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=catalog_config.external_volume,
)
catalogs_client.add_catalog(
Expand Down Expand Up @@ -86,7 +90,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: Optional[str]
catalog: Optional[str] = None
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

Expand Down
13 changes: 8 additions & 5 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -69,14 +69,17 @@
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
{%- set catalog_integration = adapter.get_catalog_integration(model.catalog) -%}

{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
11 changes: 7 additions & 4 deletions dbt/include/snowflake/macros/relations/dynamic_table/replace.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{%- if dynamic_table.catalog -%}
{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -68,13 +68,16 @@
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}

create or replace dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_replace(self, project, scenario):
assert relation_type == scenario.final.relation_type, scenario.error_message
if relation_type == "dynamic_table":
dynamic_table = describe_dynamic_table(project, scenario.name)
assert dynamic_table.catalog.table_format == scenario.final.table_format
assert dynamic_table.catalog is not None
else:
pytest.skip()

Expand Down

0 comments on commit 4fd599e

Please sign in to comment.