Skip to content

Commit

Permalink
Start adding post-storage update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tw4l committed Sep 25, 2024
1 parent d301a7a commit bcd2003
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
52 changes: 49 additions & 3 deletions backend/btrixcloud/orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,36 @@ async def update_storage_refs(self, org: Organization) -> bool:
res = await self.orgs.find_one_and_update({"_id": org.id}, {"$set": set_dict})
return res is not None

async def update_file_storage_refs(
    self, org: Organization, previous_storage: StorageRef, new_storage: StorageRef
) -> bool:
    """Update storage refs for all crawl and profile files in given org.

    :param org: org whose files are being re-pointed
    :param previous_storage: storage ref currently set on the files
    :param new_storage: storage ref to set in its place
    :return: True if both updates were acknowledged by the database
    """
    # Crawl docs reference their org via "oid" (see is_crawl_running), not "_id".
    # The positional "$" operator is not valid in a query filter, so match on
    # "files.storage"; in the update, plain "files.$.storage" would rewrite only
    # the FIRST matching array element, so use arrayFilters to update them all.
    res = await self.crawls_db.update_many(
        {"oid": org.id, "files.storage": previous_storage},
        {"$set": {"files.$[file].storage": new_storage}},
        array_filters=[{"file.storage": previous_storage}],
    )
    if not res.acknowledged:
        return False

    # Profile docs store a single file under "resource", also keyed by "oid"
    res = await self.profiles_db.update_many(
        {"oid": org.id, "resource.storage": previous_storage},
        {"$set": {"resource.storage": new_storage}},
    )
    return res.acknowledged

async def unset_file_presigned_urls(self, org: Organization) -> bool:
    """Unset all cached presigned URLs for crawl files in org.

    :param org: org whose file presigned URLs should be cleared
    :return: True if the update was acknowledged by the database
    """
    # Crawl docs reference their org via "oid", not "_id". The positional "$"
    # operator requires a matching array condition in the filter and would only
    # touch one element anyway; the all-positional "$[]" operator clears every
    # entry. Restrict to docs that actually have a files array, since "$[]"
    # errors if the path does not exist.
    res = await self.crawls_db.update_many(
        {"oid": org.id, "files": {"$exists": True}},
        {"$set": {"files.$[].presignedUrl": None}},
    )
    return res.acknowledged

async def update_subscription_data(
self, update: SubscriptionUpdate
) -> Optional[Organization]:
Expand Down Expand Up @@ -933,13 +963,29 @@ async def get_org_metrics(self, org: Organization) -> dict[str, int]:
"publicCollectionsCount": public_collections_count,
}

async def is_crawl_running(self, org: Organization) -> bool:
    """Return whether any crawls are currently running in org.

    :param org: org to check for running crawls
    :return: True if at least one crawl in the org is in a running state
    """
    # Diff artifacts reconciled: keep the new (org: Organization) signature
    # and the renamed running_count variable; return the comparison directly.
    running_count = await self.crawls_db.count_documents(
        {"oid": org.id, "state": {"$in": RUNNING_STATES}}
    )
    return running_count > 0

async def has_files_stored(self, org: Organization) -> bool:
    """Return whether any crawl or profile files are stored in org.

    :param org: org to check for stored files
    :return: True if any crawl has at least one file or any profile
        has a stored resource
    """
    # Crawl docs reference their org via "oid" (see is_crawl_running), not
    # "_id". "files.0" matches docs whose files array has at least ONE
    # element — the original "files.1" would require two or more files.
    crawl_count = await self.crawls_db.count_documents(
        {"oid": org.id, "files.0": {"$exists": True}},
    )
    if crawl_count > 0:
        return True

    profile_count = await self.profiles_db.count_documents(
        {"oid": org.id, "resource": {"$exists": True}},
    )
    return profile_count > 0

async def get_all_org_slugs(self) -> dict[str, list[str]]:
Expand Down
59 changes: 47 additions & 12 deletions backend/btrixcloud/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,26 +249,61 @@ async def update_storage_refs(
if await self.org_ops.is_crawl_running(org.id):
raise HTTPException(status_code=400, detail="crawl_running")

prev_storage = org.storage
prev_storage_replicas = org.storageReplicas

org.storage = storage_refs.storage
org.storageReplicas = storage_refs.storageReplicas

await self.org_ops.update_storage_refs(org)

# TODO: Handle practical consequences of changing buckets
# - If previous primary bucket(s) had files stored, copy or move those
# into new storage and make necessary updates (e.g. regenerate presigned
# URLs?)
# - If replica location is added, replicate everything in primary
# to new replica storage location
# - If replica location is removed, start jobs to delete content?
# (or do we want to handle that manually?)

# We can set the org to read-only while handling these details
# Think through timing and/or how to communicate status of jobs to
# user, since background jobs don't block
asyncio.create_task(
self.run_post_storage_update_tasks(
OrgStorageRefs(storage=prev_storage, storageReplicas=prev_storage_refs),
storage_refs,
org,
)
)

return {"updated": True}

async def run_post_storage_update_tasks(
    self,
    prev_storage_refs: OrgStorageRefs,
    new_storage_refs: OrgStorageRefs,
    org: Organization,
):
    """Handle tasks necessary after changing org storage.

    Invoked as a fire-and-forget task after the org's storage refs have
    already been updated; no-op if the org has no stored files.

    :param prev_storage_refs: primary + replica refs before the change
        (the caller passes OrgStorageRefs, not bare StorageRef — the body
        reads both .storage and .storageReplicas)
    :param new_storage_refs: primary + replica refs after the change
    :param org: org whose storage was updated
    """
    if not await self.org_ops.has_files_stored(org):
        return

    if new_storage_refs.storage != prev_storage_refs.storage:
        # Block writes while file refs are rewritten to point at new storage
        await self.org_ops.update_read_only(org, True, "Updating storage")

        # TODO: Copy files from previous to new primary storage
        # (Ideally block on this, otherwise unset read-only on completion in
        # operator?)

        await self.org_ops.update_file_storage_refs(
            org, prev_storage_refs.storage, new_storage_refs.storage
        )

        # Cached presigned URLs point at the old bucket; clear them so they
        # are regenerated against the new storage on next access
        await self.org_ops.unset_file_presigned_urls(org)

        await self.org_ops.update_read_only(org, False)

    if new_storage_refs.storageReplicas != prev_storage_refs.storageReplicas:
        # If replica location added, kick off background jobs to replicate
        # primary storage to new replica storage location (non-blocking)

        # If replica location is removed, start jobs to delete files from
        # removed replica location (non-blocking)

        # If we also changed primary storage in this update, we should make
        # sure all files are successfully copied before doing anything to
        # the replicas
        pass

def get_available_storages(self, org: Organization) -> List[StorageRef]:
"""return a list of available default + custom storages"""
refs: List[StorageRef] = []
Expand Down

0 comments on commit bcd2003

Please sign in to comment.