From 73f9f7b6ced3c79c29428e795bc0cbd0073cde3b Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 5 Jan 2024 14:01:34 +0100 Subject: [PATCH] Enceladus ECS rollback script: first version for evaluation (#2201) * Enceladus ECS rollback script, * Enceladus ECS mapping script: -m/--mapping-tables option added --- .../migration/dataset_paths_ecs_rollback.py | 292 ++++++++++++++++++ scripts/migration/dataset_paths_to_ecs.py | 53 +++- scripts/migration/menas_db.py | 3 + scripts/migration/migrate_menas.py | 2 +- 4 files changed, 339 insertions(+), 11 deletions(-) create mode 100644 scripts/migration/dataset_paths_ecs_rollback.py diff --git a/scripts/migration/dataset_paths_ecs_rollback.py b/scripts/migration/dataset_paths_ecs_rollback.py new file mode 100644 index 000000000..edb2a26f0 --- /dev/null +++ b/scripts/migration/dataset_paths_ecs_rollback.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 + +# Copyright 2023 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from typing import List + +from constants import * +from menas_db import MenasDb +from dataclasses import dataclass + +# python package needed are denoted in requirements.txt, so to fix missing dependencies, just run +# pip install -r requirements.txt + + +DEFAULT_SKIP_PREFIXES = [] +DEFAULT_DATASETS_ONLY = False + +MAPPING_FIELD_HDFS_PATH = "hdfsPath" +MAPPING_FIELD_HDFS_PUBLISH_PATH = "hdfsPublishPath" +MAPPING_FIELD_HDFS_ALL = "all" + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog='dataset_paths_ecs_rollback', + description='Menas MongoDB rollback script changes to ECS', + formatter_class=argparse.ArgumentDefaultsHelpFormatter # prints default values, too, on help (-h) + ) + + parser.add_argument('-n', '--dryrun', action='store_true', default=DEFAULT_DRYRUN, + help="If specified, skip the actual changes, just print what would be done.") + parser.add_argument('-v', '--verbose', action="store_true", default=DEFAULT_VERBOSE, + help="Prints extra information while running.") + + parser.add_argument('target', metavar="TARGET", help="connection string for target MongoDB") + parser.add_argument('-t', '--target-database', dest="targetdb", default=DEFAULT_DB_NAME, + help="Name of db on target to be affected.") + + parser.add_argument('-s', '--skip-prefixes', dest="skipprefixes", metavar="SKIP_PREFIX", default=DEFAULT_SKIP_PREFIXES, + nargs="+", help="Paths with these prefixes will be skipped from rollback.") + + parser.add_argument('-f', '--fields-to-map', dest='fieldstomap', choices=[MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL], + default=MAPPING_FIELD_HDFS_ALL, help="Rollback either item's 'hdfsPath', 'hdfsPublishPath' or 'all'.") + + parser.add_argument('-o', '--only-datasets', dest='onlydatasets', action='store_true', default=DEFAULT_DATASETS_ONLY, + help="if specified, only dataset rollback path changes will be done (not MTs).") + + input_options_group = parser.add_mutually_exclusive_group(required=True) + input_options_group.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], + nargs="+", help='list datasets names to rollback path changes in') + input_options_group.add_argument('-m', '--mapping-tables', dest="mtables", metavar="MTABLE_NAME", default=[], + nargs="+", help='list mapping tables names to rollback path changes in') + + + return parser.parse_args() + + +@dataclass +class RollbackSettings: + skip_prefixes: List[str] + fields_to_map: str # HDFS_MAPPING_FIELD_HDFS_* + + +def update_data_for_item(item: dict, rollback_settings: RollbackSettings) -> (dict, dict): + """ + Prepares dicts of set and unset fields based on the item and rollback settings. + For hdfsPath to be rollbacked, the following conditions must be met: + - bakHdfsPath must exits on item + - bakHdfsPath must not start with any of prefixes given in rollback_settings.skip_prefixes + - rollback_settings.skip_prefixes must be MAPPING_FIELD_HDFS_PATH or MAPPING_FIELD_HDFS_ALL + + For hdfsPublishPath to be rollbacked, the following conditions must be met: + - bakHdfsPublishPath must exits on item + - bakHdfsPublishPath must not start with any of prefixes given in rollback_settings.skip_prefixes + - rollback_settings.skip_prefixes must be MAPPING_FIELD_HDFS_PUBLISH_PATH or MAPPING_FIELD_HDFS_ALL + + :param item: + :param rollback_settings: + :return: dict of fields to be set on item, dict of fields to be unset on item + """ + def starts_with_one_of_prefixes(path: str, prefixes: List[str]) -> bool: + return any(path.startswith(prefix) for prefix in prefixes) + + data_update_set = {} + data_update_unset = {} + + # rollback bakHdfsPath -> hdfsPath + has_bak_hdfs_path = "bakHdfsPath" in item + if has_bak_hdfs_path: + bak_hdfs_path = item["bakHdfsPath"] + hdfs_to_be_rollbacked = True if rollback_settings.fields_to_map in {MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_ALL} else False + + if hdfs_to_be_rollbacked and not starts_with_one_of_prefixes(bak_hdfs_path, rollback_settings.skip_prefixes): + data_update_set["hdfsPath"] = bak_hdfs_path + data_update_unset["bakHdfsPath"] = "" + + # rollback bakHdfsPublishPath -> hdfsPublishPath + has_bak_hdfs_publish_path = "bakHdfsPublishPath" in item + if has_bak_hdfs_publish_path: + bak_hdfs_publish_path = item["bakHdfsPublishPath"] + hdfs_publish_to_be_rollbacked = True if rollback_settings.fields_to_map in {MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL} else False + + if hdfs_publish_to_be_rollbacked and not starts_with_one_of_prefixes(bak_hdfs_publish_path, rollback_settings.skip_prefixes): + data_update_set["hdfsPublishPath"] = bak_hdfs_publish_path + data_update_unset["bakHdfsPublishPath"] = "" + + return data_update_set, data_update_unset + + +def data_update_to_nice_string(data_update: dict) -> str: + mappings = [] + + # hdfsPath + if "hdfsPath" in data_update: + hdfs_path = data_update["hdfsPath"] + mappings.append(f"hdfsPath: -> {hdfs_path}") + + # hdfsPublishPath + if "hdfsPublishPath" in data_update: + hdfs_publish_path = data_update["hdfsPublishPath"] + mappings.append(f"hdfsPublishPath: -> {hdfs_publish_path}") + + return ", ".join(mappings) + +def rollback_pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: str, entity_names_list: List[str], + mapping_settings: RollbackSettings, dryrun: bool) -> None: + + assert entity_name == "dataset" or entity_name == "mapping table", "this method supports datasets and MTs only!" + + if not entity_names_list: + print("No {}s to rollback path-changes in {}, skipping.".format(entity_name, collection_name)) + return + + print("Rollbacking path-change of collection {} started".format(collection_name)) + dataset_collection = target_db.mongodb[collection_name] + + query = {"$and": [ + {"name": {"$in": entity_names_list}} # dataset/MT name + ]} + + docs_count = dataset_collection.count_documents(query) + docs = dataset_collection.find(query) + + print("Found: {} {} documents for a potential path change. In progress ... ".format(docs_count, entity_name)) + + patched = 0 + failed_count = 0 + for item in docs: + # item preview + if verbose: + print("Rollbacking path changes for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + + update_data_set, update_data_unset = update_data_for_item(item, mapping_settings) + if update_data_set != {} and update_data_unset != {}: + if dryrun: + print(" *would rollback* {}".format(data_update_to_nice_string(update_data_set))) + print("") + + else: + try: + if verbose: + print(" *rollbacking*: {}".format(data_update_to_nice_string(update_data_set))) + + update_result = dataset_collection.update_one( + {"$and": [ + {"_id": item["_id"]} + ]}, + {"$set": update_data_set, "$unset": update_data_unset} + ) + if update_result.acknowledged and verbose: + print("Successfully rollbacked changed path for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + print("") + + except Exception as e: + print("Warning: Error while change-path rollbacking for {} '{}' v{} (_id={}): {}".format(entity_name, item["name"], item["version"], item["_id"], e)) + failed_count += 1 + else: + patched += 1 + else: + if verbose: + print(" Nothing left to rollback for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + print("") + + print("Successfully rollbacked {} of {} {} entries, failed: {}".format(patched, docs_count, entity_name, failed_count)) + print("") + + +def rollback_pathchange_collections_by_ds_names(target_db: MenasDb, + supplied_ds_names: List[str], + mapping_settings: RollbackSettings, + onlydatasets: bool, + dryrun: bool) -> None: + + if verbose: + print("Dataset names given: {}".format(supplied_ds_names)) + + ds_names_found = target_db.get_distinct_ds_names_from_ds_names(supplied_ds_names, migration_free_only=False) + print('Dataset names to rollback path-changes (actually found db): {}'.format(ds_names_found)) + + if onlydatasets: + print("Configured *NOT* to rollback path-changes for related mapping tables.") + else: + mapping_table_found_for_dss = target_db.get_distinct_mapping_tables_from_ds_names(ds_names_found, migration_free_only=False) + print('MTs for path-change rollback: {}'.format(mapping_table_found_for_dss)) + + + print("") + rollback_pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_settings, dryrun) + if not onlydatasets: + rollback_pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_settings, dryrun) + + + +def rollback_pathchange_collections_by_mt_names(target_db: MenasDb, + supplied_mt_names: List[str], + mapping_settings: RollbackSettings, + dryrun: bool) -> None: + + if verbose: + print("Mapping table names given: {}".format(supplied_mt_names)) + + mt_names_found = target_db.get_distinct_mt_names_from_mt_names(supplied_mt_names, migration_free_only=False) + print('Mapping table names to rollback path-changes (actually found db): {}'.format(mt_names_found)) + + print("") + rollback_pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mt_names_found, mapping_settings, dryrun) + + +def run(parsed_args: argparse.Namespace): + target_conn_string = parsed_args.target + target_db_name = parsed_args.targetdb + + dryrun = args.dryrun # if set, only path change rollback description will be printed, no actual patching will run + + skip_prefixes = args.skipprefixes + fields_to_map = args.fieldstomap # argparse allow only one of HDFS_MAPPING_FIELD_HDFS_* + rollback_settings = RollbackSettings(skip_prefixes, fields_to_map) + + print('Menas mongo ECS paths mapping ROLLBACK') + print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) + print('Skipping prefixes: {}'.format(skip_prefixes)) + print(' target connection-string: {}'.format(target_conn_string)) + print(' target DB: {}'.format(target_db_name)) + target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose) + + dataset_names = parsed_args.datasets + only_datasets = parsed_args.onlydatasets + mt_names = parsed_args.mtables + + if dataset_names: + print('Dataset names supplied: {}'.format(dataset_names)) + rollback_pathchange_collections_by_ds_names(target_db, dataset_names, rollback_settings, only_datasets, dryrun=dryrun) + + elif mt_names: + if only_datasets: + raise Exception("Invalid run options: -o/--only-datasets cannot be used with -m/--mapping-tables, only for -d/--datasets") + + print('Mapping table names supplied: {}'.format(mt_names)) + rollback_pathchange_collections_by_mt_names(target_db, mt_names, rollback_settings, dryrun=dryrun) + + else: + # should not happen (-d/-m is exclusive and required) + raise Exception("Invalid run options: DS names (-d ds1 ds2 ...).., MT names (-m mt1 mt2 ... ) must be given.") + + print("Done.") + print("") + + +if __name__ == '__main__': + args = parse_args() + + # globals script vars + verbose = args.verbose + run(args) + + ## Examples runs: + # Dry-run example: + # python dataset_paths_ecs_rollback.py mongodb://localhost:27017/admin -d MyDataset1 AnotherDatasetB -t menas_remap_test -n + # Verbose run example + # python dataset_paths_ecs_rollback.py mongodb://localhost:27017/admin -d DatasetA -t menas_remap_test -v diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index c0f47b5c0..6c279525f 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -44,9 +44,9 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument('-n', '--dryrun', action='store_true', default=DEFAULT_DRYRUN, - help="if specified, skip the actual changes, just print what would be done.") + help="If specified, skip the actual changes, just print what would be done.") parser.add_argument('-v', '--verbose', action="store_true", default=DEFAULT_VERBOSE, - help="prints extra information while running.") + help="Prints extra information while running.") parser.add_argument('target', metavar="TARGET", help="connection string for target MongoDB") parser.add_argument('-t', '--target-database', dest="targetdb", default=DEFAULT_DB_NAME, @@ -56,20 +56,23 @@ def parse_args() -> argparse.Namespace: help="Service URL to use for path change mapping.") parser.add_argument('-p', '--mapping-prefix', dest="mappingprefix", default=DEFAULT_MAPPING_PREFIX, - help="This prefix will be prepended to mapped path by the Mapping service") + help="This prefix will be prepended to mapped path by the Mapping service.") parser.add_argument('-s', '--skip-prefixes', dest="skipprefixes", metavar="SKIP_PREFIX", default=DEFAULT_SKIP_PREFIXES, - nargs="+", help="Path with these prefixes will be skipped from mapping") + nargs="+", help="Paths with these prefixes will be skipped from mapping.") parser.add_argument('-f', '--fields-to-map', dest='fieldstomap', choices=[MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL], - default=MAPPING_FIELD_HDFS_ALL, help="Map either item's 'hdfsPath', 'hdfsPublishPath' or 'all'") - - parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], - nargs="+", help='list datasets names to change paths in') + default=MAPPING_FIELD_HDFS_ALL, help="Map either item's 'hdfsPath', 'hdfsPublishPath' or 'all'.") parser.add_argument('-o', '--only-datasets', dest='onlydatasets', action='store_true', default=DEFAULT_DATASETS_ONLY, - help="if specified, mapping table changes will NOT be done.") + help="if specified, only dataset path changes will be done (not MTs). Cannot be used with -m/--mapping-tables") + + input_options_group = parser.add_mutually_exclusive_group(required=True) + input_options_group.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], + nargs="+", help='list datasets names to change paths in') + input_options_group.add_argument('-m', '--mapping-tables', dest="mtables", metavar="MTABLE_NAME", default=[], + nargs="+", help='list mapping tables names to change paths in') return parser.parse_args() @@ -232,6 +235,21 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, if not onlydatasets: pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_settings, dryrun) +def pathchange_collections_by_mt_names(target_db: MenasDb, + supplied_mt_names: List[str], + mapping_settings: MappingSettings, + dryrun: bool) -> None: + + if verbose: + print("Mapping table names given: {}".format(supplied_mt_names)) + + mt_names_found = target_db.get_distinct_mt_names_from_mt_names(supplied_mt_names, migration_free_only=False) + print('Mapping table names to path change (actually found db): {}'.format(mt_names_found)) + + print("") + pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mt_names_found, mapping_settings, dryrun) + + def pre_run_mapping_service_check(svc_url: str, path_prefix_to_add: str): test_path = "/bigdatahdfs/datalake/publish/pcub/just/a/path/to/test/the/service/" @@ -265,7 +283,22 @@ def run(parsed_args: argparse.Namespace): dataset_names = parsed_args.datasets only_datasets = parsed_args.onlydatasets - pathchange_collections_by_ds_names(target_db, dataset_names, mapping_settings, only_datasets, dryrun=dryrun) + mt_names = parsed_args.mtables + + if dataset_names: + print('Dataset names supplied: {}'.format(dataset_names)) + pathchange_collections_by_ds_names(target_db, dataset_names, mapping_settings, only_datasets, dryrun=dryrun) + + elif mt_names: + if only_datasets: + raise Exception("Invalid run options: -o/--only-datasets cannot be used with -m/--mapping-tables, only for -d/--datasets") + + print('Mapping table names supplied: {}'.format(mt_names)) + pathchange_collections_by_mt_names(target_db, mt_names, mapping_settings, dryrun=dryrun) + + else: + # should not happen (-d/-m is exclusive and required) + raise Exception("Invalid run options: DS names (-d ds1 ds2 ...).., MT names (-m mt1 mt2 ... ) must be given.") print("Done.") print("") diff --git a/scripts/migration/menas_db.py b/scripts/migration/menas_db.py index 7cd675f8f..41d5ccf1a 100644 --- a/scripts/migration/menas_db.py +++ b/scripts/migration/menas_db.py @@ -101,6 +101,9 @@ def ensure_collections_exist(collection_names: List[str]) -> None: def get_distinct_ds_names_from_ds_names(self, ds_names: List[str], migration_free_only: bool) -> List[str]: return self.get_distinct_entities_ids(ds_names, DATASET_COLLECTION, migration_free_only) + def get_distinct_mt_names_from_mt_names(self, mt_names: List[str], migration_free_only: bool) -> List[str]: + return self.get_distinct_entities_ids(mt_names, MAPPING_TABLE_COLLECTION, migration_free_only) + def get_distinct_schema_names_from_schema_names(self, schema_names: List[str], migration_free_only: bool) -> List[str]: return self.get_distinct_entities_ids(schema_names, SCHEMA_COLLECTION, migration_free_only) diff --git a/scripts/migration/migrate_menas.py b/scripts/migration/migrate_menas.py index f393b13c8..871d2e893 100755 --- a/scripts/migration/migrate_menas.py +++ b/scripts/migration/migrate_menas.py @@ -349,7 +349,7 @@ def run(parsed_args: argparse.Namespace): print('Property definition names supplied: {}'.format(propdef_names)) migrate_propdefs_by_propdef_names(source_db, target_db, propdef_names, dryrun=dryrun) else: - # should not happen (-d/-m is exclusive and required) + # should not happen (-d/-m/-p is exclusive and required) raise Exception("Invalid run options: DS names (-d ds1 ds2 ...).., MT names (-m mt1 mt2 ... ), " "or prop defs (-p prop1 prop2 ...) must be given.")