diff --git a/README.md b/README.md index 717a2875..14028342 100644 --- a/README.md +++ b/README.md @@ -259,17 +259,39 @@ athena: } ``` -> Notes: +* `lf_inherited_tags` (`default=none`) + * List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be + removed during association of those defined in `lf_tags_config` + * i.e. The default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from + tables and columns before associating the ones currently defined for a given model + * This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly + * This list sits outside of `lf_tags_config` so that it can be set at the project level -- for example: + +```yaml +models: + my_project: + example: + +lf_inherited_tags: [inherited-tag-1, inherited-tag-2] +``` + +> Notes: > > * `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. - > We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use - > [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) - or - > [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. +> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use +> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or +> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. > * `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table - > which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every - > dbt run. 
Such approach keeps the actual state of row level security configuration actual after every dbt run and - > apply changes if they occur: drop, create, update filters and their permissions. +> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every +> dbt run. This approach keeps the row level security configuration up to date after every dbt run and +> applies changes if they occur: drop, create, update filters and their permissions. +> * Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at + the table and column level +> * Currently `dbt-athena` does not differentiate between an inherited tag association and an override of the same tag that it made +> previously +> * e.g. If an inherited tag is overridden by an `lf_tags_config` value in one dbt run, and that override is removed + prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. 
Terraform + where the inherited value is configured nor in the DBT project where the override previously existed but now is + gone) [create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 655fb56c..2da2e83b 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -159,8 +159,10 @@ def add_lf_tags_to_database(self, relation: AthenaRelation) -> None: LOGGER.debug(f"Lakeformation is disabled for {relation}") @available - def add_lf_tags(self, relation: AthenaRelation, lf_tags_config: Dict[str, Any]) -> None: - config = LfTagsConfig(**lf_tags_config) + def add_lf_tags( + self, relation: AthenaRelation, lf_tags_config: Dict[str, Any], lf_inherited_tags: Optional[List[str]] + ) -> None: + config = LfTagsConfig(**(lf_tags_config | dict(inherited_tags=lf_inherited_tags))) if config.enabled: conn = self.connections.get_thread_connection() client = conn.handle diff --git a/dbt/adapters/athena/lakeformation.py b/dbt/adapters/athena/lakeformation.py index 9fc8047b..0804a688 100644 --- a/dbt/adapters/athena/lakeformation.py +++ b/dbt/adapters/athena/lakeformation.py @@ -1,13 +1,15 @@ """AWS Lakeformation permissions management helper utilities.""" -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Sequence, Set, Union from mypy_boto3_lakeformation import LakeFormationClient from mypy_boto3_lakeformation.type_defs import ( AddLFTagsToResourceResponseTypeDef, BatchPermissionsRequestEntryTypeDef, + ColumnLFTagTypeDef, DataCellsFilterTypeDef, GetResourceLFTagsResponseTypeDef, + LFTagPairTypeDef, RemoveLFTagsFromResourceResponseTypeDef, ResourceTypeDef, ) @@ -24,6 +26,7 @@ class LfTagsConfig(BaseModel): enabled: bool = False tags: Optional[Dict[str, str]] = None tags_columns: Optional[Dict[str, Dict[str, List[str]]]] = None + inherited_tags: List[str] = [] class LfTagsManager: @@ -33,6 +36,7 @@ def 
__init__(self, lf_client: LakeFormationClient, relation: AthenaRelation, lf_ self.table = relation.identifier self.lf_tags = lf_tags_config.tags self.lf_tags_columns = lf_tags_config.tags_columns + self.lf_inherited_tags = set(lf_tags_config.inherited_tags) def process_lf_tags_database(self) -> None: if self.lf_tags: @@ -49,21 +53,31 @@ def process_lf_tags(self) -> None: self._apply_lf_tags_table(table_resource, existing_lf_tags) self._apply_lf_tags_columns() + @staticmethod + def _column_tags_to_remove( + lf_tags_columns: List[ColumnLFTagTypeDef], lf_inherited_tags: Set[str] + ) -> Dict[str, Dict[str, List[str]]]: + to_remove = {} + + for column in lf_tags_columns: + non_inherited_tags = [tag for tag in column["LFTags"] if not tag["TagKey"] in lf_inherited_tags] + for tag in non_inherited_tags: + tag_key = tag["TagKey"] + tag_value = tag["TagValues"][0] + if tag_key not in to_remove: + to_remove[tag_key] = {tag_value: [column["Name"]]} + elif tag_value not in to_remove[tag_key]: + to_remove[tag_key][tag_value] = [column["Name"]] + else: + to_remove[tag_key][tag_value].append(column["Name"]) + + return to_remove + def _remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTypeDef) -> None: lf_tags_columns = existing_lf_tags.get("LFTagsOnColumns", []) logger.debug(f"COLUMNS: {lf_tags_columns}") if lf_tags_columns: - to_remove = {} - for column in lf_tags_columns: - for tag in column["LFTags"]: - tag_key = tag["TagKey"] - tag_value = tag["TagValues"][0] - if tag_key not in to_remove: - to_remove[tag_key] = {tag_value: [column["Name"]]} - elif tag_value not in to_remove[tag_key]: - to_remove[tag_key][tag_value] = [column["Name"]] - else: - to_remove[tag_key][tag_value].append(column["Name"]) + to_remove = LfTagsManager._column_tags_to_remove(lf_tags_columns, self.lf_inherited_tags) logger.debug(f"TO REMOVE: {to_remove}") for tag_key, tag_config in to_remove.items(): for tag_value, columns in tag_config.items(): @@ -75,6 +89,17 @@ def 
_remove_lf_tags_columns(self, existing_lf_tags: GetResourceLFTagsResponseTyp ) self._parse_and_log_lf_response(response, columns, {tag_key: tag_value}, "remove") + @staticmethod + def _table_tags_to_remove( + lf_tags_table: List[LFTagPairTypeDef], lf_tags: Optional[Dict[str, str]], lf_inherited_tags: Set[str] + ) -> Dict[str, Sequence[str]]: + return { + tag["TagKey"]: tag["TagValues"] + for tag in lf_tags_table + if tag["TagKey"] not in (lf_tags or {}) + if tag["TagKey"] not in lf_inherited_tags + } + def _apply_lf_tags_table( self, table_resource: ResourceTypeDef, existing_lf_tags: GetResourceLFTagsResponseTypeDef ) -> None: @@ -82,11 +107,8 @@ def _apply_lf_tags_table( logger.debug(f"EXISTING TABLE TAGS: {lf_tags_table}") logger.debug(f"CONFIG TAGS: {self.lf_tags}") - to_remove = { - tag["TagKey"]: tag["TagValues"] - for tag in lf_tags_table - if tag["TagKey"] not in self.lf_tags # type: ignore - } + to_remove = LfTagsManager._table_tags_to_remove(lf_tags_table, self.lf_tags, self.lf_inherited_tags) + logger.debug(f"TAGS TO REMOVE: {to_remove}") if to_remove: response = self.lf_client.remove_lf_tags_from_resource( diff --git a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql index a5ed6812..11350ee7 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql @@ -6,6 +6,7 @@ {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set lf_tags_config = config.get('lf_tags_config') %} + {% set lf_inherited_tags = config.get('lf_inherited_tags') %} {% set lf_grants = config.get('lf_grants') %} {% set partitioned_by = config.get('partitioned_by') %} {% set target_relation = this.incorporate(type='table') %} @@ -106,7 +107,7 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} {% if 
lf_tags_config is not none %} - {{ adapter.add_lf_tags(target_relation, lf_tags_config) }} + {{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }} {% endif %} {% if lf_grants is not none %} diff --git a/dbt/include/athena/macros/materializations/models/table/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql index 989bf63b..7df54cec 100644 --- a/dbt/include/athena/macros/materializations/models/table/table.sql +++ b/dbt/include/athena/macros/materializations/models/table/table.sql @@ -3,6 +3,7 @@ {%- set identifier = model['alias'] -%} {%- set lf_tags_config = config.get('lf_tags_config') -%} + {%- set lf_inherited_tags = config.get('lf_inherited_tags') -%} {%- set lf_grants = config.get('lf_grants') -%} {%- set table_type = config.get('table_type', default='hive') | lower -%} @@ -111,7 +112,7 @@ {{ run_hooks(post_hooks) }} {% if lf_tags_config is not none %} - {{ adapter.add_lf_tags(target_relation, lf_tags_config) }} + {{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }} {% endif %} {% if lf_grants is not none %} diff --git a/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql index ae787a81..7e4c8b2a 100644 --- a/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql +++ b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql @@ -2,6 +2,7 @@ {%- set identifier = model['alias'] -%} {%- set lf_tags_config = config.get('lf_tags_config') -%} + {%- set lf_inherited_tags = config.get('lf_inherited_tags') -%} {%- set lf_grants = config.get('lf_grants') -%} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} @@ -32,7 +33,7 @@ {%- endcall %} {% if lf_tags_config is not none %} - {{ adapter.add_lf_tags(target_relation, lf_tags_config) }} + {{ adapter.add_lf_tags(target_relation, 
lf_tags_config, lf_inherited_tags) }} {% endif %} {% if lf_grants is not none %} diff --git a/dbt/include/athena/macros/materializations/seeds/helpers.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql index a789498f..6f1daaba 100644 --- a/dbt/include/athena/macros/materializations/seeds/helpers.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -91,6 +91,7 @@ {%- set identifier = model['alias'] -%} {%- set lf_tags_config = config.get('lf_tags_config') -%} + {%- set lf_inherited_tags = config.get('lf_inherited_tags') -%} {%- set lf_grants = config.get('lf_grants') -%} {%- set column_override = config.get('column_types', {}) -%} @@ -179,7 +180,7 @@ {% do adapter.delete_from_s3(tmp_s3_location) %} {% if lf_tags_config is not none %} - {{ adapter.add_lf_tags(relation, lf_tags_config) }} + {{ adapter.add_lf_tags(relation, lf_tags_config, lf_inherited_tags) }} {% endif %} {% if lf_grants is not none %} diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 08d41c57..0ad26448 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -128,6 +128,7 @@ {%- set table_type = config.get('table_type', 'hive') -%} {%- set lf_tags_config = config.get('lf_tags_config') -%} + {%- set lf_inherited_tags = config.get('lf_inherited_tags') -%} {%- set lf_grants = config.get('lf_grants') -%} {{ log('Checking if target table exists') }} @@ -230,7 +231,7 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} {% if lf_tags_config is not none %} - {{ adapter.add_lf_tags(target_relation, lf_tags_config) }} + {{ adapter.add_lf_tags(target_relation, lf_tags_config, lf_inherited_tags) }} {% endif %} {% if lf_grants is not none %} diff --git a/tests/unit/test_lakeformation.py b/tests/unit/test_lakeformation.py index ab061c09..3a05030c 100644 --- 
a/tests/unit/test_lakeformation.py +++ b/tests/unit/test_lakeformation.py @@ -2,6 +2,7 @@ import pytest from tests.unit.constants import AWS_REGION, DATA_CATALOG_NAME, DATABASE_NAME +import dbt.adapters.athena.lakeformation as lakeformation from dbt.adapters.athena.lakeformation import LfTagsConfig, LfTagsManager from dbt.adapters.athena.relation import AthenaRelation @@ -74,3 +75,70 @@ def test__parse_lf_response(self, dbt_debug_caplog, response, identifier, column manager = LfTagsManager(lf_client, relation, LfTagsConfig()) manager._parse_and_log_lf_response(response, columns, lf_tags, verb) assert expected in dbt_debug_caplog.getvalue() + + @pytest.mark.parametrize( + "lf_tags_columns,lf_inherited_tags,expected", + [ + pytest.param( + [{"Name": "my_column", "LFTags": [{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}]}], + {"inherited"}, + {}, + id="retains-inherited-tag", + ), + pytest.param( + [{"Name": "my_column", "LFTags": [{"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]}]}], + {}, + {"not-inherited": {"oh-no-im-not": ["my_column"]}}, + id="removes-non-inherited-tag", + ), + pytest.param( + [ + { + "Name": "my_column", + "LFTags": [ + {"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]}, + {"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}, + ], + } + ], + {"inherited"}, + {"not-inherited": {"oh-no-im-not": ["my_column"]}}, + id="removes-non-inherited-tag-among-inherited", + ), + pytest.param([], {}, {}, id="handles-empty"), + ], + ) + def test__column_tags_to_remove(self, lf_tags_columns, lf_inherited_tags, expected): + assert lakeformation.LfTagsManager._column_tags_to_remove(lf_tags_columns, lf_inherited_tags) == expected + + @pytest.mark.parametrize( + "lf_tags_table,lf_tags,lf_inherited_tags,expected", + [ + pytest.param( + [ + {"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]}, + {"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}, + ], + {"not-inherited": "some-preexisting-value"}, + {"inherited"}, + {}, + 
id="retains-being-set-and-inherited", + ), + pytest.param( + [ + {"TagKey": "not-inherited", "TagValues": ["oh-no-im-not"]}, + {"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}, + ], + {}, + {"inherited"}, + {"not-inherited": ["oh-no-im-not"]}, + id="removes-preexisting-not-being-set", + ), + pytest.param( + [{"TagKey": "inherited", "TagValues": ["oh-yes-i-am"]}], {}, {"inherited"}, {}, id="retains-inherited" + ), + pytest.param([], None, {}, {}, id="handles-empty"), + ], + ) + def test__table_tags_to_remove(self, lf_tags_table, lf_tags, lf_inherited_tags, expected): + assert lakeformation.LfTagsManager._table_tags_to_remove(lf_tags_table, lf_tags, lf_inherited_tags) == expected