Skip to content

Commit

Permalink
Merge pull request #288 from biothings/add-backup-feature
Browse files Browse the repository at this point in the history
Add backup flexibility Issue#242
  • Loading branch information
NikkiBytes authored Oct 16, 2024
2 parents 41b9f8c + e6132b6 commit 6b37c3a
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 49 deletions.
106 changes: 76 additions & 30 deletions discovery/utils/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import zipfile
import io
from datetime import date, datetime
from typing import Union, List, Tuple

import boto3

from discovery.model.dataset import Dataset
from discovery.model.schema import Schema, SchemaClass
from discovery.utils import indices
from discovery.utils.indices import reset

logging.basicConfig(level="INFO")
logger = logging.getLogger("backup")


def json_serial(obj):
Expand Down Expand Up @@ -111,33 +112,79 @@ def daily_backup_routine(format="zip"):
logger.error("Stack trace:", exc_info=True)


def backup_from_file(api):
"""Restore index data from file"""
logger = logging.getLogger("backup_from_file")
if not api:
logger.error("failure to restore from file, no json object passed.")
else:
indices.reset()
api_schema = api["discover_schema"]
api_schema_class = api["discover_schema_class"]
api_dataset = api["discover_dataset"]

for doc in api_schema["docs"]:
file = Schema(**doc)
file.meta.id = doc["_id"]
file.save()

for doc in api_schema_class["docs"]:
file = SchemaClass(**doc)
file.save()
def _backup(backup_data: dict, indices: Union[str, List[str], Tuple[str, ...]] = "all") -> None:
"""
Restore index data, with an option to update selected indices.
for doc in api_dataset["docs"]:
file = Dataset(**doc)
file.save()
Parameters:
- backup_data: dict - JSON object containing the backup data.
- indices: Union[str, List[str], Tuple[str, ...]] - Specifies which indices to update.
Accepts 'all' or any combination of ['schema', 'schema_class', 'dataset'].
"""

# Validate backup_data
if not backup_data:
logger.error("Failure to restore from file, no JSON object passed.")
return

def restore_from_s3(filename=None, bucket="dde"):
# Validate 'indices'
valid_indices = {"schema", "schema_class", "dataset"}

if isinstance(indices, str):
indices = valid_indices if indices == "all" else {indices.strip()}
elif isinstance(indices, (list, tuple)):
indices = set(indices)
else:
logger.error(f"Invalid type for 'indices': {type(indices)}. Must be string, list, or tuple.")
return

if not indices.issubset(valid_indices):
invalid_elements = indices - valid_indices
logger.error(f"Invalid elements in 'indices': {invalid_elements}. Must be a subset of {valid_indices}.")
return

indices_to_reset = list(valid_indices & indices)

# Reset selected indices
for index in indices_to_reset:
reset(indices=index)

# Reset and update target indices based on the indices parameter
if indices == "all" or "schema" in indices_to_reset:
# Update discover_schema
if "discover_schema" in backup_data:
api_schema = backup_data["discover_schema"]
for doc in api_schema["docs"]:
file = Schema(**doc)
file.meta.id = doc["_id"]
file.save()
logger.info("The discover_schema index data was updated successfully.")
else:
logger.info("No discover_schema data found in the API backup")

if indices == "all" or "schema_class" in indices_to_reset:
# Update discover_schema_class
if "discover_schema_class" in backup_data:
api_schema_class = backup_data["discover_schema_class"]
for doc in api_schema_class["docs"]:
file = SchemaClass(**doc)
file.save()
logger.info("The discover_schema_class index data was updated successfully.")
else:
logger.info("No discover_schema_class data found in the API backup")

if indices == "all" or "dataset" in indices_to_reset:
# Update discover_dataset
if "discover_dataset" in backup_data:
api_dataset = backup_data["discover_dataset"]
for doc in api_dataset["docs"]:
file = Dataset(**doc)
file.save()
logger.info("The discover_dataset index data was updated successfully.")
else:
logger.info("No discover_dataset data found in the API backup")

def restore_from_s3(filename: str = None, bucket: str = "dde", indices: Union[str, List[str], Tuple[str, ...]] = "all"):
s3 = boto3.client("s3")

if not filename:
Expand All @@ -153,7 +200,6 @@ def restore_from_s3(filename=None, bucket="dde"):
Bucket=bucket,
Key=filename
)

filename = filename.replace("db_backup/", "")

if filename.endswith(".zip"):
Expand All @@ -166,14 +212,14 @@ def restore_from_s3(filename=None, bucket="dde"):
with zfile.open(json_file) as json_data:
ddeapis = json.load(json_data)
elif filename.endswith(".json"):
ddeapis = json.loads(obj['Body'].read())
ddeapis = json.loads(obj["Body"].read())
else:
raise Exception("Unsupported backup file type!")

backup_from_file(ddeapis)
_backup(ddeapis, indices=indices)


def restore_from_file(filename=None):
def restore_from_file(filename: str = None, indices: Union[str, List[str], Tuple[str, ...]] = "all"):
if filename.endswith(".zip"):
with zipfile.ZipFile(filename, 'r') as zfile:
# Search for a JSON file inside the ZIP
Expand All @@ -188,4 +234,4 @@ def restore_from_file(filename=None):
else:
raise Exception("Unsupported backup file type!")

backup_from_file(ddeapis)
_backup(ddeapis, indices=indices)
49 changes: 30 additions & 19 deletions discovery/utils/indices.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from elasticsearch_dsl import Index
from typing import Union, List, Tuple

from discovery.model.dataset import Dataset
from discovery.model.schema import Schema, SchemaClass
Expand Down Expand Up @@ -26,25 +27,35 @@ def refresh():
Index(Dataset.Index.name).refresh()


def reset():

index_1 = Index(Schema.Index.name)
index_2 = Index(SchemaClass.Index.name)
index_3 = Index(Dataset.Index.name)

if index_1.exists():
index_1.delete()

if index_2.exists():
index_2.delete()

if index_3.exists():
index_3.delete()

Schema.init()
SchemaClass.init()
Dataset.init()

def reset(indices: Union[str, List[str], Tuple[str, ...]] = "all") -> None:
"""
Reset selected indices. Default is to reset all indices.
Parameters:
- indices: Union[str, List[str], Tuple[str, ...]] - Specifies which indices to reset.
Accepts 'all' or any combination of ["schema", "schema_class", "dataset"].
"""

# Define index mapping
index_mapping = {
"schema": Schema,
"schema_class": SchemaClass,
"dataset": Dataset,
}

if isinstance(indices, str):
indices = list(index_mapping.keys()) if indices == "all" else [indices]
elif not isinstance(indices, (list, tuple)):
return

# Filter valid indices and reset them
indices_to_reset = [index for index in indices if index in index_mapping]

for index_name in indices_to_reset:
model = index_mapping[index_name]
index = Index(model.Index.name)
if index.exists():
index.delete()
model.init()

def save_schema_index_meta(meta):
"""save index metadata to Schema ES index"""
Expand Down

0 comments on commit 6b37c3a

Please sign in to comment.