Zero-Downtime option for CTAS recreation #95

Open · wants to merge 2 commits into base: master (changes shown from 1 commit)
32 changes: 32 additions & 0 deletions README.md
@@ -73,6 +73,38 @@ _Additional information_

### Usage notes

**Zero-Downtime Tables**
Starting with adapter version `1.0.3`, you can run `dbt run` without downtime during table updates.
It works in the following way:
1. Enable zero-downtime for tables
a) Add the `table_zero_downtime` var to `dbt_project.yml`
```
vars:
  table_zero_downtime: true
```
b) Alternatively, add the `table_zero_downtime` tag to a specific model with `table` materialization
```
{{ config(tags=['table_zero_downtime']) }}
```
2. Create a companion view for your table
a) It is usually very simple, like `select * from {{ref('table_model')}}`


Curious question: why not let the code do this itself? For example, create the real table with a temp name, and create the name that corresponds to the file name as a view with this simple select star?

Author:
@jankatins Good catch actually, thanks! I hadn't considered such an option, to be honest. I'll think about it, since it might eliminate several downsides of the solution.

b) Move all tests to the companion view, since tests on the table do not work properly in all cases
c) Documentation for the table will not include columns, but everything will be fine for the created view

3. Clean up stale objects such as CTAS tables
a) We've added a complementary set of macros to clean up objects that no longer exist in Git.
Check out these links:
- https://github.com/SOVALINUX/dbt-utils/blob/main/macros/sql/delete_stale_objects.sql
- https://github.com/SOVALINUX/athena-utils/blob/main/macros/dbt_utils/sql/delete_stale_objects.sql
With an `on-run-end` hook you can do the following trick:
```
on-run-end: "{% do athena_utils.delete_stale_ctas_run_end([target.schema, generate_schema_name('some_extra_schema', '')], False, '') %}"
```
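The linked macros are the actual implementation; as an illustrative sketch only (not the macros' real logic, and all names here are hypothetical), the core idea of stale-CTAS cleanup can be modeled like this: group physical tables named `ctas_<name>_<millis>` by base name, keep the newest per group, and treat the rest as deletion candidates.

```python
# Illustrative sketch (not the actual macro logic): given a list of physical
# table names, keep only the newest `ctas_<name>_<millis>` per base name and
# report the older ones as stale candidates for deletion.
import re
from collections import defaultdict

CTAS_RE = re.compile(r"^ctas_(?P<base>.+)_(?P<ts>\d+)$")

def find_stale_ctas(table_names):
    groups = defaultdict(list)
    for name in table_names:
        m = CTAS_RE.match(name)
        if not m:
            continue  # not a zero-downtime CTAS table; leave it alone
        groups[m.group("base")].append((int(m.group("ts")), name))
    stale = []
    for versions in groups.values():
        versions.sort()
        # everything except the newest version of each base name is stale
        stale.extend(name for _, name in versions[:-1])
    return stale
```

The real `delete_stale_ctas_run_end` macro additionally consults the dbt graph so that objects still referenced by models in Git are never dropped.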

### Notes on Docker
If you are ever going to add this connector to a Docker image, please use the Dockerfile from dbt v1.1 or higher:
https://github.com/dbt-labs/dbt-core/blob/1.1.latest/docker/Dockerfile

### Models

#### Table Configuration
73 changes: 72 additions & 1 deletion dbt/adapters/athena/relation.py
@@ -1,16 +1,87 @@
from dataclasses import dataclass
import json
from typing import Optional, Type, TypeVar

from dbt.contracts.relation import RelationType
from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.events import AdapterLogger

logger = AdapterLogger("Athena")
ZERODOWNTIME_CTAS = {}


@dataclass
class AthenaIncludePolicy(Policy):
    database: bool = False
    schema: bool = True
    identifier: bool = True


Self = TypeVar('Self', bound='AthenaRelation')


@dataclass(frozen=True, eq=False, repr=False)
class AthenaRelation(BaseRelation):
    quote_character: str = ""
    include_policy: Policy = AthenaIncludePolicy()

    @classmethod
    def create(
        cls: Type[Self],
        database: Optional[str] = None,
        schema: Optional[str] = None,
        identifier: Optional[str] = None,
        type: Optional[RelationType] = None,
        **kwargs,
    ) -> Self:
        # Overrides the default implementation to handle zero-downtime table recreation in Athena.
        # Tables created with zero-downtime have names like `ctas_<original_name>_<epoch_time_millis>`.
        # This method is called from several places:
        # 1: table.sql, with all the extra parameters inside kwargs;
        #    this is where the decision about zero-downtime tables is made.
        # 2: compilation.py:_compile_node(), when all the models are rendered with Jinja.
        #    It is not called directly, but through the `ref` implementations in providers.py:
        #    while Jinja renders raw SQL, it calls the builtin `ref` function, which calls this method.
        #
        # In case 1 we record the model in ZERODOWNTIME_CTAS.
        # In case 2, if the model is recorded in ZERODOWNTIME_CTAS, we return its actual
        # name in the database instead of the original one.

        if 'model' in kwargs and 'graph' in kwargs and 'ctas_id' in kwargs:
            model = kwargs.get("model")
            graph = kwargs.get("graph")
            alias = kwargs.get("alias")
            model_old_name = '{}.{}.{}'.format(model.get('resource_type'), model.get('package_name'), alias)
            alias_full_name = '{}.{}.{}'.format(database, schema, alias)
            gmodel = graph.get('nodes').get(model_old_name)
            ctas_id = kwargs.get("ctas_id")
            if gmodel is not None and alias_full_name not in ZERODOWNTIME_CTAS:
                # TODO: it is questionable whether we actually need to update the global graph
                model_id_parts = model.get('unique_id').split('.')[0:-1]
                model_id_parts.append(ctas_id)
                model_rel_parts = model.get('relation_name').split('.')[0:-1]
                model_rel_parts.append(ctas_id)
                gmodel.update({
                    'name': ctas_id,
                    'alias': alias,
                    'unique_id': '.'.join(model_id_parts),
                    'relation_name': '.'.join(model_rel_parts),
                    'ctas': ctas_id,
                })

                ZERODOWNTIME_CTAS.update({alias_full_name: ctas_id})
        id_full_name = '{}.{}.{}'.format(database, schema, identifier)
        final_id = identifier
        if 'graph' not in kwargs and id_full_name in ZERODOWNTIME_CTAS:
            final_id = ZERODOWNTIME_CTAS.get(id_full_name)
        kwargs.update({
            'path': {
                'database': database,
                'schema': schema,
                'identifier': final_id,
            },
            'type': type,
        })
        return cls.from_dict(kwargs)
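The two-pass role of `ZERODOWNTIME_CTAS` can be modeled in isolation. This is a minimal standalone sketch (function names are illustrative, not part of the adapter): pass 1 (the materialization) registers the physical CTAS name under the logical fully-qualified name, and pass 2 (`ref` resolution) looks the logical name up and substitutes the physical one.

```python
# Minimal standalone model of the ZERODOWNTIME_CTAS lookup; the registry maps
# "database.schema.alias" -> physical ctas identifier.
registry = {}

def register_ctas(database, schema, alias, ctas_id):
    # Pass 1: table.sql registers the physical name for the logical relation.
    registry['{}.{}.{}'.format(database, schema, alias)] = ctas_id

def resolve(database, schema, identifier):
    # Pass 2: `ref` resolves to the physical ctas name when one is registered,
    # otherwise the identifier passes through unchanged.
    full_name = '{}.{}.{}'.format(database, schema, identifier)
    return registry.get(full_name, identifier)
```

Because the registry is keyed by the fully-qualified name, two models with the same alias in different schemas cannot collide.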
22 changes: 16 additions & 6 deletions dbt/include/athena/macros/materializations/models/table/table.sql
@@ -1,16 +1,26 @@
{% materialization table, adapter='athena' -%}
  {%- set identifier = model['alias'] -%}

  {{ run_hooks(pre_hooks) }}

  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {% if var('table_zero_downtime', false) or 'table_zero_downtime' in config.get('tags') %}
    {%- set current_ts = (modules.datetime.datetime.utcnow() - modules.datetime.datetime.utcfromtimestamp(0)).total_seconds() * 1000 -%}
    {%- set ctas_id_str = "ctas_{0}_{1}".format(identifier, current_ts) -%}
    {%- set ctas_id = ctas_id_str[0:ctas_id_str.index('.')] -%}
    {%- set target_relation = api.Relation.create(identifier=ctas_id,
                                                  schema=schema,
                                                  database=database,
                                                  type='table', alias=identifier, ctas_id=ctas_id, model=model, graph=graph) -%}
  {%- else -%}
    {%- if old_relation is not none -%}
      {{ adapter.drop_relation(old_relation) }}
    {%- endif -%}
    {%- set target_relation = api.Relation.create(identifier=identifier,
                                                  schema=schema,
                                                  database=database,
                                                  type='table') -%}
  {%- endif -%}
-- build model
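A non-obvious detail in the macro above is the `ctas_id_str.index('.')` truncation: `total_seconds() * 1000` yields a float (e.g. `1649980800000.0`), so its string form contains a `.`, and cutting at the first `.` keeps only the integer millisecond part. A small Python sketch of the same construction (the helper name is illustrative, not part of the adapter):

```python
# Sketch of the ctas name construction used by table.sql: epoch offset in
# milliseconds comes out as a float, so the string is truncated at the first
# '.' to keep only the integer part of the timestamp.
import datetime

def make_ctas_id(identifier, now=None):
    now = now or datetime.datetime.utcnow()
    epoch = datetime.datetime.utcfromtimestamp(0)
    current_ts = (now - epoch).total_seconds() * 1000  # float, e.g. 1649980800000.0
    ctas_id_str = "ctas_{0}_{1}".format(identifier, current_ts)
    return ctas_id_str[0:ctas_id_str.index('.')]
```

One caveat of this scheme: if the model's identifier itself contained a `.`, the truncation would cut the name short, but dbt identifiers do not normally contain dots.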
2 changes: 2 additions & 0 deletions test/integration/athena.dbtspec
@@ -4,6 +4,8 @@ target:
schema: "{{ env_var('DBT_TEST_ATHENA_SCHEMA', 'dbt_integration_tests') }}"
region_name: "{{ env_var('DBT_TEST_ATHENA_REGION', 'eu-west-1') }}"
s3_staging_dir: "{{ env_var('DBT_TEST_ATHENA_S3_STAGING_DIR') }}"
aws_profile_name: "{{ env_var('DBT_TEST_AWS_PROFILE') }}"
work_group: "{{ env_var('DBT_TEST_ATHENA_WORKGROUP', 'primary') }}"
sequences:
test_dbt_empty: empty
test_dbt_base: base