Skip to content

Commit

Permalink
[BUGFIX] Add support for partition fields of type timestamp (#6)
Browse files Browse the repository at this point in the history
* Fix Glue partition handling (#2)

* Add support for partition fields of type timestamp

Co-authored-by: nicor88 <[email protected]>
Co-authored-by: yusuf.mahtab <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2022
1 parent 628894e commit c415681
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Upload Python Package
on:
push:
branches:
- 'master'
- 'main'

permissions:
contents: read
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
## v1.0.3
* Fix issue when fetching partitions from Glue by using pagination
21 changes: 12 additions & 9 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ def clean_up_partitions(
with boto3_client_lock:
glue_client = boto3.client('glue', region_name=client.region_name)
s3_resource = boto3.resource('s3', region_name=client.region_name)
partitions = glue_client.get_partitions(
# CatalogId='123456789012', # Need to make this configurable if it is different from default AWS Account ID
DatabaseName=database_name,
TableName=table_name,
Expression=where_condition
)
p = re.compile('s3://([^/]*)/(.*)')
for partition in partitions["Partitions"]:
paginator = glue_client.get_paginator("get_partitions")
partition_params = {
"DatabaseName": database_name,
"TableName": table_name,
"Expression": where_condition,
"ExcludeColumnSchema": True,
}
partition_pg = paginator.paginate(**partition_params)
partitions = partition_pg.build_full_result().get('Partitions')
s3_rg = re.compile('s3://([^/]*)/(.*)')
for partition in partitions:
logger.debug("Deleting objects for partition '{}' at '{}'", partition["Values"], partition["StorageDescriptor"]["Location"])
m = p.match(partition["StorageDescriptor"]["Location"])
m = s3_rg.match(partition["StorageDescriptor"]["Location"])
if m is not None:
bucket_name = m.group(1)
prefix = m.group(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
{%- set value = "'" + col + "'" -%}
{%- elif column_type == 'date' -%}
{%- set value = "'" + col|string + "'" -%}
{%- elif column_type == 'timestamp' -%}
{%- set value = "'" + col|string + "'" -%}
{%- else -%}
{%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
{%- endif -%}
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
package_name = "dbt-athena-community"

dbt_version = "1.0"
package_version = "1.0.2"
package_version = "1.0.3"
description = """The athena adapter plugin for dbt (data build tool)"""

if not package_version.startswith(dbt_version):
Expand Down

0 comments on commit c415681

Please sign in to comment.