diff --git a/discovery/utils/backup.py b/discovery/utils/backup.py
index 7969d7e9..d24cdaae 100644
--- a/discovery/utils/backup.py
+++ b/discovery/utils/backup.py
@@ -3,14 +3,15 @@
 import zipfile
 import io
 from datetime import date, datetime
+from typing import Union, List, Tuple

 import boto3

 from discovery.model.dataset import Dataset
 from discovery.model.schema import Schema, SchemaClass
-from discovery.utils import indices
+from discovery.utils.indices import reset

-logging.basicConfig(level="INFO")
+logger = logging.getLogger("backup")


 def json_serial(obj):
@@ -111,33 +112,79 @@ def daily_backup_routine(format="zip"):
         logger.error("Stack trace:", exc_info=True)


-def backup_from_file(api):
-    """Restore index data from file"""
-    logger = logging.getLogger("backup_from_file")
-    if not api:
-        logger.error("failure to restore from file, no json object passed.")
-    else:
-        indices.reset()
-        api_schema = api["discover_schema"]
-        api_schema_class = api["discover_schema_class"]
-        api_dataset = api["discover_dataset"]
-
-        for doc in api_schema["docs"]:
-            file = Schema(**doc)
-            file.meta.id = doc["_id"]
-            file.save()
-
-        for doc in api_schema_class["docs"]:
-            file = SchemaClass(**doc)
-            file.save()
+def _backup(backup_data: dict, indices: Union[str, List[str], Tuple[str, ...]] = "all") -> None:
+    """
+    Restore index data, with an option to update selected indices.

-        for doc in api_dataset["docs"]:
-            file = Dataset(**doc)
-            file.save()
+    Parameters:
+    - backup_data: dict - JSON object containing the backup data.
+    - indices: Union[str, List[str], Tuple[str, ...]] - Specifies which indices to update.
+      Accepts 'all' or any combination of ['schema', 'schema_class', 'dataset'].
+    """
+    # Validate backup_data
+    if not backup_data:
+        logger.error("Failure to restore from file, no JSON object passed.")
+        return

-def restore_from_s3(filename=None, bucket="dde"):
+    # Validate 'indices'
+    valid_indices = {"schema", "schema_class", "dataset"}
+    if isinstance(indices, str):
+        indices = valid_indices if indices == "all" else {indices.strip()}
+    elif isinstance(indices, (list, tuple)):
+        indices = set(indices)
+    else:
+        logger.error(f"Invalid type for 'indices': {type(indices)}. Must be string, list, or tuple.")
+        return
+
+    if not indices.issubset(valid_indices):
+        invalid_elements = indices - valid_indices
+        logger.error(f"Invalid elements in 'indices': {invalid_elements}. Must be a subset of {valid_indices}.")
+        return
+
+    indices_to_reset = list(valid_indices & indices)
+
+    # Reset selected indices
+    for index in indices_to_reset:
+        reset(indices=index)
+
+    # Reset and update target indices based on the indices parameter
+    if indices == "all" or "schema" in indices_to_reset:
+        # Update discover_schema
+        if "discover_schema" in backup_data:
+            api_schema = backup_data["discover_schema"]
+            for doc in api_schema["docs"]:
+                file = Schema(**doc)
+                file.meta.id = doc["_id"]
+                file.save()
+            logger.info("The discover_schema index data was updated successfully.")
+        else:
+            logger.info("No discover_schema data found in the API backup")
+
+    if indices == "all" or "schema_class" in indices_to_reset:
+        # Update discover_schema_class
+        if "discover_schema_class" in backup_data:
+            api_schema_class = backup_data["discover_schema_class"]
+            for doc in api_schema_class["docs"]:
+                file = SchemaClass(**doc)
+                file.save()
+            logger.info("The discover_schema_class index data was updated successfully.")
+        else:
+            logger.info("No discover_schema_class data found in the API backup")
+
+    if indices == "all" or "dataset" in indices_to_reset:
+        # Update discover_dataset
+        if "discover_dataset" in backup_data:
+            api_dataset = backup_data["discover_dataset"]
+            for doc in api_dataset["docs"]:
+                file = Dataset(**doc)
+                file.save()
+            logger.info("The discover_dataset index data was updated successfully.")
+        else:
+            logger.info("No discover_dataset data found in the API backup")
+
+def restore_from_s3(filename: str = None, bucket: str = "dde", indices: Union[str, List[str], Tuple[str, ...]] = "all"):
     s3 = boto3.client("s3")

     if not filename:
@@ -153,7 +200,6 @@ def restore_from_s3(filename=None, bucket="dde"):
         Bucket=bucket,
         Key=filename
     )
-
     filename = filename.replace("db_backup/", "")

     if filename.endswith(".zip"):
@@ -166,14 +212,14 @@ def restore_from_s3(filename=None, bucket="dde"):
             with zfile.open(json_file) as json_data:
                 ddeapis = json.load(json_data)
     elif filename.endswith(".json"):
-        ddeapis = json.loads(obj['Body'].read())
+        ddeapis = json.loads(obj["Body"].read())
     else:
         raise Exception("Unsupported backup file type!")

-    backup_from_file(ddeapis)
+    _backup(ddeapis, indices=indices)


-def restore_from_file(filename=None):
+def restore_from_file(filename: str = None, indices: Union[str, List[str], Tuple[str, ...]] = "all"):
     if filename.endswith(".zip"):
         with zipfile.ZipFile(filename, 'r') as zfile:
             # Search for a JSON file inside the ZIP
@@ -188,4 +234,4 @@ def restore_from_file(filename=None):
     else:
         raise Exception("Unsupported backup file type!")

-    backup_from_file(ddeapis)
+    _backup(ddeapis, indices=indices)
diff --git a/discovery/utils/indices.py b/discovery/utils/indices.py
index 6317d031..01ff3ac6 100644
--- a/discovery/utils/indices.py
+++ b/discovery/utils/indices.py
@@ -1,4 +1,5 @@
 from elasticsearch_dsl import Index
+from typing import Union, List, Tuple

 from discovery.model.dataset import Dataset
 from discovery.model.schema import Schema, SchemaClass
@@ -26,25 +27,35 @@ def refresh():
     Index(Dataset.Index.name).refresh()


-def reset():
-
-    index_1 = Index(Schema.Index.name)
-    index_2 = Index(SchemaClass.Index.name)
-    index_3 = Index(Dataset.Index.name)
-
-    if index_1.exists():
-        index_1.delete()
-
-    if index_2.exists():
-        index_2.delete()
-
-    if index_3.exists():
-        index_3.delete()
-
-    Schema.init()
-    SchemaClass.init()
-    Dataset.init()
-
+def reset(indices: Union[str, List[str], Tuple[str, ...]] = "all") -> None:
+    """
+    Reset selected indices. Default is to reset all indices.
+    Parameters:
+    - indices: Union[str, List[str], Tuple[str, ...]] - Specifies which indices to reset.
+      Accepts 'all' or any combination of ["schema", "schema_class", "dataset"].
+    """
+
+    # Define index mapping
+    index_mapping = {
+        "schema": Schema,
+        "schema_class": SchemaClass,
+        "dataset": Dataset,
+    }
+
+    if isinstance(indices, str):
+        indices = list(index_mapping.keys()) if indices == "all" else [indices]
+    elif not isinstance(indices, (list, tuple)):
+        return
+
+    # Filter valid indices and reset them
+    indices_to_reset = [index for index in indices if index in index_mapping]
+
+    for index_name in indices_to_reset:
+        model = index_mapping[index_name]
+        index = Index(model.Index.name)
+        if index.exists():
+            index.delete()
+        model.init()

 def save_schema_index_meta(meta):
     """save index metadata to Schema ES index"""
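
For reviewers, a brief usage sketch of the selective restore/reset API introduced above. It uses only the parameters added in this diff; the backup filenames and S3 key are hypothetical placeholders.

from discovery.utils.backup import restore_from_file, restore_from_s3
from discovery.utils.indices import reset

# Restore only the schema-related indices from an S3 backup (hypothetical key)
restore_from_s3(filename="db_backup/dde_backup_20240101.zip", indices=["schema", "schema_class"])

# Restore a single index from a local JSON dump (hypothetical path)
restore_from_file("dde_backup_20240101.json", indices="dataset")

# Reset all indices, or just one, without restoring any data
reset()
reset(indices="dataset")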