Skip to content

Commit

Permalink
feat: add iceberg high availability on table using rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Jrmyy committed Apr 20, 2023
1 parent 16464bc commit 78719f7
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 23 deletions.
18 changes: 13 additions & 5 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def s3_table_location(
s3_data_naming: str,
schema_name: str,
table_name: str,
s3_path_table_part: Optional[str] = None,
external_location: Optional[str] = None,
is_temporary_table: bool = False,
) -> str:
Expand All @@ -201,12 +202,17 @@ def s3_table_location(
if external_location and not is_temporary_table:
return external_location.rstrip("/")

if not s3_path_table_part:
s3_path_table_part = table_name

mapping = {
"uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())),
"table": path.join(self.s3_table_prefix(s3_data_dir), table_name),
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())),
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name),
"schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4())),
"table": path.join(self.s3_table_prefix(s3_data_dir), s3_path_table_part),
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), s3_path_table_part, str(uuid4())),
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, s3_path_table_part),
"schema_table_unique": path.join(
self.s3_table_prefix(s3_data_dir), schema_name, s3_path_table_part, str(uuid4())
),
}
table_location = mapping.get(s3_data_naming)

Expand Down Expand Up @@ -280,7 +286,9 @@ def upload_seed_to_s3(
client = conn.handle

# TODO: consider using the workgroup default location when configured
s3_location = self.s3_table_location(s3_data_dir, s3_data_naming, database_name, table_name, external_location)
s3_location = self.s3_table_location(
s3_data_dir, s3_data_naming, database_name, table_name, external_location=external_location
)
bucket, prefix = self._parse_s3_path(s3_location)

file_name = f"{table_name}.csv"
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/athena/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AthenaIncludePolicy(Policy):
class AthenaRelation(BaseRelation):
quote_character: str = '"' # Presto quote character
include_policy: Policy = field(default_factory=lambda: AthenaIncludePolicy())
s3_path_table_part: Optional[str] = None

def render_hive(self):
"""
Expand Down
6 changes: 6 additions & 0 deletions dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@
alter table {{ relation.render_hive() }} set tblproperties ('classification' = '{{ format }}')
{%- endcall %}
{%- endmacro %}

{% macro athena__rename_relation(from_relation, to_relation) %}
{% call statement('rename_relation') -%}
alter table {{ from_relation.render_hive() }} rename to `{{ to_relation.identifier }}`
{%- endcall %}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
{%- set location_property = 'external_location' -%}
{%- set partition_property = 'partitioned_by' -%}
{%- set work_group_output_location_enforced = adapter.is_work_group_output_location_enforced() -%}
{%- set location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier, external_location, temporary) -%}
{%- set location = adapter.s3_table_location(s3_data_dir,
s3_data_naming,
relation.schema,
relation.identifier,
relation.s3_path_table_part,
external_location,
temporary) -%}

{%- if materialized == 'table_hive_ha' -%}
{%- set location = location.replace('__ha', '') -%}
Expand Down
49 changes: 38 additions & 11 deletions dbt/include/athena/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,46 @@

{{ run_hooks(pre_hooks) }}

-- cleanup
{%- if old_relation is not none -%}
{{ drop_relation(old_relation) }}
{%- endif -%}
{%- if old_relation is none or table_type != 'iceberg' -%}
{%- if old_relation is not none -%}
{{ drop_relation(old_relation) }}
{%- endif -%}

-- build model
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% if table_type != 'iceberg' %}
{{ set_table_classification(target_relation) }}
{% endif %}
{%- if table_type != 'iceberg' -%}
{{ set_table_classification(target_relation) }}
{%- endif -%}
{%- else -%}
{%- set tmp_relation = api.Relation.create(identifier=target_relation.identifier ~ '__ha',
schema=schema,
database=database,
s3_path_table_part=target_relation.identifier,
type='table') -%}
{%- if tmp_relation is not none -%}
{%- do drop_relation(tmp_relation) -%}
{%- endif -%}

{%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%}
-- If we have this, it means that at least the first renaming occurred but there was an issue
-- afterwards, therefore we are in weird state. The easiest and cleanest should be to remove
-- the backup relation. It won't have an impact because since we are in the else condition, that
-- means that old relation exists therefore no downtime yet.
{%- if old_relation_bkp is not none -%}
{%- do drop_relation(old_relation_bkp) -%}
{%- endif -%}

{%- call statement('main') -%}
{{ create_table_as(False, tmp_relation, sql) }}
{%- endcall -%}

{{ rename_relation(old_relation, old_relation_bkp) }}
{{ rename_relation(tmp_relation, target_relation) }}

{{ drop_relation(old_relation_bkp) }}
{%- endif -%}

{{ run_hooks(post_hooks) }}

Expand Down
34 changes: 28 additions & 6 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,24 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
assert "Got an error when attempting to open a Athena connection due to foobar" in dbt_error_caplog.getvalue()

@pytest.mark.parametrize(
("s3_data_dir", "s3_data_naming", "external_location", "is_temporary_table", "expected"),
("s3_data_dir", "s3_data_naming", "s3_path_table_part", "external_location", "is_temporary_table", "expected"),
(
pytest.param(None, "table", None, False, "s3://my-bucket/test-dbt/tables/table", id="table naming"),
pytest.param(None, "uuid", None, False, "s3://my-bucket/test-dbt/tables/uuid", id="uuid naming"),
pytest.param(None, "table", None, None, False, "s3://my-bucket/test-dbt/tables/table", id="table naming"),
pytest.param(None, "uuid", None, None, False, "s3://my-bucket/test-dbt/tables/uuid", id="uuid naming"),
pytest.param(
None, "table_unique", None, False, "s3://my-bucket/test-dbt/tables/table/uuid", id="table_unique naming"
None,
"table_unique",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/table/uuid",
id="table_unique naming",
),
pytest.param(
None,
"schema_table",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/schema/table",
id="schema_table naming",
Expand All @@ -258,6 +265,7 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
None,
"schema_table_unique",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/schema/table/uuid",
id="schema_table_unique naming",
Expand All @@ -266,13 +274,15 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
"s3://my-data-bucket/",
"schema_table_unique",
None,
None,
False,
"s3://my-data-bucket/schema/table/uuid",
id="data_dir set",
),
pytest.param(
"s3://my-data-bucket/",
"schema_table_unique",
None,
"s3://path/to/external/",
False,
"s3://path/to/external",
Expand All @@ -281,18 +291,30 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
pytest.param(
"s3://my-data-bucket/",
"schema_table_unique",
None,
"s3://path/to/external/",
True,
"s3://my-data-bucket/schema/table/uuid",
id="external_location set and temporary",
),
pytest.param(
None,
"schema_table_unique",
"other_table",
None,
False,
"s3://my-bucket/test-dbt/tables/schema/other_table/uuid",
id="s3_path_table_part set",
),
),
)
@patch("dbt.adapters.athena.impl.uuid4", return_value="uuid")
def test_s3_table_location(self, _, s3_data_dir, s3_data_naming, external_location, is_temporary_table, expected):
def test_s3_table_location(
self, _, s3_data_dir, s3_data_naming, external_location, s3_path_table_part, is_temporary_table, expected
):
self.adapter.acquire_connection("dummy")
assert expected == self.adapter.s3_table_location(
s3_data_dir, s3_data_naming, "schema", "table", external_location, is_temporary_table
s3_data_dir, s3_data_naming, "schema", "table", s3_path_table_part, external_location, is_temporary_table
)

def test_s3_table_location_exc(self):
Expand Down

0 comments on commit 78719f7

Please sign in to comment.