Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: add iceberg high availability on table using rename #246

Merged
merged 1 commit into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def s3_table_location(
s3_data_naming: str,
schema_name: str,
table_name: str,
s3_path_table_part: Optional[str] = None,
external_location: Optional[str] = None,
is_temporary_table: bool = False,
) -> str:
Expand All @@ -201,12 +202,17 @@ def s3_table_location(
if external_location and not is_temporary_table:
return external_location.rstrip("/")

if not s3_path_table_part:
s3_path_table_part = table_name

mapping = {
"uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())),
"table": path.join(self.s3_table_prefix(s3_data_dir), table_name),
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())),
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name),
"schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4())),
"table": path.join(self.s3_table_prefix(s3_data_dir), s3_path_table_part),
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), s3_path_table_part, str(uuid4())),
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, s3_path_table_part),
"schema_table_unique": path.join(
self.s3_table_prefix(s3_data_dir), schema_name, s3_path_table_part, str(uuid4())
),
}
table_location = mapping.get(s3_data_naming)

Expand Down Expand Up @@ -280,7 +286,9 @@ def upload_seed_to_s3(
client = conn.handle

# TODO: consider using the workgroup default location when configured
s3_location = self.s3_table_location(s3_data_dir, s3_data_naming, database_name, table_name, external_location)
s3_location = self.s3_table_location(
s3_data_dir, s3_data_naming, database_name, table_name, external_location=external_location
)
bucket, prefix = self._parse_s3_path(s3_location)

file_name = f"{table_name}.csv"
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/athena/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AthenaIncludePolicy(Policy):
class AthenaRelation(BaseRelation):
quote_character: str = '"' # Presto quote character
include_policy: Policy = field(default_factory=lambda: AthenaIncludePolicy())
s3_path_table_part: Optional[str] = None

def render_hive(self):
"""
Expand Down
6 changes: 6 additions & 0 deletions dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@
alter table {{ relation.render_hive() }} set tblproperties ('classification' = '{{ format }}')
{%- endcall %}
{%- endmacro %}

{% macro athena__rename_relation(from_relation, to_relation) %}
{#
  Rename from_relation to to_relation in place via ALTER TABLE ... RENAME TO.
  The source relation is rendered with render_hive() (backtick quoting), and the
  target is spelled out as `schema`.`identifier` from to_relation.
  NOTE(review): introduced for the Iceberg high-availability swap (tmp -> target,
  target -> backup); presumably only Iceberg tables support RENAME TO in Athena —
  confirm callers only invoke this for table_type == 'iceberg'.
#}
{% call statement('rename_relation') -%}
alter table {{ from_relation.render_hive() }} rename to `{{ to_relation.schema }}`.`{{ to_relation.identifier }}`
{%- endcall %}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
{%- set location_property = 'external_location' -%}
{%- set partition_property = 'partitioned_by' -%}
{%- set work_group_output_location_enforced = adapter.is_work_group_output_location_enforced() -%}
{%- set location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier, external_location, temporary) -%}
{%- set location = adapter.s3_table_location(s3_data_dir,
s3_data_naming,
relation.schema,
relation.identifier,
relation.s3_path_table_part,
external_location,
temporary) -%}

{%- if materialized == 'table_hive_ha' -%}
{%- set location = location.replace('__ha', '') -%}
Expand Down
49 changes: 38 additions & 11 deletions dbt/include/athena/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,46 @@

{{ run_hooks(pre_hooks) }}

-- cleanup
{%- if old_relation is not none -%}
{{ drop_relation(old_relation) }}
{%- endif -%}
{%- if old_relation is none or table_type != 'iceberg' -%}
{%- if old_relation is not none -%}
{{ drop_relation(old_relation) }}
{%- endif -%}

-- build model
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% if table_type != 'iceberg' %}
{{ set_table_classification(target_relation) }}
{% endif %}
{%- if table_type != 'iceberg' -%}
{{ set_table_classification(target_relation) }}
{%- endif -%}
{%- else -%}
{%- set tmp_relation = api.Relation.create(identifier=target_relation.identifier ~ '__ha',
nicor88 marked this conversation as resolved.
Show resolved Hide resolved
schema=schema,
database=database,
s3_path_table_part=target_relation.identifier,
nicor88 marked this conversation as resolved.
Show resolved Hide resolved
type='table') -%}
{%- if tmp_relation is not none -%}
{%- do drop_relation(tmp_relation) -%}
{%- endif -%}

{%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%}
-- If we have this, it means that at least the first renaming occurred but there was an issue
-- afterwards, therefore we are in weird state. The easiest and cleanest should be to remove
-- the backup relation. It won't have an impact because since we are in the else condition, that
-- means that old relation exists therefore no downtime yet.
{%- if old_relation_bkp is not none -%}
{%- do drop_relation(old_relation_bkp) -%}
{%- endif -%}

{%- call statement('main') -%}
{{ create_table_as(False, tmp_relation, sql) }}
{%- endcall -%}

{{ rename_relation(old_relation, old_relation_bkp) }}
{{ rename_relation(tmp_relation, target_relation) }}

{{ drop_relation(old_relation_bkp) }}
{%- endif -%}

{{ run_hooks(post_hooks) }}

Expand Down
34 changes: 28 additions & 6 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,24 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
assert "Got an error when attempting to open a Athena connection due to foobar" in dbt_error_caplog.getvalue()

@pytest.mark.parametrize(
("s3_data_dir", "s3_data_naming", "external_location", "is_temporary_table", "expected"),
("s3_data_dir", "s3_data_naming", "s3_path_table_part", "external_location", "is_temporary_table", "expected"),
(
pytest.param(None, "table", None, False, "s3://my-bucket/test-dbt/tables/table", id="table naming"),
pytest.param(None, "uuid", None, False, "s3://my-bucket/test-dbt/tables/uuid", id="uuid naming"),
pytest.param(None, "table", None, None, False, "s3://my-bucket/test-dbt/tables/table", id="table naming"),
pytest.param(None, "uuid", None, None, False, "s3://my-bucket/test-dbt/tables/uuid", id="uuid naming"),
pytest.param(
None, "table_unique", None, False, "s3://my-bucket/test-dbt/tables/table/uuid", id="table_unique naming"
None,
"table_unique",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/table/uuid",
id="table_unique naming",
),
pytest.param(
None,
"schema_table",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/schema/table",
id="schema_table naming",
Expand All @@ -258,6 +265,7 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
None,
"schema_table_unique",
None,
None,
False,
"s3://my-bucket/test-dbt/tables/schema/table/uuid",
id="schema_table_unique naming",
Expand All @@ -266,13 +274,15 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
"s3://my-data-bucket/",
"schema_table_unique",
None,
None,
False,
"s3://my-data-bucket/schema/table/uuid",
id="data_dir set",
),
pytest.param(
"s3://my-data-bucket/",
"schema_table_unique",
None,
"s3://path/to/external/",
False,
"s3://path/to/external",
Expand All @@ -281,18 +291,30 @@ def test_acquire_connection_exc(self, connection_cls, dbt_error_caplog):
pytest.param(
"s3://my-data-bucket/",
"schema_table_unique",
None,
"s3://path/to/external/",
True,
"s3://my-data-bucket/schema/table/uuid",
id="external_location set and temporary",
),
pytest.param(
None,
"schema_table_unique",
"other_table",
None,
False,
"s3://my-bucket/test-dbt/tables/schema/other_table/uuid",
id="s3_path_table_part set",
),
),
)
@patch("dbt.adapters.athena.impl.uuid4", return_value="uuid")
def test_s3_table_location(self, _, s3_data_dir, s3_data_naming, external_location, is_temporary_table, expected):
def test_s3_table_location(
self, _, s3_data_dir, s3_data_naming, external_location, s3_path_table_part, is_temporary_table, expected
):
self.adapter.acquire_connection("dummy")
assert expected == self.adapter.s3_table_location(
s3_data_dir, s3_data_naming, "schema", "table", external_location, is_temporary_table
s3_data_dir, s3_data_naming, "schema", "table", s3_path_table_part, external_location, is_temporary_table
)

def test_s3_table_location_exc(self):
Expand Down