Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update table-migration workflows to also capture updated migration progress into the history log #3239

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
905fefc
Remove some unnecessary code.
asnare Nov 6, 2024
c23c38a
Update table history logger to not trigger a refresh of the migration…
asnare Nov 6, 2024
9ebb26c
Consistent import.
asnare Nov 7, 2024
e861963
Update the various table-migration workflows to also update the histo…
asnare Nov 7, 2024
cfe1486
All workflows update the logs table.
asnare Nov 7, 2024
145cd7d
Table migration workflows also update the tables inventory (at the end).
asnare Nov 7, 2024
5289582
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
6e5e4ae
Switch to multi-line f""" """-string.
asnare Nov 11, 2024
778ad10
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
1ce8e05
Fix mock return value for crawler snapshot.
asnare Nov 11, 2024
78787df
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
788789f
Switch to specialisation (limited to TableProgressEncoder) for ensuri…
asnare Nov 12, 2024
891b3b7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
f9bf219
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 13, 2024
da3b15b
Back out changes relating to the way the migration-status information…
asnare Nov 13, 2024
ac8586a
Back out more changes that are either not needed or made on other PRs.
asnare Nov 13, 2024
4b3717f
Remove comment that is no longer relevant.
asnare Nov 13, 2024
c80a9c6
Verify prerequisites for updating the migration-progress prior to the…
asnare Nov 13, 2024
f638cb5
No need to mention the assessment; we won't reach this point of the w…
asnare Nov 13, 2024
2d398f4
Use TODO marker instead of warning to highlight what we'd prefer to h…
asnare Nov 13, 2024
df9f689
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 13, 2024
4e579fd
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
e4d4220
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 18, 2024
14cdc77
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/table_persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Table utilization per workflow:

| Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
|--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
| tables | RW | RW | | RO | | RO | |
| tables | RW | RW | | RW | | RW | |
| grants | RW | RW | | RW | | RW | |
| mounts | RW | | | RO | RO | RO | |
| permissions | RW | | RW | RO | | RO | |
Expand All @@ -30,7 +30,7 @@ Table utilization per workflow:
| query_problems | RW | RW | | | | | |
| workflow_problems | RW | RW | | | | | |
| udfs | RW | RW | RO | | | | |
| logs | RW | | RW | RW | | RW | RW |
| logs | RW | RW | RW | RW | RW | RW | RW |
| recon_results | | | | | | | RW |

**RW** - Read/Write, the job generates or updates the table.<br/>
Expand Down
15 changes: 8 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import re
from collections import defaultdict
from collections.abc import Iterable
from functools import partial, cached_property

from databricks.labs.blueprint.parallel import Threads
Expand All @@ -18,8 +19,11 @@
TableMapping,
TableToMigrate,
)

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationStatus,
TableMigrationIndex,
)
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
Table,
Expand Down Expand Up @@ -56,14 +60,11 @@ def __init__(
self._migrate_grants = migrate_grants
self._external_locations = external_locations

def get_remaining_tables(self) -> list[Table]:
nfx marked this conversation as resolved.
Show resolved Hide resolved
migration_index = self.index(force_refresh=True)
table_rows = []
def check_remaining_tables(self, migration_status: Iterable[TableMigrationStatus]) -> None:
migration_index = TableMigrationIndex(migration_status)
for crawled_table in self._tables_crawler.snapshot():
if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
table_rows.append(crawled_table)
logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
return table_rows

def index(self, *, force_refresh: bool = False):
return self._migration_status_refresher.index(force_refresh=force_refresh)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def key(self):


class TableMigrationIndex:
def __init__(self, tables: list[TableMigrationStatus]):
def __init__(self, tables: Iterable[TableMigrationStatus]):
self._index = {(ms.src_schema, ms.src_table): ms for ms in tables}

def is_migrated(self, schema: str, table: str) -> bool:
Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
self._tables_crawler = tables_crawler

def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))

def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
Expand Down
195 changes: 175 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime as dt

from databricks.labs.ucx.assessment.workflows import Assessment
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task
Expand Down Expand Up @@ -57,10 +59,50 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="table_migration")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(
depends_on=[
convert_managed_table,
migrate_external_tables_sync,
migrate_dbfs_root_delta_tables,
migrate_dbfs_root_non_delta_tables,
migrate_views,
verify_prerequisites,
],
)
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[verify_prerequisites, update_table_inventory], job_cluster="table_migration")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(depends_on=[verify_prerequisites, update_migration_status], job_cluster="table_migration")
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateHiveSerdeTablesInPlace(Workflow):
Expand All @@ -86,10 +128,41 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="table_migration")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_prerequisites, migrate_views])
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status])
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateExternalTablesCTAS(Workflow):
Expand Down Expand Up @@ -120,10 +193,41 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="table_migration")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas])
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status])
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add prereqs

"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class ScanTablesInMounts(Workflow):
Expand All @@ -137,10 +241,30 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
replacing any existing content that might be present."""
ctx.tables_in_mounts.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="table_migration")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, scan_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status])
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add prereqs


@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateTablesInMounts(Workflow):
Expand All @@ -152,7 +276,38 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

@job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="table_migration")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_prerequisites, migrate_tables_in_mounts_experimental])
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.check_remaining_tables(updated_migration_progress)

@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status])
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
history_log = ctx.tables_progress
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
history_log.append_inventory_snapshot(tables_snapshot)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add prereqs


@job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log])
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()
Loading
Loading