dbt-athena-community support #203

Closed · wants to merge 13 commits
8 changes: 6 additions & 2 deletions .github/workflows/integration_tests.yml
@@ -18,7 +18,7 @@ jobs:
matrix:
python-version: [ "3.11"] # "3.10", "3.12"]
dbt-version: ["1.7.0"] # "1.6.0", "1.8.0b1"]
- data-platform: ["redshift", "snowflake", "bigquery"]
+ data-platform: ["redshift", "snowflake", "bigquery", "athena"]

steps:
- uses: actions/checkout@v3
@@ -58,4 +58,8 @@ jobs:
BIGQUERY_PRIVATE_KEY: ${{ secrets.BIGQUERY_PRIVATE_KEY }}
BIGQUERY_PRIVATE_KEY_ID: ${{ secrets.BIGQUERY_PRIVATE_KEY_ID }}
BIGQUERY_CLIENT_EMAIL: ${{ secrets.BIGQUERY_CLIENT_EMAIL }}
- BIGQUERY_CLIENT_ID: ${{ secrets.BIGQUERY_CLIENT_ID }}
+ BIGQUERY_CLIENT_ID: ${{ secrets.BIGQUERY_CLIENT_ID }}
ATHENA_TEST_DBNAME: ${{ secrets.ATHENA_TEST_DBNAME }}
ATHENA_AWS_REGION: ${{ secrets.ATHENA_AWS_REGION }}
ATHENA_TEST_BUCKET: ${{ secrets.ATHENA_TEST_BUCKET }}
ATHENA_TEST_WORKGROUP: ${{ secrets.ATHENA_TEST_WORKGROUP }}
5 changes: 5 additions & 0 deletions README.md
@@ -14,6 +14,10 @@ This package provides:
* Spark
* Synapse
* Azure SQL
* AWS Athena via [dbt-athena-community](https://pypi.org/project/dbt-athena-community/) >= 1.4.1 (earlier versions used a [different quoting mechanism](https://github.com/dbt-athena/dbt-athena/pull/152))
* Tested with [Athena Engine version 3](https://docs.aws.amazon.com/athena/latest/ug/engine-versions-reference-0003.html)
* [Example minimal IAM permissions](./athena_iam_permissions.json)


![sample docs](etc/sample_docs.png)

@@ -56,6 +60,7 @@ The macros assume that you:
- an accessible set of files (Spark)
2. Have the appropriate permissions to create tables using that scaffolding
3. Have already created the database/project and/or schema/dataset in which dbt will create external tables (or snowpiped tables)
4. Have set the top-level key `query-comment: ''` in your `dbt_project.yml` file (Athena only)
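For reference, item 4 is a single top-level line in `dbt_project.yml`. A minimal sketch (the key itself is standard dbt; only the need to blank it is Athena-specific, because dbt's default metadata comment is injected as a `/* ... */` block prefix that Athena DML rejects):

```yaml
# dbt_project.yml — Athena targets only: an empty query-comment stops dbt
# from prefixing statements with a /* ... */ metadata block, which Athena's
# parser rejects ("cannot recognize input near '/' '*' '{'").
query-comment: ''
```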

## Spec

61 changes: 61 additions & 0 deletions athena_iam_permissions.json
@@ -0,0 +1,61 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "athena_glue",
"Effect": "Allow",
"Action": [
"glue:BatchCreatePartition",
"athena:StartQueryExecution",
"athena:ListDataCatalogs",
"glue:DeleteDatabase",
"glue:GetPartitions",
"glue:BatchDeletePartition",
"glue:UpdateTable",
"athena:GetQueryResults",
"glue:DeleteTable",
"athena:GetDatabase",
"athena:GetDataCatalog",
"athena:ListQueryExecutions",
"athena:GetWorkGroup",
"athena:StopQueryExecution",
"glue:CreatePartition",
"glue:UpdatePartition",
"glue:CreateTable",
"glue:GetTables",
"glue:BatchUpdatePartition",
"glue:BatchGetPartition",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"athena:ListDatabases",
"glue:CreateDatabase",
"athena:GetQueryExecution",
"glue:DeletePartition",
"athena:BatchGetQueryExecution"
],
"Resource": "*"
},
{
"Sid": "s3_via_athena",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListBucket",
"s3:DeleteObject",
"s3:GetBucketLocation",
"s3:ListMultipartUploadParts"
],
"Resource": "*",
"Condition": {
"ForAnyValue:StringEquals": {
"aws:CalledVia": "athena.amazonaws.com"
}
}
}
]
}
11 changes: 11 additions & 0 deletions integration_tests/dbt_project.yml
@@ -17,6 +17,13 @@ clean-targets:
- "target"
- "dbt_packages"

# FIXME: query-comment must be disabled for Athena to work because /* block comments are unsupported in Athena DML
# Removing this line will result in a Runtime Error during the integration test
# `2 of 5 (2) create external table dbt.people_csv_partitioned ...`. The error is
# "FAILED: ParseException line 1:0 cannot recognize input near '/' '*' '{".
# Is there a better way around this?
query-comment: ''

dispatch:
- macro_namespace: dbt_external_tables
search_order: ['dbt_external_tables_integration_tests', 'dbt_external_tables']
@@ -33,6 +40,8 @@ models:
sources:
dbt_external_tables_integration_tests:
plugins:
athena:
+enabled: "{{ target.type == 'athena' }}"
redshift:
+enabled: "{{ target.type == 'redshift' }}"
snowflake:
@@ -49,6 +58,8 @@ sources:
tests:
dbt_external_tables_integration_tests:
plugins:
athena:
+enabled: "{{ target.type == 'athena' }}"
redshift:
+enabled: "{{ target.type == 'redshift' }}"
snowflake:
99 changes: 99 additions & 0 deletions integration_tests/models/plugins/athena/athena_external.yml
@@ -0,0 +1,99 @@
version: 2

sources:
- name: athena_external
schema: "{{ target.schema }}"
tables:
- name: people_csv_unpartitioned
external: &csv-people
location: "s3://dbt-external-tables-testing/csv/"
row_format: serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
table_properties: "('skip.header.line.count'='1')"
columns: &cols-of-the-people
- name: id
data_type: int
- name: first_name
data_type: varchar(64)
- name: last_name
data_type: varchar(64)
- name: email
data_type: varchar(64)
tests: &equal-to-the-people
- dbt_utils.equality:
compare_model: ref('people')
compare_columns:
- id
- first_name
- last_name
- email

- name: people_csv_partitioned
external:
<<: *csv-people
partitions: &parts-of-the-people
- name: section
data_type: varchar(1)
vals: ['a','b','c','d']
path_macro: dbt_external_tables.key_value
columns: *cols-of-the-people
tests: *equal-to-the-people

# ensure that all partitions are created
- name: people_csv_multipartitioned
external:
<<: *csv-people
location: "s3://dbt-external-tables-testing/"
partitions:
- name: file_format
data_type: varchar(4)
vals: ['csv']
path_macro: dbt_external_tables.value_only
- name: section
data_type: varchar(1)
vals: ['a','b','c','d']
path_macro: dbt_external_tables.key_value
- name: some_date
data_type: date
vals:
macro: dbt.dates_in_range
args:
start_date_str: '2020-01-01'
end_date_str: '2020-02-01'
in_fmt: "%Y-%m-%d"
out_fmt: "%Y-%m-%d"
path_macro: dbt_external_tables.year_month_day
- name: file_name
data_type: varchar(10)
vals: ['people', 'not_people']
path_macro: dbt_external_tables.value_only
columns: *cols-of-the-people

- name: people_csv_multipartitioned_hive_compatible
external:
<<: *csv-people
hive_compatible_partitions: true
location: "s3://dbt-external-tables-testing/"
partitions:
- name: file_format
data_type: varchar(4)
vals: ['csv']
path_macro: dbt_external_tables.value_only
- name: section
data_type: varchar(1)
vals: ['a','b','c','d']
path_macro: dbt_external_tables.key_value
- name: some_date
data_type: date
vals:
macro: dbt.dates_in_range
args:
start_date_str: '2020-01-01'
end_date_str: '2020-02-01'
in_fmt: "%Y-%m-%d"
out_fmt: "%Y-%m-%d"
path_macro: dbt_external_tables.year_month_day
- name: file_name
data_type: varchar(10)
vals: ['people', 'not_people']
path_macro: dbt_external_tables.value_only
columns: *cols-of-the-people
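Reading the specs above: each entry under `partitions` is expanded into every combination of its `vals`, and each value becomes a path segment via its `path_macro` — `key_value` renders `section=a`, `value_only` renders the bare value (`csv`, `people`), and `year_month_day` splits a date into `2020/01/01`. So the multipartitioned source should resolve to locations like `s3://dbt-external-tables-testing/csv/section=a/2020/01/01/people/` (inferred from the macro names; the macro bodies are not part of this diff).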
10 changes: 10 additions & 0 deletions integration_tests/profiles.yml
@@ -10,6 +10,16 @@ integration_tests:
target: postgres
outputs:

athena:
type: athena
database: "{{ env_var('ATHENA_TEST_DBNAME', 'AwsDataCatalog') }}"
region_name: "{{ env_var('ATHENA_AWS_REGION') }}"
s3_staging_dir: "s3://{{ env_var('ATHENA_TEST_BUCKET') }}"
work_group: "{{ env_var('ATHENA_TEST_WORKGROUP') }}"
schema: dbt_external_tables_integration_tests_athena
threads: 1
retries: 1

redshift:
type: redshift
host: "{{ env_var('REDSHIFT_TEST_HOST') }}"
35 changes: 35 additions & 0 deletions macros/plugins/athena/create_external_table.sql
@@ -0,0 +1,35 @@
{% macro athena__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}

create external table {{source(source_node.source_name, source_node.name).render_hive()}} (
{% for column in columns %}
{{column.name}} {{column.data_type}}
{{- ',' if not loop.last -}}
{% endfor %}
)
{% if external.comment -%} comment '{{external.comment}}' {%- endif %}
{% if external.partitions -%}
partitioned by (
{% for partition in external.partitions %}
{{partition.name}} {{partition.data_type}}
{%- if partition.comment %} comment '{{partition.comment}}' {%- endif -%}
{{- ', ' if not loop.last -}}
{% endfor %}
)
{%- endif %}
{% if external.clusters and external.num_buckets -%}
clustered by (
{%- for column in external.clusters -%}
{{column}}{{', ' if not loop.last}}
{%- endfor -%}
) into {{external.num_buckets}} buckets
{%- endif %}
{% if external.row_format -%} row format {{external.row_format}} {%- endif %}
{% if external.file_format -%} stored as {{external.file_format}} {%- endif %}
{% if external.serde_properties -%} with serdeproperties {{external.serde_properties}} {%- endif %}
{% if external.location -%} location '{{external.location}}' {%- endif %}
{% if external.table_properties -%} tblproperties {{external.table_properties}} {%- endif %}
;
{% endmacro %}
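To make the template above concrete, here is roughly the DDL it should render for the `people_csv_unpartitioned` source defined in the integration tests — a sketch assuming a target schema named `dbt` (the actual schema comes from the profile):

```sql
create external table dbt.people_csv_unpartitioned (
  id int,
  first_name varchar(64),
  last_name varchar(64),
  email varchar(64)
)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
location 's3://dbt-external-tables-testing/csv/'
tblproperties ('skip.header.line.count'='1')
;
```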
19 changes: 19 additions & 0 deletions macros/plugins/athena/get_external_build_plan.sql
@@ -0,0 +1,19 @@
{% macro athena__get_external_build_plan(source_node) %}
{% set build_plan = [] %}
{% set old_relation = adapter.get_relation(
database = source_node.database,
schema = source_node.schema,
identifier = source_node.identifier
) %}
{% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}
{% if create_or_replace %}
{% set build_plan = [
dbt_external_tables.dropif(source_node),
dbt_external_tables.create_external_table(source_node)
] + dbt_external_tables.refresh_external_table(source_node)
%}
{% else %}
{% set build_plan = dbt_external_tables.refresh_external_table(source_node) %}
{% endif %}
{% do return(build_plan) %}
{% endmacro %}
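In practice, the first run against a new source (or any run with `ext_full_refresh` set) emits drop + create + refresh statements, while later runs only refresh partitions. Forcing the full path from the command line uses the package's standard entry point, e.g. `dbt run-operation stage_external_sources --vars 'ext_full_refresh: true'`, per the `var('ext_full_refresh', false)` check above.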
6 changes: 6 additions & 0 deletions macros/plugins/athena/helpers/dropif.sql
@@ -0,0 +1,6 @@
{% macro athena__dropif(node) %}
{% set ddl %}
drop table if exists {{source(node.source_name, node.name).render_hive()}}
{% endset %}
{{return(ddl)}}
{% endmacro %}
61 changes: 61 additions & 0 deletions macros/plugins/athena/refresh_external_tables.sql
@@ -0,0 +1,61 @@
{% macro athena__refresh_external_table(source_node) %}
{# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #}
{%- set partitions = source_node.external.partitions -%}
{%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%}
{%- if partitions -%}
{%- if hive_compatible_partitions -%}
{% set ddl -%}
msck repair table {{source(source_node.source_name, source_node.name).render_hive()}}
{%- endset %}
{{ return([ddl]) }}
{% else %}
{# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #}
{%- set part_len = partitions|length -%}
{%- set get_partitions_sql -%}
select * from
{%- for partition in partitions %} (
{%- set part_num = loop.index -%}
{%- if partition.vals.macro -%}
{%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
{%- elif partition.vals is string -%}
{%- set vals = [partition.vals] -%}
{%- else -%}
{%- set vals = partition.vals -%}
{%- endif -%}
{%- for val in vals %}
select
'"{{ partition.name }}"' as name_{{ part_num }},
'"{{ val }}"' as val_{{ part_num }},
'"{{ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) }}"' as path_{{ part_num }}
{{ 'union all' if not loop.last else ') ' }}
{%- endfor -%}
{{ 'cross join' if not loop.last }}
{%- endfor -%}
{%- endset -%}
{%- set finals = [] -%}
{%- if execute -%}
{%- set results = run_query(get_partitions_sql) -%}
{%- for row in results -%}
{%- set partition_parts = [] -%}
{%- set path_parts = [] -%}
{%- for i in range(0, part_len) -%}
{%- do partition_parts.append({
'name': row[i * 3][1:-1],
'value': row[i * 3 + 1][1:-1]
}) -%}
{%- do path_parts.append(row[i * 3 + 2][1:-1]) -%}
{%- endfor -%}
{%- set construct = {
'partition_by': partition_parts,
'path': path_parts | join('/')
} -%}
{% do finals.append(construct) %}
{%- endfor -%}
{%- endif -%}
{%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals, is_athena=True) -%}
{{ return(ddl) }}
{% endif %}
{% endif %}
{% do return([]) %}
{% endmacro %}
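For sources without Hive-compatible layouts, the final DDL comes from `redshift_alter_table_add_partitions` with `is_athena=True`; that macro is not part of this diff, but following Athena's documented `ALTER TABLE ADD PARTITION` syntax, the output for `people_csv_partitioned` should look roughly like this sketch (schema name assumed; the Hive-compatible branch instead issues the single `msck repair table` statement shown above):

```sql
alter table dbt.people_csv_partitioned add if not exists
  partition (section = 'a') location 's3://dbt-external-tables-testing/csv/section=a/'
  partition (section = 'b') location 's3://dbt-external-tables-testing/csv/section=b/'
  partition (section = 'c') location 's3://dbt-external-tables-testing/csv/section=c/'
  partition (section = 'd') location 's3://dbt-external-tables-testing/csv/section=d/'
;
```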