From 140645943c331a0b752e5f09df7713936ce2b19b Mon Sep 17 00:00:00 2001
From: Travis Abendshien <46939827+CyanVoxel@users.noreply.github.com>
Date: Mon, 4 Nov 2024 12:20:14 -0800
Subject: [PATCH] feat: create auto-backup of library for use in save failures
 (Fix #343) (#554)

* fix: move `self.to_json()` outside `with` block

* feat: add library autosave on start

* fix: check if `backup_folder` exists before loading backup
---
 tagstudio/src/core/library.py | 486 ++++++++++++++++++----------------
 1 file changed, 257 insertions(+), 229 deletions(-)

diff --git a/tagstudio/src/core/library.py b/tagstudio/src/core/library.py
index a4e8dde71..d96cb8ed4 100644
--- a/tagstudio/src/core/library.py
+++ b/tagstudio/src/core/library.py
@@ -480,7 +480,7 @@ def verify_default_tags(self, tag_list: list[JsonTag]) -> list[JsonTag]:
 
         return tag_list
 
-    def open_library(self, path: str | Path) -> int:
+    def open_library(self, path: str | Path, is_path_file: bool = False) -> int:
         """
         Opens a TagStudio v9+ Library.
         Returns 0 if library does not exist, 1 if successfully opened, 2 if corrupted.
@@ -488,242 +488,264 @@ def open_library(self, path: str | Path) -> int:
 
         return_code: int = 2
 
-        _path: Path = self._fix_lib_path(path)
+        _path: Path = self._fix_lib_path(path) if not is_path_file else Path(path)
+        lib_path: Path = (
+            _path / TS_FOLDER_NAME / "ts_library.json" if not is_path_file else _path
+        )
+        logging.info(f"[LIBRARY] Library Save File Loaded From: {lib_path}")
 
-        if (_path / TS_FOLDER_NAME / "ts_library.json").exists():
-            try:
-                with open(
-                    _path / TS_FOLDER_NAME / "ts_library.json",
-                    "r",
-                    encoding="utf-8",
-                ) as file:
-                    json_dump: JsonLibary = ujson.load(file)
-                    self.library_dir = Path(_path)
-                    self.verify_ts_folders()
-                    major, minor, patch = json_dump["ts-version"].split(".")
-
-                    # Load Extension List --------------------------------------
-                    start_time = time.time()
-                    if "ignored_extensions" in json_dump:
-                        self.ext_list = json_dump.get(
-                            "ignored_extensions", self.default_ext_exclude_list
-                        )
-                    else:
-                        self.ext_list = json_dump.get(
-                            "ext_list", self.default_ext_exclude_list
-                        )
+        # if (lib_path).exists():
+        # json_dump: JsonLibary = None
 
-                    # Sanitizes older lists (v9.2.1) that don't use leading periods.
-                    # Without this, existing lists (including default lists)
-                    # have to otherwise be updated by hand in order to restore
-                    # previous functionality.
-                    sanitized_list: list[str] = []
-                    for ext in self.ext_list:
-                        if not ext.startswith("."):
-                            ext = "." + ext
-                        sanitized_list.append(ext)
-                    self.ext_list = sanitized_list
-
-                    self.is_exclude_list = json_dump.get("is_exclude_list", True)
-                    end_time = time.time()
-                    logging.info(
-                        f"[LIBRARY] Extension list loaded in {(end_time - start_time):.3f} seconds"
-                    )
+        try:
+            with open(
+                lib_path,
+                "r",
+                encoding="utf-8",
+            ) as file:
+                json_dump = ujson.load(file)
+
+        except (ujson.JSONDecodeError, FileNotFoundError):
+            logging.info(
+                "[LIBRARY][ERROR] Blank/Corrupted Library file found. Searching for Auto Backup..."
+            )
+            backup_folder: Path = (
+                self._fix_lib_path(path) / TS_FOLDER_NAME / BACKUP_FOLDER_NAME
+            )
+            if backup_folder.exists():
+                auto_backup: Path = None
+                dir_obj = os.scandir(backup_folder)
 
-                    # Parse Tags -----------------------------------------------
-                    if "tags" in json_dump.keys():
-                        start_time = time.time()
-
-                        # Step 1: Verify default built-in tags are present.
-                        json_dump["tags"] = self.verify_default_tags(json_dump["tags"])
-
-                        for tag in json_dump["tags"]:
-                            # Step 2: Create a Tag object and append it to the internal Tags list,
-                            # then map that Tag's ID to its index in the Tags list.
-
-                            id = int(tag.get("id", 0))
-
-                            # Don't load tags with duplicate IDs
-                            if id not in {t.id for t in self.tags}:
-                                if id >= self._next_tag_id:
-                                    self._next_tag_id = id + 1
-
-                                name = tag.get("name", "")
-                                shorthand = tag.get("shorthand", "")
-                                aliases = tag.get("aliases", [])
-                                subtag_ids = tag.get("subtag_ids", [])
-                                color = tag.get("color", "")
-
-                                t = Tag(
-                                    id=id,
-                                    name=name,
-                                    shorthand=shorthand,
-                                    aliases=aliases,
-                                    subtags_ids=subtag_ids,
-                                    color=color,
-                                )
+                for backup_file in dir_obj:
+                    if backup_file.is_file() and "ts_library_backup_auto" in str(
+                        backup_file
+                    ):
+                        auto_backup = Path(backup_file)
+                        break
 
-                                # NOTE: This does NOT use the add_tag_to_library() method!
-                                # That method is only used for Tags added at runtime.
-                                # This process uses the same inner methods, but waits until all of the
-                                # Tags are registered in the Tags list before creating the Tag clusters.
-                                self.tags.append(t)
-                                self._map_tag_id_to_index(t, -1)
-                                self._map_tag_strings_to_tag_id(t)
-                            else:
-                                logging.info(
-                                    f"[LIBRARY]Skipping Tag with duplicate ID: {tag}"
-                                )
+                if auto_backup and "ts_library_backup_auto" not in str(path):
+                    logging.info(f"[LIBRARY] Loading Auto Backup: {auto_backup}")
+                    return self.open_library(auto_backup, is_path_file=True)
 
-                        # Step 3: Map each Tag's subtags together now that all Tag objects in it.
-                        for t in self.tags:
-                            self._map_tag_id_to_cluster(t)
+        else:
+            self.library_dir = self._fix_lib_path(path)
+            logging.info(f"[LIBRARY] Library Save Target Directory: {self.library_dir}")
+            self.verify_ts_folders()
+            major, minor, patch = json_dump["ts-version"].split(".")
 
-                        end_time = time.time()
-                        logging.info(
-                            f"[LIBRARY] Tags loaded in {(end_time - start_time):.3f} seconds"
-                        )
-
-                    # Parse Entries --------------------------------------------
-                    if entries := json_dump.get("entries"):
-                        start_time = time.time()
-                        for entry in entries:
-                            if "id" in entry:
-                                id = int(entry["id"])
-                                if id >= self._next_entry_id:
-                                    self._next_entry_id = id + 1
-                            else:
-                                # Version 9.1.x+ Compatibility
-                                id = self._next_entry_id
-                                self._next_entry_id += 1
-
-                            filename = entry.get("filename", "")
-                            e_path = entry.get("path", "")
-                            fields: list = []
-                            if "fields" in entry:
-                                # Cast JSON str keys to ints
-
-                                for f in entry["fields"]:
-                                    f[int(list(f.keys())[0])] = f[list(f.keys())[0]]
-                                    del f[list(f.keys())[0]]
-                                fields = entry["fields"]
-
-                            # Look through fields for legacy Collation data ----
-                            if int(major) >= 9 and int(minor) < 1:
-                                for f in fields:
-                                    if self.get_field_attr(f, "type") == "collation":
-                                        # NOTE: This legacy support will be removed in
-                                        # a later version, probably 9.2.
-                                        # Legacy Collation data present in v9.0.x
-                                        # DATA SHAPE: {name: str, page: int}
-
-                                        # We'll do an inefficient linear search each
-                                        # time to convert the legacy data.
-                                        matched = False
-                                        collation_id = -1
-                                        for c in self.collations:
-                                            if (
-                                                c.title
-                                                == self.get_field_attr(f, "content")[
-                                                    "name"
-                                                ]
-                                            ):
-                                                c.e_ids_and_pages.append(
-                                                    (
-                                                        id,
-                                                        int(
-                                                            self.get_field_attr(
-                                                                f, "content"
-                                                            )["page"]
-                                                        ),
-                                                    )
-                                                )
-                                                matched = True
-                                                collation_id = c.id
-                                        if not matched:
-                                            c = Collation(
-                                                id=self._next_collation_id,
-                                                title=self.get_field_attr(f, "content")[
-                                                    "name"
-                                                ],
-                                                e_ids_and_pages=[],
-                                                sort_order="",
-                                            )
-                                            collation_id = self._next_collation_id
-                                            self._next_collation_id += 1
-                                            c.e_ids_and_pages.append(
-                                                (
-                                                    id,
-                                                    int(
-                                                        self.get_field_attr(
-                                                            f, "content"
-                                                        )["page"]
-                                                    ),
-                                                )
-                                            )
-                                            self.collations.append(c)
-                                            self._map_collation_id_to_index(c, -1)
-                                        f_id = self.get_field_attr(f, "id")
-                                        f.clear()
-                                        f[int(f_id)] = collation_id
-                            # Collation Field data present in v9.1.x+
-                            # DATA SHAPE: int
-                            elif int(major) >= 9 and int(minor) >= 1:
-                                pass
-
-                            e = Entry(
-                                id=int(id),
-                                filename=filename,
-                                path=e_path,
-                                fields=fields,
-                            )
-                            self.entries.append(e)
-                            self._map_entry_id_to_index(e, -1)
+            # Load Extension List --------------------------------------
+            start_time = time.time()
+            if "ignored_extensions" in json_dump:
+                self.ext_list = json_dump.get(
+                    "ignored_extensions", self.default_ext_exclude_list
+                )
+            else:
+                self.ext_list = json_dump.get("ext_list", self.default_ext_exclude_list)
+
+            # Sanitizes older lists (v9.2.1) that don't use leading periods.
+            # Without this, existing lists (including default lists)
+            # have to otherwise be updated by hand in order to restore
+            # previous functionality.
+            sanitized_list: list[str] = []
+            for ext in self.ext_list:
+                if not ext.startswith("."):
+                    ext = "." + ext
+                sanitized_list.append(ext)
+            self.ext_list = sanitized_list
+
+            self.is_exclude_list = json_dump.get("is_exclude_list", True)
+            end_time = time.time()
+            logging.info(
+                f"[LIBRARY] Extension list loaded in {(end_time - start_time):.3f} seconds"
+            )
+
+            # Parse Tags -----------------------------------------------
+            if "tags" in json_dump.keys():
+                start_time = time.time()
+
+                # Step 1: Verify default built-in tags are present.
+                json_dump["tags"] = self.verify_default_tags(json_dump["tags"])
+
+                for tag in json_dump["tags"]:
+                    # Step 2: Create a Tag object and append it to the internal Tags list,
+                    # then map that Tag's ID to its index in the Tags list.
+
+                    id = int(tag.get("id", 0))
+
+                    # Don't load tags with duplicate IDs
+                    if id not in {t.id for t in self.tags}:
+                        if id >= self._next_tag_id:
+                            self._next_tag_id = id + 1
+
+                        name = tag.get("name", "")
+                        shorthand = tag.get("shorthand", "")
+                        aliases = tag.get("aliases", [])
+                        subtag_ids = tag.get("subtag_ids", [])
+                        color = tag.get("color", "")
+
+                        t = Tag(
+                            id=id,
+                            name=name,
+                            shorthand=shorthand,
+                            aliases=aliases,
+                            subtags_ids=subtag_ids,
+                            color=color,
+                        )
 
+                        # NOTE: This does NOT use the add_tag_to_library() method!
+                        # That method is only used for Tags added at runtime.
+                        # This process uses the same inner methods, but waits until all of the
+                        # Tags are registered in the Tags list before creating the Tag clusters.
+                        self.tags.append(t)
+                        self._map_tag_id_to_index(t, -1)
+                        self._map_tag_strings_to_tag_id(t)
+                    else:
+                        logging.info(f"[LIBRARY] Skipping Tag with duplicate ID: {tag}")
 
+                # Step 3: Map each Tag's subtags together now that all Tag objects are in it.
+                for t in self.tags:
+                    self._map_tag_id_to_cluster(t)
+
+                end_time = time.time()
+                logging.info(
+                    f"[LIBRARY] Tags loaded in {(end_time - start_time):.3f} seconds"
+                )
+
+            # Parse Entries --------------------------------------------
+            if entries := json_dump.get("entries"):
+                start_time = time.time()
+                for entry in entries:
+                    if "id" in entry:
+                        id = int(entry["id"])
+                        if id >= self._next_entry_id:
+                            self._next_entry_id = id + 1
+                    else:
+                        # Version 9.1.x+ Compatibility
+                        id = self._next_entry_id
+                        self._next_entry_id += 1
+
+                    filename = entry.get("filename", "")
+                    e_path = entry.get("path", "")
+                    fields: list = []
+                    if "fields" in entry:
+                        # Cast JSON str keys to ints
+
+                        for f in entry["fields"]:
+                            f[int(list(f.keys())[0])] = f[list(f.keys())[0]]
+                            del f[list(f.keys())[0]]
+                        fields = entry["fields"]
+
+                    # Look through fields for legacy Collation data ----
+                    if int(major) >= 9 and int(minor) < 1:
+                        for f in fields:
+                            if self.get_field_attr(f, "type") == "collation":
+                                # NOTE: This legacy support will be removed in
+                                # a later version, probably 9.2.
+                                # Legacy Collation data present in v9.0.x
+                                # DATA SHAPE: {name: str, page: int}
+
+                                # We'll do an inefficient linear search each
+                                # time to convert the legacy data.
+                                matched = False
+                                collation_id = -1
+                                for c in self.collations:
+                                    if (
+                                        c.title
+                                        == self.get_field_attr(f, "content")["name"]
+                                    ):
+                                        c.e_ids_and_pages.append(
+                                            (
+                                                id,
+                                                int(
+                                                    self.get_field_attr(f, "content")[
+                                                        "page"
+                                                    ]
+                                                ),
+                                            )
+                                        )
+                                        matched = True
+                                        collation_id = c.id
+                                if not matched:
+                                    c = Collation(
+                                        id=self._next_collation_id,
+                                        title=self.get_field_attr(f, "content")["name"],
+                                        e_ids_and_pages=[],
+                                        sort_order="",
+                                    )
+                                    collation_id = self._next_collation_id
+                                    self._next_collation_id += 1
+                                    c.e_ids_and_pages.append(
+                                        (
+                                            id,
+                                            int(
+                                                self.get_field_attr(f, "content")[
+                                                    "page"
+                                                ]
+                                            ),
+                                        )
+                                    )
+                                    self.collations.append(c)
+                                    self._map_collation_id_to_index(c, -1)
+                                f_id = self.get_field_attr(f, "id")
+                                f.clear()
+                                f[int(f_id)] = collation_id
+                    # Collation Field data present in v9.1.x+
+                    # DATA SHAPE: int
+                    elif int(major) >= 9 and int(minor) >= 1:
+                        pass
+
+                    e = Entry(
+                        id=int(id),
+                        filename=filename,
+                        path=e_path,
+                        fields=fields,
+                    )
+                    self.entries.append(e)
+                    self._map_entry_id_to_index(e, -1)
 
-                        end_time = time.time()
-                        logging.info(
-                            f"[LIBRARY] Entries loaded in {(end_time - start_time):.3f} seconds"
-                        )
+                end_time = time.time()
+                logging.info(
+                    f"[LIBRARY] Entries loaded in {(end_time - start_time):.3f} seconds"
+                )
 
-                    # Parse Collations -----------------------------------------
-                    if "collations" in json_dump.keys():
-                        start_time = time.time()
-                        for collation in json_dump["collations"]:
-                            # Step 1: Create a Collation object and append it to
-                            # the internal Collations list, then map that
-                            # Collation's ID to its index in the Collations list.
-
-                            id = int(collation.get("id", 0))
-                            if id >= self._next_collation_id:
-                                self._next_collation_id = id + 1
-
-                            title = collation.get("title", "")
-                            e_ids_and_pages = collation.get("e_ids_and_pages", [])
-                            sort_order = collation.get("sort_order", "")
-                            cover_id = collation.get("cover_id", -1)
-
-                            c = Collation(
-                                id=id,
-                                title=title,
-                                e_ids_and_pages=e_ids_and_pages,  # type: ignore
-                                sort_order=sort_order,
-                                cover_id=cover_id,
-                            )
+            # Parse Collations -----------------------------------------
+            if "collations" in json_dump.keys():
+                start_time = time.time()
+                for collation in json_dump["collations"]:
+                    # Step 1: Create a Collation object and append it to
+                    # the internal Collations list, then map that
+                    # Collation's ID to its index in the Collations list.
+
+                    id = int(collation.get("id", 0))
+                    if id >= self._next_collation_id:
+                        self._next_collation_id = id + 1
+
+                    title = collation.get("title", "")
+                    e_ids_and_pages = collation.get("e_ids_and_pages", [])
+                    sort_order = collation.get("sort_order", "")
+                    cover_id = collation.get("cover_id", -1)
+
+                    c = Collation(
+                        id=id,
+                        title=title,
+                        e_ids_and_pages=e_ids_and_pages,
+                        sort_order=sort_order,
+                        cover_id=cover_id,
+                    )
 
-                            # NOTE: This does NOT use the add_collation_to_library() method
-                            # which is intended to be used at runtime. However, there is
-                            # currently no reason why it couldn't be used here, and is
-                            # instead not used for consistency.
-                            self.collations.append(c)
-                            self._map_collation_id_to_index(c, -1)
-                        end_time = time.time()
-                        logging.info(
-                            f"[LIBRARY] Collations loaded in {(end_time - start_time):.3f} seconds"
-                        )
+                    # NOTE: This does NOT use the add_collation_to_library() method
+                    # which is intended to be used at runtime. However, there is
+                    # currently no reason why it couldn't be used here, and is
+                    # instead not used for consistency.
+                    self.collations.append(c)
+                    self._map_collation_id_to_index(c, -1)
+                end_time = time.time()
+                logging.info(
+                    f"[LIBRARY] Collations loaded in {(end_time - start_time):.3f} seconds"
+                )
 
-                    return_code = 1
-            except ujson.JSONDecodeError:
-                logging.info("[LIBRARY][ERROR]: Empty JSON file!")
+            return_code = 1
+        self.save_library_backup_to_disk(is_auto=True)
 
         # If the Library is loaded, continue other processes.
         if return_code == 1:
@@ -798,13 +820,13 @@ def save_library_to_disk(self):
         filename = "ts_library.json"
 
         self.verify_ts_folders()
-
+        json_library: JsonLibary = self.to_json()
         with open(
             self.library_dir / TS_FOLDER_NAME / filename, "w", encoding="utf-8"
         ) as outfile:
             outfile.flush()
             ujson.dump(
-                self.to_json(),
+                json_library,
                 outfile,
                 ensure_ascii=False,
                 escape_forward_slashes=False,
@@ -815,16 +837,22 @@ def save_library_to_disk(self):
             f"[LIBRARY] Library saved to disk in {(end_time - start_time):.3f} seconds"
         )
 
-    def save_library_backup_to_disk(self) -> str:
+    def save_library_backup_to_disk(self, is_auto: bool = False) -> str:
         """
         Saves a backup file of the Library to disk at the default TagStudio folder location.
        Returns the filename used, including the date and time."""
 
         logging.info(f"[LIBRARY] Saving Library Backup to Disk...")
         start_time = time.time()
-        filename = f'ts_library_backup_{datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")}.json'
+
+        filename = (
+            "ts_library_backup_auto.json"
+            if is_auto
+            else f'ts_library_backup_{datetime.datetime.utcnow().strftime("%F_%T").replace(":", "")}.json'
+        )
 
         self.verify_ts_folders()
+        json_library: JsonLibary = self.to_json()
         with open(
             self.library_dir / TS_FOLDER_NAME / BACKUP_FOLDER_NAME / filename,
             "w",
@@ -832,7 +860,7 @@ def save_library_backup_to_disk(self) -> str:
         ) as outfile:
             outfile.flush()
             ujson.dump(
-                self.to_json(),
+                json_library,
                 outfile,
                 ensure_ascii=False,
                 escape_forward_slashes=False,
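
The patch above follows a common load-with-fallback pattern: the library JSON is parsed inside a try block, and on a decode failure or missing file the loader scans the backup folder for the auto-backup that gets rewritten after every successful load; serializing with to_json() before entering the `with` block also means a serialization error can no longer truncate an already-opened file. Below is a minimal, self-contained sketch of the same pattern under stated assumptions: it uses the stdlib json module rather than ujson, and the constants and function names (LIB_FILE, BACKUP_DIR, AUTO_BACKUP_NAME, load_library, save_auto_backup) are illustrative only, not TagStudio's actual API.

    import json
    import logging
    from pathlib import Path

    # Hypothetical names for illustration; TagStudio uses its own
    # TS_FOLDER_NAME / BACKUP_FOLDER_NAME constants.
    LIB_FILE = "ts_library.json"
    BACKUP_DIR = "backups"
    AUTO_BACKUP_NAME = "ts_library_backup_auto.json"

    def load_library(folder: Path) -> dict | None:
        """Load the library file; fall back to the auto-backup if corrupt."""
        try:
            with open(folder / LIB_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            logging.info("Library file blank/corrupted; trying auto backup...")
            backup = folder / BACKUP_DIR / AUTO_BACKUP_NAME
            # Guard mirrors the "check if backup_folder exists" fix.
            if backup.exists():
                with open(backup, "r", encoding="utf-8") as f:
                    return json.load(f)
        return None

    def save_auto_backup(folder: Path, library: dict) -> None:
        """Serialize first, then write, so a failed dump can't truncate the file."""
        data = json.dumps(library)  # outside the `with` block, as in the patch
        backup_dir = folder / BACKUP_DIR
        backup_dir.mkdir(parents=True, exist_ok=True)
        with open(backup_dir / AUTO_BACKUP_NAME, "w", encoding="utf-8") as f:
            f.write(data)

One trade-off worth noting: because the auto-backup uses a fixed filename, each successful load overwrites the previous auto-backup, while the dated manual backups remain untouched.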