Add support for skipping views when migrating tables and views (#2343)
## Changes
Add support for skipping views when migrating tables and views
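
In short, the skip marker remains a table property; for views it is now applied with `ALTER VIEW` instead of `ALTER TABLE`. A minimal sketch of the resulting statement, assuming the property name `databricks.labs.ucx.skip` (the value of `TableMapping.UCX_SKIP_PROPERTY`) and using `object_kind` as a stand-in for `Table.kind`:

```python
# Minimal sketch: build the skip statement for a table or a view.
def skip_statement(object_kind: str, schema: str, name: str) -> str:
    # object_kind is "TABLE" or "VIEW" (Table.kind in the code changed below)
    return f"ALTER {object_kind} {schema}.{name} SET TBLPROPERTIES('databricks.labs.ucx.skip' = true)"


assert skip_statement("VIEW", "schema", "view") == (
    "ALTER VIEW schema.view SET TBLPROPERTIES('databricks.labs.ucx.skip' = true)"
)
```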

### Linked issues
Resolves #1937 

### Functionality
- [ ] modified existing command: `databricks labs ucx skip`

### Tests
- [x] added unit tests
- [x] evolved integration tests

---------

Co-authored-by: Eric Vergnaud <[email protected]>
ericvergnaud and Eric Vergnaud authored Aug 12, 2024
1 parent 3fe030a commit ea57c5e
Showing 7 changed files with 80 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/cli.py
@@ -81,7 +81,7 @@ def skip(w: WorkspaceClient, schema: str | None = None, table: str | None = None
         return None
     ctx = WorkspaceContext(w)
     if table:
-        return ctx.table_mapping.skip_table(schema, table)
+        return ctx.table_mapping.skip_table_or_view(schema, table, ctx.tables_crawler.load_one)
     return ctx.table_mapping.skip_schema(schema)


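A hedged sketch of the new call path from the command layer (the helper name and arguments are illustrative; `WorkspaceClient` and `WorkspaceContext` come from the imports already present in `cli.py`):

```python
# Sketch: the skip command now hands a loader to skip_table_or_view so the
# mapping layer can tell tables and views apart before issuing the ALTER.
def skip_one_object(w: WorkspaceClient, schema: str, table: str) -> None:
    ctx = WorkspaceContext(w)
    ctx.table_mapping.skip_table_or_view(schema, table, ctx.tables_crawler.load_one)
```

The loader is what lets `skip_table_or_view` (in `mapping.py`, next) choose between `ALTER TABLE` and `ALTER VIEW`.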
17 changes: 11 additions & 6 deletions src/databricks/labs/ucx/hive_metastore/mapping.py
@@ -1,6 +1,6 @@
 import logging
 import re
-from collections.abc import Collection
+from collections.abc import Collection, Callable
 from dataclasses import dataclass
 from functools import partial

@@ -116,19 +116,24 @@ def load(self) -> list[Rule]:
             msg = "Please run: databricks labs ucx table-mapping"
             raise ValueError(msg) from None

-    def skip_table(self, schema: str, table: str):
+    def skip_table_or_view(self, schema_name: str, table_name: str, load_table: Callable[[str, str], Table | None]):
         # Marks a table to be skipped in the migration process by applying a table property
         try:
+            table = load_table(schema_name, table_name)
+            if table is None:
+                raise NotFound("[TABLE_OR_VIEW_NOT_FOUND]")
             self._sql_backend.execute(
-                f"ALTER TABLE {escape_sql_identifier(schema)}.{escape_sql_identifier(table)} SET TBLPROPERTIES('{self.UCX_SKIP_PROPERTY}' = true)"
+                f"ALTER {table.kind} {escape_sql_identifier(schema_name)}.{escape_sql_identifier(table_name)} SET TBLPROPERTIES('{self.UCX_SKIP_PROPERTY}' = true)"
             )
         except NotFound as err:
             if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err) or "[DELTA_TABLE_NOT_FOUND]" in str(err):
-                logger.error(f"Failed to apply skip marker for Table {schema}.{table}. Table not found.")
+                logger.error(f"Failed to apply skip marker for Table {schema_name}.{table_name}. Table not found.")
             else:
-                logger.error(f"Failed to apply skip marker for Table {schema}.{table}: {err!s}", exc_info=True)
+                logger.error(
+                    f"Failed to apply skip marker for Table {schema_name}.{table_name}: {err!s}", exc_info=True
+                )
         except BadRequest as err:
-            logger.error(f"Failed to apply skip marker for Table {schema}.{table}: {err!s}", exc_info=True)
+            logger.error(f"Failed to apply skip marker for Table {schema_name}.{table_name}: {err!s}", exc_info=True)

     def skip_schema(self, schema: str):
         # Marks a schema to be skipped in the migration process by applying a table property
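A minimal usage sketch of the new method with stubbed loaders, mirroring the unit tests further down (constructor values are examples; a `None` from the loader takes the `[TABLE_OR_VIEW_NOT_FOUND]` branch and is only logged):

```python
from databricks.labs.ucx.hive_metastore.tables import Table


# Sketch only: `mapping` is assumed to be a TableMapping wired to a SQL backend,
# as constructed in the unit tests below.
def demo_skip(mapping) -> None:
    a_view = Table(
        catalog="hive_metastore", database="sales", name="daily_v",
        object_type="VIEW", table_format="UNKNOWN", view_text="SELECT 1",
    )
    mapping.skip_table_or_view("sales", "daily_v", lambda _schema, _table: a_view)  # executes ALTER VIEW ...
    mapping.skip_table_or_view("sales", "missing", lambda _schema, _table: None)  # logs "Table not found."
```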
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -96,6 +96,10 @@ def key(self) -> str:
     def safe_sql_key(self) -> str:
         return escape_sql_identifier(self.key)

+    @property
+    def full_name(self) -> str:
+        return f"{self.catalog}.{self.database}.{self.name}"
+
     def __hash__(self):
         return hash(self.key)

@@ -362,6 +366,12 @@ def snapshot(self) -> list[Table]:
         """
         return self._snapshot(partial(self._try_load), partial(self._crawl))

+    def load_one(self, schema_name: str, table_name: str) -> Table | None:
+        query = f"SELECT * FROM {escape_sql_identifier(self.full_name)} WHERE database='{schema_name}' AND name='{table_name}' LIMIT 1"
+        for row in self._fetch(query):
+            return Table(*row)
+        return None
+
     @staticmethod
     def _parse_table_props(tbl_props: str) -> dict:
         pattern = r"([^,\[\]]+)=([^,\[\]]+)"
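A hedged usage sketch for `load_one` (the crawler is assumed to have a populated inventory; per the `test_cli.py` change below, the lookup runs against the inventory table, e.g. `hive_metastore.ucx.tables`):

```python
# Sketch: resolve a single inventory entry and describe it.
def describe_one(tables_crawler, schema_name: str, table_name: str) -> str:
    table = tables_crawler.load_one(schema_name, table_name)  # Table | None
    if table is None:
        return f"{schema_name}.{table_name}: not found in the inventory"
    # full_name is the property added above; kind distinguishes TABLE from VIEW
    return f"{table.full_name}: {table.kind}"
```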
63 changes: 47 additions & 16 deletions tests/integration/hive_metastore/test_migrate.py
@@ -370,30 +370,61 @@ def test_revert_migrated_table(sql_backend, runtime_ctx, make_catalog):

 @retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_mapping_skips_tables_databases(ws, sql_backend, runtime_ctx, make_catalog):
-    src_schema1 = runtime_ctx.make_schema(catalog_name="hive_metastore")
-    src_schema2 = runtime_ctx.make_schema(catalog_name="hive_metastore")
-    table_to_migrate = runtime_ctx.make_table(schema_name=src_schema1.name)
-    table_databricks_dataset = runtime_ctx.make_table(
-        schema_name=src_schema1.name, external_csv="dbfs:/databricks-datasets/adult/adult.data"
+    # using lists to avoid MyPy 'too-many-variables' error
+    src_schemas = [
+        runtime_ctx.make_schema(catalog_name="hive_metastore"),
+        runtime_ctx.make_schema(catalog_name="hive_metastore"),
+    ]
+    src_tables = [
+        runtime_ctx.make_table(schema_name=src_schemas[0].name),  # table to migrate
+        runtime_ctx.make_table(
+            schema_name=src_schemas[0].name, external_csv="dbfs:/databricks-datasets/adult/adult.data"
+        ),  # table databricks dataset
+        runtime_ctx.make_table(schema_name=src_schemas[0].name),  # table to skip
+        runtime_ctx.make_table(schema_name=src_schemas[1].name),  # table in skipped database
+    ]
+    src_tables.extend(
+        [
+            runtime_ctx.make_table(
+                schema_name=src_schemas[0].name,
+                ctas=f"SELECT * FROM {src_tables[0].full_name}",
+                view=True,
+            ),  # view to migrate
+            runtime_ctx.make_table(
+                schema_name=src_schemas[0].name,
+                ctas=f"SELECT * FROM {src_tables[2].full_name}",
+                view=True,
+            ),  # view to skip
+            runtime_ctx.make_table(
+                schema_name=src_schemas[1].name,
+                ctas=f"SELECT * FROM {src_tables[3].full_name}",
+                view=True,
+            ),  # view in schema to skip
+        ]
     )
-    table_to_skip = runtime_ctx.make_table(schema_name=src_schema1.name)
-    table_in_skipped_database = runtime_ctx.make_table(schema_name=src_schema2.name)

     dst_catalog = make_catalog()
-    dst_schema1 = runtime_ctx.make_schema(catalog_name=dst_catalog.name, name=src_schema1.name)
-    dst_schema2 = runtime_ctx.make_schema(catalog_name=dst_catalog.name, name=src_schema2.name)
+    dst_schemas = [
+        runtime_ctx.make_schema(catalog_name=dst_catalog.name, name=src_schema.name) for src_schema in src_schemas
+    ]

     rules = [
-        Rule.from_src_dst(table_to_migrate, dst_schema1),
-        Rule.from_src_dst(table_to_skip, dst_schema1),
-        Rule.from_src_dst(table_databricks_dataset, dst_schema1),
-        Rule.from_src_dst(table_in_skipped_database, dst_schema2),
+        Rule.from_src_dst(src_tables[0], dst_schemas[0]),
+        Rule.from_src_dst(src_tables[1], dst_schemas[0]),
+        Rule.from_src_dst(src_tables[2], dst_schemas[0]),
+        Rule.from_src_dst(src_tables[3], dst_schemas[1]),
+        Rule.from_src_dst(src_tables[4], dst_schemas[0]),
+        Rule.from_src_dst(src_tables[5], dst_schemas[0]),
+        Rule.from_src_dst(src_tables[6], dst_schemas[1]),
     ]
     runtime_ctx.with_table_mapping_rules(rules)
     table_mapping = runtime_ctx.table_mapping
-    table_mapping.skip_table(src_schema1.name, table_to_skip.name)
-    table_mapping.skip_schema(src_schema2.name)
-    assert len(table_mapping.get_tables_to_migrate(runtime_ctx.tables_crawler)) == 1
+    table_mapping.skip_table_or_view(src_schemas[0].name, src_tables[2].name, runtime_ctx.tables_crawler.load_one)
+    table_mapping.skip_table_or_view(src_schemas[0].name, src_tables[5].name, runtime_ctx.tables_crawler.load_one)
+    table_mapping.skip_schema(src_schemas[1].name)
+    tables_to_migrate = table_mapping.get_tables_to_migrate(runtime_ctx.tables_crawler)
+    full_names = set(tm.src.full_name for tm in tables_to_migrate)
+    assert full_names == {src_tables[0].full_name, src_tables[4].full_name}


 @retried(on=[NotFound], timeout=timedelta(minutes=2))
1 change: 1 addition & 0 deletions tests/integration/hive_metastore/test_tables.py
@@ -52,6 +52,7 @@ def test_describe_all_tables_in_databases(ws, sql_backend, inventory_schema, mak
     assert all_tables[view.full_name].object_type == "VIEW"
     assert all_tables[view.full_name].view_text == "SELECT 2+2 AS four"
     assert all_tables[view.full_name].what == What.VIEW
+    assert tables.load_one(view.schema_name, view.name).what == What.VIEW


 @retried(on=[NotFound], timeout=timedelta(minutes=2))
11 changes: 9 additions & 2 deletions tests/unit/hive_metastore/test_mapping.py
@@ -191,9 +191,16 @@ def test_skip_happy_path(caplog):
     sbe = create_autospec(SqlBackend)
     installation = MockInstallation()
     mapping = TableMapping(installation, ws, sbe)
-    mapping.skip_table(schema="schema", table="table")
+    table = Table(catalog="catalog", database="schema", name="table", object_type="table", table_format="csv")
+    mapping.skip_table_or_view(schema_name="schema", table_name="table", load_table=lambda _schema, _table: table)
     ws.tables.get.assert_not_called()
     sbe.execute.assert_called_with(f"ALTER TABLE schema.table SET TBLPROPERTIES('{mapping.UCX_SKIP_PROPERTY}' = true)")
+    view = Table(
+        catalog="catalog", database="schema", name="table", object_type="table", table_format="csv", view_text="stuff"
+    )
+    mapping.skip_table_or_view(schema_name="schema", table_name="view", load_table=lambda _schema, _table: view)
+    ws.tables.get.assert_not_called()
+    sbe.execute.assert_called_with(f"ALTER VIEW schema.view SET TBLPROPERTIES('{mapping.UCX_SKIP_PROPERTY}' = true)")
     assert len(caplog.records) == 0
     mapping.skip_schema(schema="schema")
     sbe.execute.assert_called_with(f"ALTER SCHEMA schema SET DBPROPERTIES('{mapping.UCX_SKIP_PROPERTY}' = true)")
@@ -217,7 +224,7 @@ def test_skip_missing_table(caplog):
     installation = MockInstallation()
     sbe.execute.side_effect = NotFound("[TABLE_OR_VIEW_NOT_FOUND]")
     mapping = TableMapping(installation, ws, sbe)
-    mapping.skip_table('foo', table="table")
+    mapping.skip_table_or_view(schema_name='foo', table_name="table", load_table=lambda schema, table: None)
     ws.tables.get.assert_not_called()
     assert [rec.message for rec in caplog.records if "table not found" in rec.message.lower()]

2 changes: 1 addition & 1 deletion tests/unit/test_cli.py
@@ -141,7 +141,7 @@ def test_skip_with_table(ws):

     ws.statement_execution.execute_statement.assert_called_with(
         warehouse_id='test',
-        statement="ALTER TABLE schema.table SET TBLPROPERTIES('databricks.labs.ucx.skip' = true)",
+        statement="SELECT * FROM hive_metastore.ucx.tables WHERE database='schema' AND name='table' LIMIT 1",
         byte_limit=None,
         catalog=None,
         schema=None,
