From bcd2003acd0fefb2674d64eaffad19bd37ae9625 Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Wed, 25 Sep 2024 10:08:07 -0400 Subject: [PATCH] Start adding post-storage update logic --- backend/btrixcloud/orgs.py | 52 ++++++++++++++++++++++++++++-- backend/btrixcloud/storages.py | 59 +++++++++++++++++++++++++++------- 2 files changed, 96 insertions(+), 15 deletions(-) diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index fd0177a5b..ccf47a9dd 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -462,6 +462,36 @@ async def update_storage_refs(self, org: Organization) -> bool: res = await self.orgs.find_one_and_update({"_id": org.id}, {"$set": set_dict}) return res is not None + async def update_file_storage_refs( + self, org: Organization, previous_storage: StorageRef, new_storage: StorageRef + ) -> bool: + """Update storage refs for all crawl and profile files in given org""" + res = await self.crawls_db.update_many( + {"oid": org.id, "files.storage": previous_storage}, + {"$set": {"files.$.storage": new_storage}}, + ) + if not res: + return False + + res = await self.profiles_db.update_many( + {"oid": org.id, "resource.storage": previous_storage}, + {"$set": {"resource.storage": new_storage}}, + ) + if not res: + return False + + return True + + async def unset_file_presigned_urls(self, org: Organization) -> bool: + """Unset all presigned URLs for files in org""" + res = await self.crawls_db.update_many( + {"oid": org.id, "files.0": {"$exists": True}}, {"$set": {"files.$[].presignedUrl": None}} + ) + if not res: + return False + + return True + async def update_subscription_data( self, update: SubscriptionUpdate ) -> Optional[Organization]: @@ -933,13 +963,29 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]: "publicCollectionsCount": public_collections_count, } - async def is_crawl_running(self, oid: UUID) -> bool: + async def is_crawl_running(self, org: Organization) -> bool: """Return boolean indicating whether any crawls are 
currently running in org""" - workflows_running_count = await self.crawls_db.count_documents( + running_count = await self.crawls_db.count_documents( {"oid": org.id, "state": {"$in": RUNNING_STATES}} ) - if workflows_running_count > 0: + if running_count > 0: + return True + return False + + async def has_files_stored(self, org: Organization) -> bool: + """Return boolean indicating whether any files are stored on org""" + crawl_count = await self.crawls_db.count_documents( + {"_id": org.id, "files.1": {"$exists": True}}, + ) + if crawl_count > 0: + return True + + profile_count = await self.profiles_db.count_documents( + {"_id": org.id, "resource": {"$exists": True}}, + ) + if profile_count > 0: return True + return False async def get_all_org_slugs(self) -> dict[str, list[str]]: diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index eeac23fbf..27f7f7207 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -249,26 +249,61 @@ async def update_storage_refs( if await self.org_ops.is_crawl_running(org.id): raise HTTPException(status_code=400, detail="crawl_running") + prev_storage = org.storage + prev_storage_replicas = org.storageReplicas + org.storage = storage_refs.storage org.storageReplicas = storage_refs.storageReplicas await self.org_ops.update_storage_refs(org) - # TODO: Handle practical consequences of changing buckets - # - If previous primary bucket(s) had files stored, copy or move those - # into new storage and make necessary updates (e.g. regenerate presigned - # URLs?) - # - If replica location is added, replicate everything in primary - # to new replica storage location - # - If replica location is removed, start jobs to delete content? - # (or do we want to handle that manually?) 
- - We can set the org to read-only while handling these details - Think through timing and/or how to communicate status of jobs to - user, since background jobs don't block + asyncio.create_task( + self.run_post_storage_update_tasks( + OrgStorageRefs(storage=prev_storage, storageReplicas=prev_storage_replicas), + storage_refs, + org, + ) + ) return {"updated": True} + async def run_post_storage_update_tasks( + self, + prev_storage_refs: OrgStorageRefs, + new_storage_refs: OrgStorageRefs, + org: Organization, + ): + """Handle tasks necessary after changing org storage""" + if not await self.org_ops.has_files_stored(org): + return + + if new_storage_refs.storage != prev_storage_refs.storage: + await self.org_ops.update_read_only(org, True, "Updating storage") + + # TODO: Copy files from previous to new primary storage + # (Ideally block on this, otherwise unset read-only on completion in + # operator?) + + await self.org_ops.update_file_storage_refs( + org, prev_storage_refs.storage, new_storage_refs.storage + ) + + await self.org_ops.unset_file_presigned_urls(org) + + await self.org_ops.update_read_only(org, False) + + if new_storage_refs.storageReplicas != prev_storage_refs.storageReplicas: + # If replica location added, kick off background jobs to replicate + # primary storage to new replica storage location (non-blocking) + + # If replica location is removed, start jobs to delete files from + # removed replica location (non-blocking) + + # If we also changed primary storage in this update, we should make + # sure all files are successfully copied before doing anything to + # the replicas + pass + def get_available_storages(self, org: Organization) -> List[StorageRef]: """return a list of available default + custom storages""" refs: List[StorageRef] = []