diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml
index 0ffbbbb8..49a21d73 100644
--- a/.github/workflows/integration_tests.yml
+++ b/.github/workflows/integration_tests.yml
@@ -18,7 +18,7 @@ jobs:
       matrix:
         python-version: [ "3.11"] # "3.10", "3.12"]
         dbt-version: ["1.7.0"] # "1.6.0", , "1.8.0b1"]
-        data-platform: ["redshift", "snowflake", "bigquery"]
+        data-platform: ["redshift", "snowflake", "bigquery", "athena"]
 
     steps:
       - uses: actions/checkout@v3
@@ -58,4 +58,8 @@
           BIGQUERY_PRIVATE_KEY: ${{ secrets.BIGQUERY_PRIVATE_KEY }}
           BIGQUERY_PRIVATE_KEY_ID: ${{ secrets.BIGQUERY_PRIVATE_KEY_ID }}
           BIGQUERY_CLIENT_EMAIL: ${{ secrets.BIGQUERY_CLIENT_EMAIL }}
-          BIGQUERY_CLIENT_ID: ${{ secrets.BIGQUERY_CLIENT_ID }}
\ No newline at end of file
+          BIGQUERY_CLIENT_ID: ${{ secrets.BIGQUERY_CLIENT_ID }}
+          ATHENA_TEST_DBNAME: ${{ secrets.ATHENA_TEST_DBNAME }}
+          ATHENA_AWS_REGION: ${{ secrets.ATHENA_AWS_REGION }}
+          ATHENA_TEST_BUCKET: ${{ secrets.ATHENA_TEST_BUCKET }}
+          ATHENA_TEST_WORKGROUP: ${{ secrets.ATHENA_TEST_WORKGROUP }}
\ No newline at end of file
diff --git a/README.md b/README.md
index 51e34d31..f0dac18c 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,10 @@ This package provides:
 * Spark
 * Synapse
 * Azure SQL
+* AWS Athena via [dbt-athena-community](https://pypi.org/project/dbt-athena-community/) >= 1.4.1 (the version that [changed the quoting mechanism](https://github.com/dbt-athena/dbt-athena/pull/152))
+  * Tested with [Athena Engine version 3](https://docs.aws.amazon.com/athena/latest/ug/engine-versions-reference-0003.html)
+  * [Example minimal IAM permissions](./athena_iam_permissions.json)
+
 
 ![sample docs](etc/sample_docs.png)
 
@@ -56,6 +60,7 @@ The macros assume that you:
     - an accessible set of files (Spark)
 2. Have the appropriate permissions on to create tables using that scaffolding
 3. Have already created the database/project and/or schema/dataset in which dbt will create external tables (or snowpiped tables)
+4. Have set the top-level key `query-comment: ''` in your `dbt_project.yml` file (Athena only)
 
 ## Spec
 
diff --git a/athena_iam_permissions.json b/athena_iam_permissions.json
new file mode 100644
index 00000000..70e025b1
--- /dev/null
+++ b/athena_iam_permissions.json
@@ -0,0 +1,61 @@
+{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Sid": "athena_glue",
+            "Effect": "Allow",
+            "Action": [
+                "glue:BatchCreatePartition",
+                "athena:StartQueryExecution",
+                "athena:ListDataCatalogs",
+                "glue:DeleteDatabase",
+                "glue:GetPartitions",
+                "glue:BatchDeletePartition",
+                "glue:UpdateTable",
+                "athena:GetQueryResults",
+                "glue:DeleteTable",
+                "athena:GetDatabase",
+                "athena:GetDataCatalog",
+                "athena:ListQueryExecutions",
+                "athena:GetWorkGroup",
+                "athena:StopQueryExecution",
+                "glue:CreatePartition",
+                "glue:UpdatePartition",
+                "glue:CreateTable",
+                "glue:GetTables",
+                "glue:BatchUpdatePartition",
+                "glue:BatchGetPartition",
+                "glue:GetDatabases",
+                "glue:GetTable",
+                "glue:GetDatabase",
+                "glue:GetPartition",
+                "athena:ListDatabases",
+                "glue:CreateDatabase",
+                "athena:GetQueryExecution",
+                "glue:DeletePartition",
+                "athena:BatchGetQueryExecution"
+            ],
+            "Resource": "*"
+        },
+        {
+            "Sid": "s3_via_athena",
+            "Effect": "Allow",
+            "Action": [
+                "s3:PutObject",
+                "s3:GetObject",
+                "s3:ListBucketMultipartUploads",
+                "s3:AbortMultipartUpload",
+                "s3:ListBucket",
+                "s3:DeleteObject",
+                "s3:GetBucketLocation",
+                "s3:ListMultipartUploadParts"
+            ],
+            "Resource": "*",
+            "Condition": {
+                "ForAnyValue:StringEquals": {
+                    "aws:CalledVia": "athena.amazonaws.com"
+                }
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index 6de61814..1b2d8f04 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -17,6 +17,13 @@ clean-targets:
   - "target"
   - "dbt_packages"
 
+# FIXME: query-comment must be disabled for Athena to work, because /* block comments are unsupported in Athena DML.
+# Removing this line results in a Runtime Error during the integration test
+# `2 of 5 (2) create external table dbt.people_csv_partitioned ...`. The error is
+# "FAILED: ParseException line 1:0 cannot recognize input near '/' '*' '{".
+# Is there a better way around this?
+query-comment: ''
+
 dispatch:
   - macro_namespace: dbt_external_tables
     search_order: ['dbt_external_tables_integration_tests', 'dbt_external_tables']
@@ -33,6 +40,8 @@ models:
 sources:
   dbt_external_tables_integration_tests:
     plugins:
+      athena:
+        +enabled: "{{ target.type == 'athena' }}"
       redshift:
         +enabled: "{{ target.type == 'redshift' }}"
       snowflake:
@@ -49,6 +58,8 @@ sources:
 tests:
   dbt_external_tables_integration_tests:
     plugins:
+      athena:
+        +enabled: "{{ target.type == 'athena' }}"
       redshift:
         +enabled: "{{ target.type == 'redshift' }}"
       snowflake:
diff --git a/integration_tests/models/plugins/athena/athena_external.yml b/integration_tests/models/plugins/athena/athena_external.yml
new file mode 100644
index 00000000..919af529
--- /dev/null
+++ b/integration_tests/models/plugins/athena/athena_external.yml
@@ -0,0 +1,99 @@
+version: 2
+
+sources:
+  - name: athena_external
+    schema: "{{ target.schema }}"
+    tables:
+      - name: people_csv_unpartitioned
+        external: &csv-people
+          location: "s3://dbt-external-tables-testing/csv/"
+          row_format: serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+          table_properties: "('skip.header.line.count'='1')"
+        columns: &cols-of-the-people
+          - name: id
+            data_type: int
+          - name: first_name
+            data_type: varchar(64)
+          - name: last_name
+            data_type: varchar(64)
+          - name: email
+            data_type: varchar(64)
+        tests: &equal-to-the-people
+          - dbt_utils.equality:
+              compare_model: ref('people')
+              compare_columns:
+                - id
+                - first_name
+                - last_name
+                - email
+
+      - name: people_csv_partitioned
+        external:
+          <<: *csv-people
+          partitions: &parts-of-the-people
+            - name: section
+              data_type: varchar(1)
+              vals: ['a','b','c','d']
+              path_macro: dbt_external_tables.key_value
+        columns: *cols-of-the-people
+        tests: *equal-to-the-people
+
+      # ensure that all partitions are created
+      - name: people_csv_multipartitioned
+        external:
+          <<: *csv-people
+          location: "s3://dbt-external-tables-testing/"
+          partitions:
+            - name: file_format
+              data_type: varchar(4)
+              vals: ['csv']
+              path_macro: dbt_external_tables.value_only
+            - name: section
+              data_type: varchar(1)
+              vals: ['a','b','c','d']
+              path_macro: dbt_external_tables.key_value
+            - name: some_date
+              data_type: date
+              vals:
+                macro: dbt.dates_in_range
+                args:
+                  start_date_str: '2020-01-01'
+                  end_date_str: '2020-02-01'
+                  in_fmt: "%Y-%m-%d"
+                  out_fmt: "%Y-%m-%d"
+              path_macro: dbt_external_tables.year_month_day
+            - name: file_name
+              data_type: varchar(10)
+              vals: ['people', 'not_people']
+              path_macro: dbt_external_tables.value_only
+        columns: *cols-of-the-people
+
+      - name: people_csv_multipartitioned_hive_compatible
+        external:
+          <<: *csv-people
+          hive_compatible_partitions: true
+          location: "s3://dbt-external-tables-testing/"
+          partitions:
+            - name: file_format
+              data_type: varchar(4)
+              vals: ['csv']
+              path_macro: dbt_external_tables.value_only
+            - name: section
+              data_type: varchar(1)
+              vals: ['a','b','c','d']
+              path_macro: dbt_external_tables.key_value
+            - name: some_date
+              data_type: date
+              vals:
+                macro: dbt.dates_in_range
+                args:
+                  start_date_str: '2020-01-01'
+                  end_date_str: '2020-02-01'
+                  in_fmt: "%Y-%m-%d"
+                  out_fmt: "%Y-%m-%d"
+              path_macro: dbt_external_tables.year_month_day
+            - name: file_name
+              data_type: varchar(10)
+              vals: ['people', 'not_people']
+              path_macro: dbt_external_tables.value_only
+        columns: *cols-of-the-people
diff --git a/integration_tests/profiles.yml b/integration_tests/profiles.yml
index 1c366bc9..4f8976c4 100644
--- a/integration_tests/profiles.yml
+++ b/integration_tests/profiles.yml
@@ -10,6 +10,16 @@
 integration_tests:
   target: postgres
   outputs:
+    athena:
+      type: athena
+      database: "{{ env_var('ATHENA_TEST_DBNAME', 'AwsDataCatalog') }}"
+      region_name: "{{ env_var('ATHENA_AWS_REGION') }}"
+      s3_staging_dir: "s3://{{ env_var('ATHENA_TEST_BUCKET') }}"
+      work_group: "{{ env_var('ATHENA_TEST_WORKGROUP') }}"
+      schema: dbt_external_tables_integration_tests_athena
+      threads: 1
+      retries: 1
+
     redshift:
       type: redshift
       host: "{{ env_var('REDSHIFT_TEST_HOST') }}"
diff --git a/macros/plugins/athena/create_external_table.sql b/macros/plugins/athena/create_external_table.sql
new file mode 100644
index 00000000..8a4d7a50
--- /dev/null
+++ b/macros/plugins/athena/create_external_table.sql
@@ -0,0 +1,35 @@
+{% macro athena__create_external_table(source_node) %}
+
+    {%- set columns = source_node.columns.values() -%}
+    {%- set external = source_node.external -%}
+
+    create external table {{source(source_node.source_name, source_node.name).render_hive()}} (
+        {% for column in columns %}
+            {{column.name}} {{column.data_type}}
+            {{- ',' if not loop.last -}}
+        {% endfor %}
+    )
+    {% if external.comment -%} comment '{{external.comment}}' {%- endif %}
+    {% if external.partitions -%}
+    partitioned by (
+        {% for partition in external.partitions %}
+            {{partition.name}} {{partition.data_type}}
+            {%- if partition.comment %} comment '{{partition.comment}}' {%- endif -%}
+            {{- ', ' if not loop.last -}}
+        {% endfor %}
+    )
+    {%- endif %}
+    {% if external.clusters and external.num_buckets -%}
+    clustered by (
+        {%- for column in external.clusters -%}
+            {{column}}{{', ' if not loop.last}}
+        {%- endfor -%}
+    ) into {{external.num_buckets}} buckets
+    {%- endif %}
+    {% if external.row_format -%} row format {{external.row_format}} {%- endif %}
+    {% if external.file_format -%} stored as {{external.file_format}} {%- endif %}
+    {% if external.serde_properties -%} with serdeproperties {{external.serde_properties}} {%- endif %}
+    {% if external.location -%} location '{{external.location}}' {%- endif %}
+    {% if external.table_properties -%} tblproperties {{external.table_properties}} {%- endif %}
+    ;
+{% endmacro %}
\ No newline at end of file
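
For illustration (not part of the patch): given the `people_csv_partitioned` source defined in the integration tests and the schema from the integration-test profile above, the macro would render DDL roughly like the following. The backtick quoting is assumed to come from `render_hive()`, and the exact whitespace differs:

    create external table `dbt_external_tables_integration_tests_athena`.`people_csv_partitioned` (
        id int,
        first_name varchar(64),
        last_name varchar(64),
        email varchar(64)
    )
    partitioned by (
        section varchar(1)
    )
    row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    location 's3://dbt-external-tables-testing/csv/'
    tblproperties ('skip.header.line.count'='1')
    ;
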
diff --git a/macros/plugins/athena/get_external_build_plan.sql b/macros/plugins/athena/get_external_build_plan.sql
new file mode 100644
index 00000000..84689fb9
--- /dev/null
+++ b/macros/plugins/athena/get_external_build_plan.sql
@@ -0,0 +1,19 @@
+{% macro athena__get_external_build_plan(source_node) %}
+    {% set build_plan = [] %}
+    {% set old_relation = adapter.get_relation(
+        database = source_node.database,
+        schema = source_node.schema,
+        identifier = source_node.identifier
+    ) %}
+    {% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}
+    {% if create_or_replace %}
+        {% set build_plan = [
+            dbt_external_tables.dropif(source_node),
+            dbt_external_tables.create_external_table(source_node)
+        ] + dbt_external_tables.refresh_external_table(source_node)
+        %}
+    {% else %}
+        {% set build_plan = dbt_external_tables.refresh_external_table(source_node) %}
+    {% endif %}
+    {% do return(build_plan) %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/plugins/athena/helpers/dropif.sql b/macros/plugins/athena/helpers/dropif.sql
new file mode 100644
index 00000000..fe9cc08d
--- /dev/null
+++ b/macros/plugins/athena/helpers/dropif.sql
@@ -0,0 +1,6 @@
+{% macro athena__dropif(node) %}
+    {% set ddl %}
+        drop table if exists {{source(node.source_name, node.name).render_hive()}}
+    {% endset %}
+    {{return(ddl)}}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/plugins/athena/refresh_external_tables.sql b/macros/plugins/athena/refresh_external_tables.sql
new file mode 100644
index 00000000..65c0822f
--- /dev/null
+++ b/macros/plugins/athena/refresh_external_tables.sql
@@ -0,0 +1,61 @@
+{% macro athena__refresh_external_table(source_node) %}
+    {# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #}
+    {%- set partitions = source_node.external.partitions -%}
+    {%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%}
+    {%- if partitions -%}
+        {%- if hive_compatible_partitions -%}
+            {% set ddl -%}
+                msck repair table {{source(source_node.source_name, source_node.name).render_hive()}}
+            {%- endset %}
+            {{ return([ddl]) }}
+        {% else %}
+            {# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #}
+            {%- set part_len = partitions|length -%}
+            {%- set get_partitions_sql -%}
+                select * from
+                {%- for partition in partitions %} (
+                    {%- set part_num = loop.index -%}
+                    {%- if partition.vals.macro -%}
+                        {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
+                    {%- elif partition.vals is string -%}
+                        {%- set vals = [partition.vals] -%}
+                    {%- else -%}
+                        {%- set vals = partition.vals -%}
+                    {%- endif -%}
+                    {%- for val in vals %}
+                        select
+                            '"{{ partition.name }}"' as name_{{ part_num }},
+                            '"{{ val }}"' as val_{{ part_num }},
+                            '"{{ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) }}"' as path_{{ part_num }}
+                        {{ 'union all' if not loop.last else ') ' }}
+                    {%- endfor -%}
+                    {{ 'cross join' if not loop.last }}
+                {%- endfor -%}
+            {%- endset -%}
+            {%- set finals = [] -%}
+            {%- if execute -%}
+                {%- set results = run_query(get_partitions_sql) -%}
+                {%- for row in results -%}
+                    {%- set partition_parts = [] -%}
+                    {%- set path_parts = [] -%}
+                    {%- for i in range(0, part_len) -%}
+                        {%- do partition_parts.append({
+                            'name': row[i * 3][1:-1],
+                            'value': row[i * 3 + 1][1:-1]
+                        }) -%}
+                        {%- do path_parts.append(row[i * 3 + 2][1:-1]) -%}
+                    {%- endfor -%}
+                    {%- set construct = {
+                        'partition_by': partition_parts,
+                        'path': path_parts | join('/')
+                    } -%}
+                    {% do finals.append(construct) %}
+                {%- endfor -%}
+            {%- endif -%}
+            {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals, is_athena=True) -%}
+            {{ return(ddl) }}
+            {% do return([]) %}
+        {% endif %}
+    {% endif %}
+    {% do return([]) %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/plugins/redshift/helpers/add_partitions.sql b/macros/plugins/redshift/helpers/add_partitions.sql
index 844c6605..69ce207a 100644
--- a/macros/plugins/redshift/helpers/add_partitions.sql
+++ b/macros/plugins/redshift/helpers/add_partitions.sql
@@ -18,7 +18,7 @@
     - path (string): The path to be added as a partition for the particular
       combination of columns defined in the 'partition_by'
 #}
-{% macro redshift_alter_table_add_partitions(source_node, partitions) %}
+{% macro redshift_alter_table_add_partitions(source_node, partitions, is_athena=False) %}
 
   {{ log("Generating ADD PARTITION statement for partition set between "
       ~ partitions[0]['path'] ~ " and " ~ (partitions|last)['path']) }}
@@ -26,9 +26,9 @@
   {% set ddl = [] %}
 
   {% if partitions|length > 0 %}
-    
+
     {% set alter_table_add %}
-        alter table {{source(source_node.source_name, source_node.name)}} add if not exists
+        alter table {{source(source_node.source_name, source_node.name)['render_hive' if is_athena else 'render']()}} add if not exists
     {% endset %}
 
     {%- set alters -%}
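
For the same `people_csv_partitioned` source, the refresh path above expands the partition `vals` and hands the result to `redshift_alter_table_add_partitions` with `is_athena=True`, so the generated statement would look roughly like this sketch. Partition paths are built by the `key_value` path macro; exact batching and path separators come from the redshift helper:

    alter table `dbt_external_tables_integration_tests_athena`.`people_csv_partitioned` add if not exists
        partition (section='a') location 's3://dbt-external-tables-testing/csv/section=a/'
        partition (section='b') location 's3://dbt-external-tables-testing/csv/section=b/'
        partition (section='c') location 's3://dbt-external-tables-testing/csv/section=c/'
        partition (section='d') location 's3://dbt-external-tables-testing/csv/section=d/'
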
diff --git a/run_test.sh b/run_test.sh
index e759c89f..03fdc48f 100755
--- a/run_test.sh
+++ b/run_test.sh
@@ -15,6 +15,10 @@ if [[ ! -f $VENV ]]; then
     then
        echo "Installing dbt-sqlserver"
        pip install dbt-sqlserver --upgrade --pre
+    elif [ $1 == 'athena' ]
+    then
+       echo "Installing dbt-athena"
+       pip install "dbt-athena-community>=1.6.1" --upgrade --pre
     else
        echo "Installing dbt-$1"
        pip install dbt-$1 --upgrade --pre
diff --git a/sample_sources/athena.yml b/sample_sources/athena.yml
new file mode 100644
index 00000000..95bf859a
--- /dev/null
+++ b/sample_sources/athena.yml
@@ -0,0 +1,61 @@
+version: 2
+
+sources:
+  - name: dbt
+    tables:
+      - name: hrsl
+        database: awscatalog
+        schema: dbt
+        external:
+          location: 's3://dataforgood-fb-data/csv/'
+          row_format: "SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
+          serde_properties: "('serialization.format' = '\t\t', 'field.delim' = '\t')"
+          table_properties: ('has_encrypted_data'='false')
+          hive_compatible_partitions: true
+          partitions:
+            - name: month
+              data_type: string
+            - name: country
+              data_type: string
+            - name: type
+              data_type: string
+        columns:
+          - name: latitude
+            data_type: float
+          - name: longitude
+            data_type: float
+          - name: population
+            data_type: float
+
+      - name: planet_history
+        database: awscatalog
+        schema: dbt
+        external:
+          location: 's3://osm-pds/planet/'
+          file_format: ORCFILE
+          comment: "This is a table comment"
+        columns:
+          - name: id
+            data_type: 'BIGINT'
+          - name: type
+            data_type: 'STRING'
+          - name: tags
+            data_type: 'MAP<STRING, STRING>'
+          - name: lat
+            data_type: 'DECIMAL(9,7)'
+          - name: lon
+            data_type: 'DECIMAL(10,7)'
+          - name: nds
+            data_type: 'ARRAY<STRUCT<REF: BIGINT>>'
+          - name: members
+            data_type: 'ARRAY<STRUCT<TYPE: STRING, REF: BIGINT, ROLE: STRING>>'
+          - name: changeset
+            data_type: 'BIGINT'
+          - name: timestamp
+            data_type: 'TIMESTAMP'
+          - name: uid
+            data_type: 'BIGINT'
+          - name: user
+            data_type: 'STRING'
+          - name: version
+            data_type: 'BIGINT'
\ No newline at end of file