diff --git a/codalab/migration.py b/codalab/migration.py index edbca5850..4e911d491 100644 --- a/codalab/migration.py +++ b/codalab/migration.py @@ -71,6 +71,7 @@ class MigrationStatus(str, Enum): """ NOT_STARTED = "NOT_STARTED" UPLOADED_TO_AZURE = "UPLOADED_TO_AZURE" + UPLOADED_NOT_INDEXED = "UPLOADED_NOT_INDEXED" CHANGED_DB = "CHANGED_DB" FINISHED = "FINISHED" # Meaning it is uploaded to Azure, DB updated, and deleted from disk. @@ -481,10 +482,8 @@ def migrate_bundle(self, bundle_uuid): self.logger.info("Getting Bundle info") bundle = self.get_bundle(bundle_uuid) bundle_location = self.get_bundle_location(bundle_uuid) - print(bundle) print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid)) print(bundle_location) - # print("contents.gz" in bundle_location) # This is for handling cases where rm -d was run on the bundle is_bundle_rm = False @@ -500,14 +499,6 @@ def migrate_bundle(self, bundle_uuid): except Exception as e: # print(e) if "Path ''" in str(e): - # if bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - # try: - # if "contents.gz" in bundle_location: - # bundle_info = self.get_bundle_info(bundle_uuid, new_bundle_location) - # print(bundle_info) - # except: - # pass - for i in range(0, 11): try: bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}') @@ -529,87 +520,92 @@ def migrate_bundle(self, bundle_uuid): self.bundle_manager._model.update_bundle( bundle, { + 'storage_type': StorageType.DISK_STORAGE.value, 'is_dir': bundle_info['type'] == 'directory', } ) + self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid) + bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED + bundle_migration_status.error_message = '' + print(self.bundle_manager._model.get_bundle_locations(bundle_uuid)) print(bundle_info, bundle_location, is_bundle_rm) # print(bundle_migration_status) - # Normal 
Migration - if not is_bundle_rm: - is_dir = bundle_info['type'] == 'directory' - target_location = self.blob_target_location(bundle_uuid, is_dir) - disk_location = self.get_bundle_disk_location(bundle_uuid) - - # print(1) - - # Don't migrate currently running bundles - if bundle.state not in State.FINAL_STATES: - bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL - return - - # Don't migrate linked bundles - if self.is_linked_bundle(bundle_uuid): - bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED - return - - # print(bundle_migration_status) - # print(2) - - # if db already changed - # TODO: Check if bundle_location is azure (see other places in code base.) - if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - bundle_migration_status.error_message = '' - return - # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - # bundle_migration_status.status = MigrationStatus.CHANGED_DB - # bundle_migration_status.error_message = '' - # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( - # bundle_uuid, bundle_location, bundle_info, is_dir, target_location - # )[0]): - # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE - # bundle_migration_status.error_message = '' - - # Upload to Azure. 
- # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location)) - if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location): - print("UPLOADING") - self.logger.info("Uploading to Azure") - start_time = time.time() - self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir) - self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time) - success, reason = self.sanity_check( - bundle_uuid, bundle_location, bundle_info, is_dir, target_location - ) - if not success: - raise ValueError(f"SanityCheck failed with {reason}") - bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE - bundle_migration_status.error_message = None - - # Change bundle metadata in database to point to the Azure Blob location (not disk) - if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message: - print("Changing DB") - self.logger.info("Changing DB") - start_time = time.time() - self.modify_bundle_data(bundle, bundle_uuid, is_dir) - self.times["modify_bundle_data"].append(time.time() - start_time) - bundle_migration_status.status = MigrationStatus.CHANGED_DB - bundle_migration_status.error_message = None - - # Delete the bundle from disk. - if self.delete and not is_bundle_rm: - self.logger.info("Deleting from disk") - start_time = time.time() - delete_status = self.delete_original_bundle(bundle_uuid) - if not delete_status: - self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}") - # if os.path.lexists(disk_location): - # # Delete it. 
- # path_util.remove(disk_bundle_location) - self.times["delete_original_bundle"].append(time.time() - start_time) - bundle_migration_status.status = MigrationStatus.FINISHED - bundle_migration_status.error_message = None + # # Normal Migration + # if not is_bundle_rm: + # is_dir = bundle_info['type'] == 'directory' + # target_location = self.blob_target_location(bundle_uuid, is_dir) + # disk_location = self.get_bundle_disk_location(bundle_uuid) + + # # print(1) + + # # Don't migrate currently running bundles + # if bundle.state not in State.FINAL_STATES: + # bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL + # return + + # # Don't migrate linked bundles + # if self.is_linked_bundle(bundle_uuid): + # bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED + # return + + # # print(bundle_migration_status) + # # print(2) + + # # if db already changed + # # TODO: Check if bundle_location is azure (see other places in code base.) + # if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # bundle_migration_status.error_message = '' + # return + # # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # # bundle_migration_status.status = MigrationStatus.CHANGED_DB + # # bundle_migration_status.error_message = '' + # # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( + # # bundle_uuid, bundle_location, bundle_info, is_dir, target_location + # # )[0]): + # # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE + # # bundle_migration_status.error_message = '' + + # # Upload to Azure. 
+ # # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location)) + # if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location): + # print("UPLOADING") + # self.logger.info("Uploading to Azure") + # start_time = time.time() + # self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir) + # self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time) + # success, reason = self.sanity_check( + # bundle_uuid, bundle_location, bundle_info, is_dir, target_location + # ) + # if not success: + # raise ValueError(f"SanityCheck failed with {reason}") + # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE + # bundle_migration_status.error_message = None + + # # Change bundle metadata in database to point to the Azure Blob location (not disk) + # if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message: + # print("Changing DB") + # self.logger.info("Changing DB") + # start_time = time.time() + # self.modify_bundle_data(bundle, bundle_uuid, is_dir) + # self.times["modify_bundle_data"].append(time.time() - start_time) + # bundle_migration_status.status = MigrationStatus.CHANGED_DB + # bundle_migration_status.error_message = None + + # # Delete the bundle from disk. + # if self.delete and not is_bundle_rm: + # self.logger.info("Deleting from disk") + # start_time = time.time() + # delete_status = self.delete_original_bundle(bundle_uuid) + # if not delete_status: + # self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}") + # # if os.path.lexists(disk_location): + # # # Delete it. 
+ # # path_util.remove(disk_bundle_location) + # self.times["delete_original_bundle"].append(time.time() - start_time) + # bundle_migration_status.status = MigrationStatus.FINISHED + # bundle_migration_status.error_message = None self.times["migrate_bundle"].append(time.time() - total_start_time) @@ -637,7 +633,6 @@ def log_times(self): self.logger.info(json.dumps(output_dict, sort_keys=True, indent=4)) def write_bundle_statuses(self): - return new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses]) if self.existing_bundle_migration_statuses is None: self.existing_bundle_migration_statuses = new_records diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index a8265614a..5dc539be7 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -3175,6 +3175,17 @@ def add_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None: } connection.execute(cl_bundle_location.insert().values(bundle_location_value)) + def remove_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> None: + """ + Removes the bundle location that links the specified bundle to the specified bundle store. + + Args: + bundle_uuid (str): The uuid for the bundle which we want to remove a BundleLocation from. + bundle_store_uuid (str): The uuid for the bundle store associated with the BundleLocation being removed. + """ + with self.engine.begin() as connection: + connection.execute(cl_bundle_location.delete().where((cl_bundle_location.c.bundle_uuid == bundle_uuid) & (cl_bundle_location.c.bundle_store_uuid == bundle_store_uuid))) + def get_bundle_location(self, bundle_uuid: str, bundle_store_uuid: str) -> dict: """ Returns data about the location associated with the specified bundle and bundle store.