Commit 46ca3b9: Database migration making dataset uuid unique.
jmchilton committed Jul 25, 2024 · 1 parent 119c334
"""make dataset uuids unique
Revision ID: 04288b6a5b25
Revises: eee9229a9765
Create Date: 2024-07-24 10:43:48.162813
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql import text

from galaxy.model.custom_types import UUIDType
from galaxy.model.migrations.util import (
    _is_sqlite,
    create_unique_constraint,
    drop_constraint,
)

# revision identifiers, used by Alembic.
revision = "04288b6a5b25"
down_revision = "eee9229a9765"
branch_labels = None
depends_on = None
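
# Strategy: back up every dataset row whose uuid is duplicated, pick the
# newest non-purged dataset for each duplicated uuid as the canonical copy,
# re-point HDA/LDDA associations at it, randomize the uuid on every other
# duplicate, and then add the unique constraint.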

dataset_table_name = "dataset"
uuid_column = "uuid"
unique_constraint_name = "uq_uuid_column"

new_uuid = "lower(hex(randomblob(16)))" if _is_sqlite() else "REPLACE(gen_random_uuid()::text, '-', '')"
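# Note: on PostgreSQL this produces a genuine v4 UUID (gen_random_uuid() is
# built in since version 13) with dashes stripped to match the dash-less hex
# representation used for stored UUIDs; the SQLite expression just yields 32
# random hex characters, not a proper RFC 4122 UUID (see the comment in
# upgrade() below).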


def upgrade():
    # sqlite isn't production - ignore the fake uuid please
    # https://stackoverflow.com/questions/17277735/using-uuids-in-sqlite
    connection = op.get_bind()

    op.execute("DROP TABLE IF EXISTS duplicate_datasets_by_uuid")
    op.execute("DROP TABLE IF EXISTS temp_duplicates_counts")
    op.execute("DROP VIEW IF EXISTS temp_duplicate_datasets_purged")
    op.execute("DROP VIEW IF EXISTS temp_duplicate_datasets_active")
    op.execute("DROP TABLE IF EXISTS temp_latest_active_duplicate")
    op.execute("DROP TABLE IF EXISTS temp_active_mapping")
    op.execute("DROP TABLE IF EXISTS hda_datasaet_mapping_pre_uuid_condense")
    op.execute("DROP TABLE IF EXISTS ldda_datasaet_mapping_pre_uuid_condense")

    # Find and store duplicate UUIDs - creates table temp_duplicates_counts
    _setup_duplicate_counts(connection)

    # Back up all dataset rows with duplicated UUIDs into
    # table duplicate_datasets_by_uuid
    _setup_backups_for_duplicated_uuids(connection)

    # Create views temp_duplicate_datasets_purged and temp_duplicate_datasets_active
    # to join against in subsequent queries that inspect this data
    _setup_duplicated_dataset_views_by_purged_status(connection)

    _find_latest_active_dataset_for_each_uuid(connection)  # and record in temp_latest_active_duplicate

    _map_active_uuids_to_latest(connection)  # in temp table temp_active_mapping

    _preserve_old_dataset_association_mappings(connection)

    _randomize_uuids_for_purged_datasets_with_duplicated_uuids(connection)
    _randomize_uuids_for_older_active_datasets_with_duplicated_uuids(connection)

    _update_dataset_associations_to_point_to_latest_active_datasets(connection)

    # Clean up all working tables except the backups required to undo this process - these backups are
    # duplicate_datasets_by_uuid, ldda_datasaet_mapping_pre_uuid_condense, and hda_datasaet_mapping_pre_uuid_condense
    op.execute("DROP VIEW IF EXISTS temp_duplicate_datasets_purged")
    op.execute("DROP VIEW IF EXISTS temp_duplicate_datasets_active")
    op.execute("DROP TABLE IF EXISTS temp_active_mapping")
    op.execute("DROP TABLE IF EXISTS temp_latest_active_duplicate")
    op.execute("DROP TABLE IF EXISTS temp_duplicates_counts")

    # Add a unique constraint to the UUID column
    create_unique_constraint(unique_constraint_name, dataset_table_name, [uuid_column])


def downgrade():
    connection = op.get_bind()

    drop_constraint(unique_constraint_name, dataset_table_name)

    _restore_old_mappings(connection)
    _restore_dataset_uuids(connection)

    # Clean up the untracked backup tables left behind by upgrade: duplicate_datasets_by_uuid,
    # ldda_datasaet_mapping_pre_uuid_condense, and hda_datasaet_mapping_pre_uuid_condense
    op.execute("DROP TABLE IF EXISTS duplicate_datasets_by_uuid")
    op.execute("DROP TABLE IF EXISTS ldda_datasaet_mapping_pre_uuid_condense")
    op.execute("DROP TABLE IF EXISTS hda_datasaet_mapping_pre_uuid_condense")


def _restore_old_mappings(connection):
    restore_hda_dataset_ids = text(
        """
        UPDATE history_dataset_association
        SET dataset_id=mapping.old_dataset_id
        FROM hda_datasaet_mapping_pre_uuid_condense AS mapping
        WHERE mapping.id = history_dataset_association.id
        """
    )
    connection.execute(restore_hda_dataset_ids)
    restore_ldda_dataset_ids = text(
        """
        UPDATE library_dataset_dataset_association
        SET dataset_id=mapping.old_dataset_id
        FROM ldda_datasaet_mapping_pre_uuid_condense AS mapping
        WHERE mapping.id = library_dataset_dataset_association.id
        """
    )
    connection.execute(restore_ldda_dataset_ids)


def _restore_dataset_uuids(connection):
    restore_dataset_uuids = text(
        f"""
        UPDATE {dataset_table_name}
        SET {uuid_column}=backup_datasets.{uuid_column}
        FROM duplicate_datasets_by_uuid AS backup_datasets
        WHERE backup_datasets.id = {dataset_table_name}.id
        """
    )
    connection.execute(restore_dataset_uuids)


def _setup_duplicate_counts(connection):
    duplicate_counts_query = text(
        f"""
        CREATE TEMP TABLE temp_duplicates_counts AS
        SELECT {uuid_column}, COUNT(*)
        FROM {dataset_table_name}
        GROUP BY {uuid_column}
        HAVING COUNT(*) > 1
        """
    )
    connection.execute(duplicate_counts_query)


def _setup_backups_for_duplicated_uuids(connection):
    duplicate_datasets = text(
        f"""
        CREATE TABLE duplicate_datasets_by_uuid AS
        SELECT *
        FROM {dataset_table_name}
        WHERE {uuid_column} IN (SELECT {uuid_column} FROM temp_duplicates_counts)
        """
    )
    connection.execute(duplicate_datasets)


def _setup_duplicated_dataset_views_by_purged_status(connection):
    duplicate_purged_datasets_query = text(
        """
        CREATE TEMP VIEW temp_duplicate_datasets_purged AS
        SELECT *
        FROM duplicate_datasets_by_uuid
        WHERE purged = true
        """
    )
    connection.execute(duplicate_purged_datasets_query)

    duplicate_active_datasets_query = text(
        """
        CREATE TEMP VIEW temp_duplicate_datasets_active AS
        SELECT *
        FROM duplicate_datasets_by_uuid
        WHERE purged = false
        """
    )
    connection.execute(duplicate_active_datasets_query)
    _debug(connection, "purged duplicated", text("SELECT COUNT(*) FROM temp_duplicate_datasets_purged"))
    _debug(connection, "active duplicated", text("SELECT COUNT(*) FROM temp_duplicate_datasets_active"))


def _find_latest_active_dataset_for_each_uuid(connection):
    latest_active_duplicate_query = text(
        f"""
        CREATE TEMP TABLE temp_latest_active_duplicate AS
        SELECT {uuid_column}, MAX(id) AS latest_dataset_id
        FROM temp_duplicate_datasets_active
        GROUP BY {uuid_column}
        """
    )
    connection.execute(latest_active_duplicate_query)
    debug_query = text("SELECT * FROM temp_latest_active_duplicate")
    _debug(connection, "latest active table", debug_query)


def _map_active_uuids_to_latest(connection):
    active_mapping_query = text(
        f"""
        CREATE TEMP TABLE temp_active_mapping AS
        SELECT d.id AS from_dataset_id, l.latest_dataset_id AS to_dataset_id, l.{uuid_column} AS uuid
        FROM temp_duplicate_datasets_active AS d
        LEFT JOIN temp_latest_active_duplicate AS l ON d.{uuid_column} = l.{uuid_column}
        """
    )
    connection.execute(active_mapping_query)
    debug_query = text("SELECT * FROM temp_active_mapping")
    _debug(connection, "temp active mapping", debug_query)
    debug_query = text("SELECT * FROM temp_active_mapping AS m WHERE m.from_dataset_id != m.to_dataset_id")
    _debug(connection, "temp active mapping older...", debug_query)


def _randomize_uuids_for_purged_datasets_with_duplicated_uuids(connection):
    updated_purged_uuids = text(
        f"""
        UPDATE {dataset_table_name}
        SET {uuid_column}={new_uuid}
        WHERE {uuid_column} IN (SELECT {uuid_column} FROM temp_duplicate_datasets_purged) AND purged = true
        """
    )
    connection.execute(updated_purged_uuids)


def _randomize_uuids_for_older_active_datasets_with_duplicated_uuids(connection):
    # Sanity checks - compare how these counts change across the update below.
    unique_datasets_with_uuid_of_latest_active_uuid = text(
        f"""
        SELECT COUNT(*)
        FROM {dataset_table_name} AS d
        INNER JOIN temp_active_mapping AS a ON d.{uuid_column} = a.uuid
        GROUP BY d.{uuid_column}
        HAVING COUNT(*) = 1
        """
    )
    duplicate_datasets_with_uuid_of_latest_active_uuid = text(
        f"""
        SELECT COUNT(*)
        FROM {dataset_table_name} AS d
        INNER JOIN temp_active_mapping AS a ON d.{uuid_column} = a.uuid
        GROUP BY d.{uuid_column}
        HAVING COUNT(*) > 1
        """
    )
    _debug(
        connection,
        "(before) unique_datasets_with_uuid_of_latest_active_uuid",
        unique_datasets_with_uuid_of_latest_active_uuid,
    )
    _debug(
        connection,
        "(before) duplicate_datasets_with_uuid_of_latest_active_uuid",
        duplicate_datasets_with_uuid_of_latest_active_uuid,
    )

    update_older_datasets = text(
        f"""
        UPDATE {dataset_table_name}
        SET {uuid_column}={new_uuid}
        WHERE EXISTS
        (
            SELECT *
            FROM temp_active_mapping AS m
            WHERE m.from_dataset_id = {dataset_table_name}.id AND m.from_dataset_id != m.to_dataset_id
        )
        """
    )
    result = connection.execute(update_older_datasets)
    print(f"Updated older datasets {result.rowcount}")

    _debug(
        connection,
        "(after) unique_datasets_with_uuid_of_latest_active_uuid",
        unique_datasets_with_uuid_of_latest_active_uuid,
    )
    _debug(
        connection,
        "(after) duplicate_datasets_with_uuid_of_latest_active_uuid",
        duplicate_datasets_with_uuid_of_latest_active_uuid,
    )

    duplicate_active_count = text(
        """
        SELECT COUNT(*)
        FROM temp_active_mapping
        """
    )
    _debug(connection, "(after) duplicate_active_count", duplicate_active_count)
    datasets_with_originally_duplicated_uuids = text(
        f"""
        SELECT COUNT(*)
        FROM {dataset_table_name} AS d
        INNER JOIN temp_active_mapping AS a ON d.{uuid_column} = a.uuid
        """
    )
    _debug(connection, "(after) datasets_with_originally_duplicated_uuids", datasets_with_originally_duplicated_uuids)


def _update_dataset_associations_to_point_to_latest_active_datasets(connection):
    # For the rest, point HDAs/LDDAs at the single dataset selected to represent each uuid
    update_hda_links = text(
        """
        UPDATE history_dataset_association
        SET dataset_id=t.to_dataset_id
        FROM temp_active_mapping AS t
        WHERE t.from_dataset_id = history_dataset_association.dataset_id
        """
    )
    connection.execute(update_hda_links)
    update_ldda_links = text(
        """
        UPDATE library_dataset_dataset_association
        SET dataset_id=t.to_dataset_id
        FROM temp_active_mapping AS t
        WHERE t.from_dataset_id = library_dataset_dataset_association.dataset_id
        """
    )
    connection.execute(update_ldda_links)


def _preserve_old_dataset_association_mappings(connection):
    old_hda_mappings = text(
        f"""
        CREATE TABLE hda_datasaet_mapping_pre_uuid_condense AS
        SELECT h.id AS id, d.id AS old_dataset_id
        FROM history_dataset_association AS h
        INNER JOIN dataset AS d ON h.dataset_id = d.id
        INNER JOIN duplicate_datasets_by_uuid AS duplicates ON d.{uuid_column} = duplicates.{uuid_column}
        """
    )
    connection.execute(old_hda_mappings)
    old_ldda_mappings = text(
        f"""
        CREATE TABLE ldda_datasaet_mapping_pre_uuid_condense AS
        SELECT l.id AS id, d.id AS old_dataset_id
        FROM library_dataset_dataset_association AS l
        INNER JOIN dataset AS d ON l.dataset_id = d.id
        INNER JOIN duplicate_datasets_by_uuid AS duplicates ON d.{uuid_column} = duplicates.{uuid_column}
        """
    )
    connection.execute(old_ldda_mappings)


def _debug(connection, log_msg, query):
    result = connection.execute(query)
    print(f"{log_msg} {result.all()}")
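
As a quick post-upgrade sanity check (not part of the migration itself), a query with the same GROUP BY/HAVING shape as _setup_duplicate_counts can confirm that no duplicated uuids survive. A minimal sketch, assuming a SQLAlchemy engine pointed at the upgraded database (the connection URL below is a placeholder):

import sqlalchemy as sa
from sqlalchemy.sql import text

# Placeholder URL - substitute the real Galaxy database connection string.
engine = sa.create_engine("postgresql:///galaxy")

with engine.connect() as connection:
    # After this migration runs, no uuid value should appear on more than
    # one dataset row, so this query should return zero rows.
    remaining = connection.execute(
        text("SELECT uuid, COUNT(*) FROM dataset GROUP BY uuid HAVING COUNT(*) > 1")
    ).all()
    assert not remaining, f"duplicate uuids remain: {remaining}"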
