Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dma1dma1 committed Mar 3, 2024
1 parent ad57f01 commit 82aa8b4
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 85 deletions.
165 changes: 80 additions & 85 deletions codalab/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class MigrationStatus(str, Enum):
"""
NOT_STARTED = "NOT_STARTED"
UPLOADED_TO_AZURE = "UPLOADED_TO_AZURE"
UPLOADED_NOT_INDEXED = "UPLOADED_NOT_INDEXED"
CHANGED_DB = "CHANGED_DB"
FINISHED = "FINISHED" # Meaning it is uploaded to Azure, DB updated, and deleted from disk.

Expand Down Expand Up @@ -481,10 +482,8 @@ def migrate_bundle(self, bundle_uuid):
self.logger.info("Getting Bundle info")
bundle = self.get_bundle(bundle_uuid)
bundle_location = self.get_bundle_location(bundle_uuid)
print(bundle)
print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid))
print(bundle_location)
# print("contents.gz" in bundle_location)

# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
Expand All @@ -500,14 +499,6 @@ def migrate_bundle(self, bundle_uuid):
except Exception as e:
# print(e)
if "Path ''" in str(e):
# if bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# try:
# if "contents.gz" in bundle_location:
# bundle_info = self.get_bundle_info(bundle_uuid, new_bundle_location)
# print(bundle_info)
# except:
# pass

for i in range(0, 11):
try:
bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}')
Expand All @@ -529,87 +520,92 @@ def migrate_bundle(self, bundle_uuid):
self.bundle_manager._model.update_bundle(
bundle,
{
'storage_type': StorageType.DISK_STORAGE.value,
'is_dir': bundle_info['type'] == 'directory',
}
)
self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
bundle_migration_status.error_message = ''
print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))

print(bundle_info, bundle_location, is_bundle_rm)
# print(bundle_migration_status)

# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# print(1)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# print(bundle_migration_status)
# print(2)

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.error_message = ''
return
# elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = ''
# elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )[0]):
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = ''

# Upload to Azure.
# print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
print("UPLOADING")
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
print("Changing DB")
self.logger.info("Changing DB")
start_time = time.time()
self.modify_bundle_data(bundle, bundle_uuid, is_dir)
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB
bundle_migration_status.error_message = None

# Delete the bundle from disk.
if self.delete and not is_bundle_rm:
self.logger.info("Deleting from disk")
start_time = time.time()
delete_status = self.delete_original_bundle(bundle_uuid)
if not delete_status:
self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# if os.path.lexists(disk_location):
# # Delete it.
# path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED
bundle_migration_status.error_message = None
# # Normal Migration
# if not is_bundle_rm:
# is_dir = bundle_info['type'] == 'directory'
# target_location = self.blob_target_location(bundle_uuid, is_dir)
# disk_location = self.get_bundle_disk_location(bundle_uuid)

# # print(1)

# # Don't migrate currently running bundles
# if bundle.state not in State.FINAL_STATES:
# bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
# return

# # Don't migrate linked bundles
# if self.is_linked_bundle(bundle_uuid):
# bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
# return

# # print(bundle_migration_status)
# # print(2)

# # if db already changed
# # TODO: Check if bundle_location is azure (see other places in code base.)
# if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.error_message = ''
# return
# # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# # bundle_migration_status.status = MigrationStatus.CHANGED_DB
# # bundle_migration_status.error_message = ''
# # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# # )[0]):
# # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# # bundle_migration_status.error_message = ''

# # Upload to Azure.
# # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
# if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
# print("UPLOADING")
# self.logger.info("Uploading to Azure")
# start_time = time.time()
# self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
# self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
# success, reason = self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )
# if not success:
# raise ValueError(f"SanityCheck failed with {reason}")
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = None

# # Change bundle metadata in database to point to the Azure Blob location (not disk)
# if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
# print("Changing DB")
# self.logger.info("Changing DB")
# start_time = time.time()
# self.modify_bundle_data(bundle, bundle_uuid, is_dir)
# self.times["modify_bundle_data"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = None

# # Delete the bundle from disk.
# if self.delete and not is_bundle_rm:
# self.logger.info("Deleting from disk")
# start_time = time.time()
# delete_status = self.delete_original_bundle(bundle_uuid)
# if not delete_status:
# self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# # if os.path.lexists(disk_location):
# # # Delete it.
# # path_util.remove(disk_bundle_location)
# self.times["delete_original_bundle"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.FINISHED
# bundle_migration_status.error_message = None

self.times["migrate_bundle"].append(time.time() - total_start_time)

Expand Down Expand Up @@ -637,7 +633,6 @@ def log_times(self):
self.logger.info(json.dumps(output_dict, sort_keys=True, indent=4))

def write_bundle_statuses(self):
return
new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses])
if self.existing_bundle_migration_statuses is None:
self.existing_bundle_migration_statuses = new_records
Expand Down
11 changes: 11 additions & 0 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3175,6 +3175,17 @@ def add_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None:
}
connection.execute(cl_bundle_location.insert().values(bundle_location_value))

def remove_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None:
"""
Removes a new bundle location to the specified bundle.
Args:
bundle_uuid (str): The uuid for the bundle which we want to remove a BundleLocation to.
bundle_store_uuid (str): The uuid for the bundle store we are associating with the new BundleLocation.
"""
with self.engine.begin() as connection:
connection.execute(cl_bundle_location.delete().where((cl_bundle_location.c.bundle_uuid == bundle_uuid) & (cl_bundle_location.c.bundle_store_uuid == bundle_store_uuid)))

def get_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> dict:
"""
Returns data about the location associated with the specified bundle and bundle store.
Expand Down

0 comments on commit 82aa8b4

Please sign in to comment.