Merge branch 'main' into fix/get-partitions-paginator
nicor88 authored Oct 19, 2023
2 parents 079fdbc + 502939b commit 30ad898
Showing 18 changed files with 286 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .github/FUNDING.yml
@@ -0,0 +1,2 @@
github: [dbt-athena]
open_collective: dbt-athena
38 changes: 30 additions & 8 deletions README.md
@@ -259,17 +259,39 @@ athena:
}
```

* `lf_inherited_tags` (`default=none`)
  * List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be
    removed during association of those defined in `lf_tags_config`
  * i.e. the default behavior of `lf_tags_config` is to be exhaustive: it first removes any pre-existing tags from
    tables and columns before associating the ones currently defined for a given model
  * This breaks tag inheritance, as inherited tags appear on tables and columns just like those associated directly
  * This list sits outside of `lf_tags_config` so that it can be set at the project level -- for example:

```yaml
models:
my_project:
example:
+lf_inherited_tags: [inherited-tag-1, inherited-tag-2]
```

> Notes:
>
> * `lf_tags` and `lf_tags_columns` configs only support attaching LF tags to the corresponding resources.
> We recommend managing LF Tags permissions outside of dbt. For example, you may use
> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or
> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for this purpose.
> * `data_cell_filters` management can't be automated outside dbt because a filter can't be attached to a table
> that doesn't exist yet. Once you `enable` this config, dbt will set all filters and their permissions during every
> dbt run. This approach keeps the row-level security configuration current after every dbt run, applying
> changes as they occur: dropping, creating, and updating filters and their permissions.
> * Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden
> at the table and column level
> * Currently `dbt-athena` does not differentiate between an inherited tag association and an override of the same
> tag that it made previously
> * e.g. If an inherited tag is overridden by an `lf_tags_config` value in one dbt run, and that override is removed
> prior to a subsequent run, the prior override will linger: it is no longer encoded anywhere (neither in e.g.
> Terraform, where the inherited value is configured, nor in the dbt project, where the override previously
> existed but is now gone)

[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties

11 changes: 11 additions & 0 deletions RELEASING.md
@@ -0,0 +1,11 @@
# How to make a release

* open a Pull Request with a manual bump of the version in `dbt/adapters/athena/__version__.py` on `main`
* create a new release from <https://github.com/dbt-athena/dbt-athena/releases>
  * be sure to use the same version as in the `__version__.py` file
  * be sure to start the release name with `v`, e.g. `v1.6.3`
  * tag with the same name as the release, e.g. `v1.6.3`
  * be sure to clean up the release notes, grouping by semantic commit type,
    e.g. all `feat` commits should be under the same section
* Once the new release is made, be sure that the new package version is available on PyPI
  in [dbt-athena-community](https://pypi.org/project/dbt-athena-community/)
2 changes: 1 addition & 1 deletion dbt/adapters/athena/__version__.py
@@ -1 +1 @@
version = "1.6.2"
version = "1.6.3"
2 changes: 2 additions & 0 deletions dbt/adapters/athena/connections.py
@@ -53,6 +53,7 @@ class AthenaCredentials(Credentials):
aws_profile_name: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
aws_session_token: Optional[str] = None
poll_interval: float = 1.0
debug_query_state: bool = False
_ALIASES = {"catalog": "database"}
@@ -84,6 +85,7 @@ def _connection_keys(self) -> Tuple[str, ...]:
"aws_profile_name",
"aws_access_key_id",
"aws_secret_access_key",
"aws_session_token",
"endpoint_url",
"s3_data_dir",
"s3_data_naming",
83 changes: 63 additions & 20 deletions dbt/adapters/athena/impl.py
@@ -51,11 +51,14 @@
get_catalog_id,
get_catalog_type,
get_chunks,
is_valid_table_parameter_key,
stringify_table_parameter_value,
)
from dbt.adapters.base import ConstraintSupport, available
from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.sql import SQLAdapter
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import CompiledNode, ConstraintType
from dbt.exceptions import DbtRuntimeError
@@ -156,8 +159,10 @@ def add_lf_tags_to_database(self, relation: AthenaRelation) -> None:
LOGGER.debug(f"Lakeformation is disabled for {relation}")

@available
def add_lf_tags(self, relation: AthenaRelation, lf_tags_config: Dict[str, Any]) -> None:
config = LfTagsConfig(**lf_tags_config)
def add_lf_tags(
self, relation: AthenaRelation, lf_tags_config: Dict[str, Any], lf_inherited_tags: Optional[List[str]]
) -> None:
config = LfTagsConfig(**(lf_tags_config | dict(inherited_tags=lf_inherited_tags)))
if config.enabled:
conn = self.connections.get_thread_connection()
client = conn.handle
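
A brief hedged aside on the `lf_tags_config | dict(inherited_tags=...)` expression above: it uses the dict-union operator (Python 3.9+) to overlay the project-level inherited tags onto the model-level LF tags config. A minimal sketch with invented values:

```python
# Invented sample values; the right-hand operand wins on key collisions (PEP 584).
lf_tags_config = {"enabled": True, "tags": {"domain": "sales"}}
merged = lf_tags_config | dict(inherited_tags=["inherited-tag-1"])
# -> {"enabled": True, "tags": {"domain": "sales"}, "inherited_tags": ["inherited-tag-1"]}
```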
@@ -564,7 +569,7 @@ def _get_one_catalog(
MaxResults=50, # Limit supported by this operation
):
for table in page["TableMetadataList"]:
if relations and table["Name"] in relations:
if relations and table["Name"].lower() in relations:
catalog.extend(
self._get_one_table_for_non_glue_catalog(
table, schema, information_schema.path.database
@@ -668,16 +673,28 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
CatalogId=src_catalog_id, DatabaseName=src_relation.schema, Name=src_relation.identifier
).get("Table")

src_table_partitions = glue_client.get_partitions(
CatalogId=src_catalog_id, DatabaseName=src_relation.schema, TableName=src_relation.identifier
).get("Partitions")
src_table_get_partitions_paginator = glue_client.get_paginator("get_partitions")
src_table_partitions_result = src_table_get_partitions_paginator.paginate(
**{
"CatalogId": src_catalog_id,
"DatabaseName": src_relation.schema,
"TableName": src_relation.identifier,
}
)
src_table_partitions = src_table_partitions_result.build_full_result().get("Partitions")

data_catalog = self._get_data_catalog(src_relation.database)
target_catalog_id = get_catalog_id(data_catalog)

target_table_partitions = glue_client.get_partitions(
CatalogId=target_catalog_id, DatabaseName=target_relation.schema, TableName=target_relation.identifier
).get("Partitions")
target_get_partitions_paginator = glue_client.get_paginator("get_partitions")
target_table_partitions_result = target_get_partitions_paginator.paginate(
**{
"CatalogId": target_catalog_id,
"DatabaseName": target_relation.schema,
"TableName": target_relation.identifier,
}
)
target_table_partitions = target_table_partitions_result.build_full_result().get("Partitions")

target_table_version = {
"Name": target_relation.identifier,
@@ -814,11 +831,12 @@ def persist_docs_to_glue(
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

# By default, there is no need to update Glue Table
need_udpate_table = False
need_to_update_table = False
# Get Table from Glue
table = glue_client.get_table(CatalogId=catalog_id, DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
table_input = self._get_table_input(table)
table_parameters = table_input["Parameters"]
# Update table description
if persist_relation_docs:
# Prepare dbt description
@@ -829,16 +847,40 @@
glue_table_comment = table["Parameters"].get("comment", "")
# Update description if it's different
if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
updated_table["Description"] = clean_table_description
updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
updated_table_parameters["comment"] = clean_table_description
updated_table["Parameters"] = updated_table_parameters
need_udpate_table = True
table_input["Description"] = clean_table_description
table_parameters["comment"] = clean_table_description
need_to_update_table = True

# Get dbt model meta if available
meta: Dict[str, Any] = model.get("config", {}).get("meta", {})
# Add some of dbt model config fields as table meta
meta["unique_id"] = model.get("unique_id")
meta["materialized"] = model.get("config", {}).get("materialized")
# Get dbt runtime config to be able to get dbt project metadata
runtime_config: RuntimeConfig = self.config
# Add dbt project metadata to table meta
meta["dbt_project_name"] = runtime_config.project_name
meta["dbt_project_version"] = runtime_config.version
# Prepare meta values for table properties and check if update is required
for meta_key, meta_value_raw in meta.items():
if is_valid_table_parameter_key(meta_key):
meta_value = stringify_table_parameter_value(meta_value_raw)
if meta_value is not None:
# Check that meta value is already attached to Glue table
current_meta_value: Optional[str] = table_parameters.get(meta_key)
if current_meta_value is None or current_meta_value != meta_value:
# Update Glue table parameter only if needed
table_parameters[meta_key] = meta_value
need_to_update_table = True
else:
LOGGER.warning(f"Meta value for key '{meta_key}' is not supported and will be ignored")
else:
LOGGER.warning(f"Meta key '{meta_key}' is not supported and will be ignored")

# Update column comments
if persist_column_docs:
# Process every column
for col_obj in updated_table["StorageDescriptor"]["Columns"]:
for col_obj in table_input["StorageDescriptor"]["Columns"]:
# Get column description from dbt
col_name = col_obj["Name"]
if col_name in model["columns"]:
@@ -850,15 +892,16 @@
# Update column description if it's different
if glue_col_comment != clean_col_comment:
col_obj["Comment"] = clean_col_comment
need_udpate_table = True
need_to_update_table = True

# Update Glue Table only if table/column description is modified.
# It prevents redundant schema version creating after incremental runs.
if need_udpate_table:
if need_to_update_table:
table_input["Parameters"] = table_parameters
glue_client.update_table(
CatalogId=catalog_id,
DatabaseName=relation.schema,
TableInput=updated_table,
TableInput=table_input,
SkipArchive=skip_archive_table_version,
)

56 changes: 39 additions & 17 deletions dbt/adapters/athena/lakeformation.py
@@ -1,13 +1,15 @@
"""AWS Lakeformation permissions management helper utilities."""

from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Sequence, Set, Union

from mypy_boto3_lakeformation import LakeFormationClient
from mypy_boto3_lakeformation.type_defs import (
AddLFTagsToResourceResponseTypeDef,
BatchPermissionsRequestEntryTypeDef,
ColumnLFTagTypeDef,
DataCellsFilterTypeDef,
GetResourceLFTagsResponseTypeDef,
LFTagPairTypeDef,
RemoveLFTagsFromResourceResponseTypeDef,
ResourceTypeDef,
)
@@ -24,6 +26,7 @@ class LfTagsConfig(BaseModel):
enabled: bool = False
tags: Optional[Dict[str, str]] = None
tags_columns: Optional[Dict[str, Dict[str, List[str]]]] = None
inherited_tags: List[str] = []


class LfTagsManager:
@@ -33,6 +36,7 @@ def __init__(self, lf_client: LakeFormationClient, relation: AthenaRelation, lf_
self.table = relation.identifier
self.lf_tags = lf_tags_config.tags
self.lf_tags_columns = lf_tags_config.tags_columns
self.lf_inherited_tags = set(lf_tags_config.inherited_tags)

def process_lf_tags_database(self) -> None:
if self.lf_tags:
@@ -49,21 +53,31 @@ def process_lf_tags(self) -> None:
self._apply_lf_tags_table(table_resource, existing_lf_tags)
self._apply_lf_tags_columns()

@staticmethod
def _column_tags_to_remove(
lf_tags_columns: List[ColumnLFTagTypeDef], lf_inherited_tags: Set[str]
) -> Dict[str, Dict[str, List[str]]]:
to_remove = {}

for column in lf_tags_columns:
non_inherited_tags = [tag for tag in column["LFTags"] if not tag["TagKey"] in lf_inherited_tags]
for tag in non_inherited_tags:
tag_key = tag["TagKey"]
tag_value = tag["TagValues"][0]
if tag_key not in to_remove:
to_remove[tag_key] = {tag_value: [column["Name"]]}
elif tag_value not in to_remove[tag_key]:
to_remove[tag_key][tag_value] = [column["Name"]]
else:
to_remove[tag_key][tag_value].append(column["Name"])

return to_remove
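
For intuition, a hedged example of what `_column_tags_to_remove` produces; the payload mirrors the shape of Lake Formation's `LFTagsOnColumns` response, with invented tag and column names:

```python
# Invented column-level tags; "domain" is inherited from the database.
lf_tags_columns = [
    {"Name": "email", "LFTags": [{"TagKey": "pii", "TagValues": ["true"]},
                                 {"TagKey": "domain", "TagValues": ["sales"]}]},
    {"Name": "phone", "LFTags": [{"TagKey": "pii", "TagValues": ["true"]}]},
]

# Inherited tags are skipped, so only the directly associated tag is removed:
# {"pii": {"true": ["email", "phone"]}}
to_remove = LfTagsManager._column_tags_to_remove(lf_tags_columns, {"domain"})
```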

def _remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTypeDef) -> None:
lf_tags_columns = existing_lf_tags.get("LFTagsOnColumns", [])
logger.debug(f"COLUMNS: {lf_tags_columns}")
if lf_tags_columns:
to_remove = {}
for column in lf_tags_columns:
for tag in column["LFTags"]:
tag_key = tag["TagKey"]
tag_value = tag["TagValues"][0]
if tag_key not in to_remove:
to_remove[tag_key] = {tag_value: [column["Name"]]}
elif tag_value not in to_remove[tag_key]:
to_remove[tag_key][tag_value] = [column["Name"]]
else:
to_remove[tag_key][tag_value].append(column["Name"])
to_remove = LfTagsManager._column_tags_to_remove(lf_tags_columns, self.lf_inherited_tags)
logger.debug(f"TO REMOVE: {to_remove}")
for tag_key, tag_config in to_remove.items():
for tag_value, columns in tag_config.items():
@@ -75,18 +89,26 @@ def _remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTyp
)
self._parse_and_log_lf_response(response, columns, {tag_key: tag_value}, "remove")

@staticmethod
def _table_tags_to_remove(
lf_tags_table: List[LFTagPairTypeDef], lf_tags: Optional[Dict[str, str]], lf_inherited_tags: Set[str]
) -> Dict[str, Sequence[str]]:
return {
tag["TagKey"]: tag["TagValues"]
for tag in lf_tags_table
if tag["TagKey"] not in (lf_tags or {})
if tag["TagKey"] not in lf_inherited_tags
}
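
Similarly, a small hedged illustration of `_table_tags_to_remove` with invented values: existing table tags are slated for removal unless they are still declared in `lf_tags` or listed as inherited:

```python
# Invented table-level tags currently attached in Lake Formation.
existing = [
    {"TagKey": "team", "TagValues": ["core"]},
    {"TagKey": "env", "TagValues": ["prod"]},
    {"TagKey": "domain", "TagValues": ["sales"]},
]

# "team" is still configured and "domain" is inherited, so only
# {"env": ["prod"]} is returned for removal.
to_remove = LfTagsManager._table_tags_to_remove(existing, {"team": "core"}, {"domain"})
```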

def _apply_lf_tags_table(
self, table_resource: ResourceTypeDef, existing_lf_tags: GetResourceLFTagsResponseTypeDef
) -> None:
lf_tags_table = existing_lf_tags.get("LFTagsOnTable", [])
logger.debug(f"EXISTING TABLE TAGS: {lf_tags_table}")
logger.debug(f"CONFIG TAGS: {self.lf_tags}")

to_remove = {
tag["TagKey"]: tag["TagValues"]
for tag in lf_tags_table
if tag["TagKey"] not in self.lf_tags # type: ignore
}
to_remove = LfTagsManager._table_tags_to_remove(lf_tags_table, self.lf_tags, self.lf_inherited_tags)

logger.debug(f"TAGS TO REMOVE: {to_remove}")
if to_remove:
response = self.lf_client.remove_lf_tags_from_resource(
1 change: 1 addition & 0 deletions dbt/adapters/athena/session.py
@@ -7,6 +7,7 @@ def get_boto3_session(connection: Connection) -> boto3.session.Session:
return boto3.session.Session(
aws_access_key_id=connection.credentials.aws_access_key_id,
aws_secret_access_key=connection.credentials.aws_secret_access_key,
aws_session_token=connection.credentials.aws_session_token,
region_name=connection.credentials.region_name,
profile_name=connection.credentials.aws_profile_name,
)
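
A hedged sketch of why forwarding `aws_session_token` matters: temporary credentials issued by STS (for example via `assume_role`) are only valid as a triplet, and boto3 rejects the key pair without its token. The role ARN and session name below are placeholders:

```python
import boto3

# Placeholder role ARN and session name; illustrative only.
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/dbt-athena",
    RoleSessionName="dbt-run",
)["Credentials"]

# Temporary credentials require all three fields, which is why the
# adapter now passes aws_session_token through to the session.
session = boto3.session.Session(
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
```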
28 changes: 27 additions & 1 deletion dbt/adapters/athena/utils.py
@@ -1,14 +1,40 @@
import json
import re
from enum import Enum
from typing import Generator, List, Optional, TypeVar
from typing import Any, Generator, List, Optional, TypeVar

from mypy_boto3_athena.type_defs import DataCatalogTypeDef

from dbt.adapters.athena.constants import LOGGER


def clean_sql_comment(comment: str) -> str:
split_and_strip = [line.strip() for line in comment.split("\n")]
return " ".join(line for line in split_and_strip if line)


def stringify_table_parameter_value(value: Any) -> Optional[str]:
"""Convert any variable to string for Glue Table property."""
try:
if isinstance(value, (dict, list)):
value_str: str = json.dumps(value)
else:
value_str = str(value)
return value_str[:512000]
except (TypeError, ValueError) as e:
# Handle non-stringifiable objects and non-serializable objects
LOGGER.warning(f"Non-stringifiable object. Error: {str(e)}")
return None


def is_valid_table_parameter_key(key: str) -> bool:
"""Check if key is valid for Glue Table property according to official documentation."""
# Simplified version of key pattern which works with re
# Original pattern can be found here https://docs.aws.amazon.com/glue/latest/webapi/API_Table.html
key_pattern: str = r"^[\u0020-\uD7FF\uE000-\uFFFD\t]*$"
return len(key) <= 255 and bool(re.match(key_pattern, key))
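
Taken together, a hedged sketch of how these two helpers gate which dbt `meta` entries become Glue table parameters, mirroring the filtering loop in `persist_docs_to_glue` (sample values invented):

```python
# Invented meta entries; the third key contains a control character.
meta = {"owner": "data-team", "tags": ["pii", "finance"], "bad\x00key": 1}

table_parameters = {}
for key, raw_value in meta.items():
    if not is_valid_table_parameter_key(key):
        continue  # e.g. keys with control characters are skipped
    value = stringify_table_parameter_value(raw_value)  # lists/dicts become JSON
    if value is not None:
        table_parameters[key] = value

# -> {"owner": "data-team", "tags": '["pii", "finance"]'}
```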


def get_catalog_id(catalog: Optional[DataCatalogTypeDef]) -> Optional[str]:
return catalog["Parameters"]["catalog-id"] if catalog and catalog["Type"] == AthenaCatalogType.GLUE.value else None
