diff --git a/codalab/migration.py b/codalab/migration.py
index 4e911d491..86a2261c1 100644
--- a/codalab/migration.py
+++ b/codalab/migration.py
@@ -488,11 +488,6 @@ def migrate_bundle(self, bundle_uuid):
         # This is for handling cases where rm -d was run on the bundle
         is_bundle_rm = False
         bundle_info = None
-        new_bundle_location = bundle_location[:-2] + "tar.gz"
-        # print(new_bundle_location)
-        # print(self.get_bundle_info(bundle_uuid, new_bundle_location))
-        # linked_info = parse_linked_bundle_url(bundle_location).is_archive_dir
-        # print(linked_info)
         try:
             bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
@@ -516,96 +511,91 @@ def migrate_bundle(self, bundle_uuid):
         else:
             pass
-        if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
-            self.bundle_manager._model.update_bundle(
-                bundle,
-                {
-                    'storage_type': StorageType.DISK_STORAGE.value,
-                    'is_dir': bundle_info['type'] == 'directory',
-                }
-            )
-            self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
-            bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
-            bundle_migration_status.error_message = ''
-        print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))
+        # if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
+        # self.bundle_manager._model.update_bundle(
+        # bundle,
+        # {
+        # 'storage_type': StorageType.DISK_STORAGE.value,
+        # 'is_dir': bundle_info['type'] == 'directory',
+        # }
+        # )
+        # self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
+        # bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
+        # bundle_migration_status.error_message = ''
+        # print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))
-        print(bundle_info, bundle_location, is_bundle_rm)
+        # print(bundle_info, bundle_location, is_bundle_rm)
         # print(bundle_migration_status)
-        # # Normal Migration
-        # if not is_bundle_rm:
-        # is_dir = bundle_info['type'] == 'directory'
-        # target_location = self.blob_target_location(bundle_uuid, is_dir)
-        # disk_location = self.get_bundle_disk_location(bundle_uuid)
-
-        # # print(1)
-
-        # # Don't migrate currently running bundles
-        # if bundle.state not in State.FINAL_STATES:
-        # bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
-        # return
-
-        # # Don't migrate linked bundles
-        # if self.is_linked_bundle(bundle_uuid):
-        # bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
-        # return
-
-        # # print(bundle_migration_status)
-        # # print(2)
-
-        # # if db already changed
-        # # TODO: Check if bundle_location is azure (see other places in code base.)
-        # if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
-        # bundle_migration_status.error_message = ''
-        # return
-        # # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
-        # # bundle_migration_status.status = MigrationStatus.CHANGED_DB
-        # # bundle_migration_status.error_message = ''
-        # # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
-        # # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
-        # # )[0]):
-        # # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
-        # # bundle_migration_status.error_message = ''
-
-        # # Upload to Azure.
-        # # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
-        # if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
-        # print("UPLOADING")
-        # self.logger.info("Uploading to Azure")
-        # start_time = time.time()
-        # self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
-        # self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
-        # success, reason = self.sanity_check(
-        # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
-        # )
-        # if not success:
-        # raise ValueError(f"SanityCheck failed with {reason}")
-        # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
-        # bundle_migration_status.error_message = None
-
-        # # Change bundle metadata in database to point to the Azure Blob location (not disk)
-        # if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
-        # print("Changing DB")
-        # self.logger.info("Changing DB")
-        # start_time = time.time()
-        # self.modify_bundle_data(bundle, bundle_uuid, is_dir)
-        # self.times["modify_bundle_data"].append(time.time() - start_time)
-        # bundle_migration_status.status = MigrationStatus.CHANGED_DB
-        # bundle_migration_status.error_message = None
-
-        # # Delete the bundle from disk.
-        # if self.delete and not is_bundle_rm:
-        # self.logger.info("Deleting from disk")
-        # start_time = time.time()
-        # delete_status = self.delete_original_bundle(bundle_uuid)
-        # if not delete_status:
-        # self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
-        # # if os.path.lexists(disk_location):
-        # # # Delete it.
-        # # path_util.remove(disk_bundle_location)
-        # self.times["delete_original_bundle"].append(time.time() - start_time)
-        # bundle_migration_status.status = MigrationStatus.FINISHED
-        # bundle_migration_status.error_message = None
+        # Normal Migration
+        if not is_bundle_rm:
+            is_dir = bundle_info['type'] == 'directory'
+            target_location = self.blob_target_location(bundle_uuid, is_dir)
+            disk_location = self.get_bundle_disk_location(bundle_uuid)
+
+            # Don't migrate currently running bundles
+            if bundle.state not in State.FINAL_STATES:
+                bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
+                return
+
+            # Don't migrate linked bundles
+            if self.is_linked_bundle(bundle_uuid):
+                bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
+                return
+
+            # if db already changed
+            # TODO: Check if bundle_location is azure (see other places in code base.)
+            if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
+                bundle_migration_status.error_message = ''
+                return
+            # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
+            # bundle_migration_status.status = MigrationStatus.CHANGED_DB
+            # bundle_migration_status.error_message = ''
+            # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
+            # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
+            # )[0]):
+            # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
+            # bundle_migration_status.error_message = ''
+
+            # Upload to Azure.
+            # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
+            if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
+                print("UPLOADING")
+                self.logger.info("Uploading to Azure")
+                start_time = time.time()
+                self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
+                self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
+                success, reason = self.sanity_check(
+                    bundle_uuid, bundle_location, bundle_info, is_dir, target_location
+                )
+                if not success:
+                    raise ValueError(f"SanityCheck failed with {reason}")
+                bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
+                bundle_migration_status.error_message = None
+
+        # Change bundle metadata in database to point to the Azure Blob location (not disk)
+        if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
+            print("Changing DB")
+            self.logger.info("Changing DB")
+            start_time = time.time()
+            self.modify_bundle_data(bundle, bundle_uuid, is_dir)
+            self.times["modify_bundle_data"].append(time.time() - start_time)
+            bundle_migration_status.status = MigrationStatus.CHANGED_DB
+            bundle_migration_status.error_message = None
+
+        # Delete the bundle from disk.
+        if self.delete and not is_bundle_rm:
+            self.logger.info("Deleting from disk")
+            start_time = time.time()
+            delete_status = self.delete_original_bundle(bundle_uuid)
+            if not delete_status:
+                self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
+            # if os.path.lexists(disk_location):
+            # # Delete it.
+            # path_util.remove(disk_bundle_location)
+            self.times["delete_original_bundle"].append(time.time() - start_time)
+            bundle_migration_status.status = MigrationStatus.FINISHED
+            bundle_migration_status.error_message = None
 
         self.times["migrate_bundle"].append(time.time() - total_start_time)
@@ -634,6 +624,7 @@ def log_times(self):
     def write_bundle_statuses(self):
         new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses])
+        new_records = new_records.astype({'uuid': object, 'status': object, 'error_message': object})
         if self.existing_bundle_migration_statuses is None:
             self.existing_bundle_migration_statuses = new_records
         else:
@@ -645,6 +636,7 @@ def migrate_bundles(self, bundle_uuids, log_interval=100):
         total = len(bundle_uuids)
         for i, uuid in enumerate(bundle_uuids):
+            print(f"Migrating bundle {i} out of {total}")
             self.logger.info(f"migrating {uuid}")
             self.migrate_bundle(uuid)
             self.logger.info("status: %d / %d", i, total)
@@ -674,6 +666,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
     bundle_uuids_df.to_csv(migration.get_bundle_ids_path(), index=False, mode='w')
     print(f"[migration] Recorded all bundle ids to be migrated")
+    # data = pd.read_csv(bundle_uuids)
+    # bundle_uuids = data["0"].tolist()[100001:]
+
     # Sort according to what process you are.
     chunk_size = len(bundle_uuids) // num_processes
     start_idx = chunk_size * proc_id
@@ -701,6 +696,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
     parser.add_argument(
         '-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.'
     )
+    # parser.add_argument(
+    # '-u', '--bundle-uuids', type=str, help='List of bundle UUIDs to migrate.'
+    # )
     parser.add_argument(
         '-t', '--target_store_name',
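
For orientation, the path re-enabled above runs every bundle through the same sequence: skip bundles that are not in a final state or that are linked, upload the contents to Blob storage and sanity-check the copy, repoint the database at the new storage location, and optionally delete the original data from disk. The following is a minimal, self-contained sketch of that control flow, not the real implementation: the Bundle dataclass and the upload_to_blob / verify_upload helpers are hypothetical stand-ins for CodaLab internals such as adjust_quota_and_upload_to_blob, sanity_check, modify_bundle_data, and delete_original_bundle.

from dataclasses import dataclass
from enum import Enum


class MigrationStatus(Enum):
    NOT_STARTED = "NOT_STARTED"
    SKIPPED_NOT_FINAL = "SKIPPED_NOT_FINAL"
    SKIPPED_LINKED = "SKIPPED_LINKED"
    UPLOADED_TO_AZURE = "UPLOADED_TO_AZURE"
    CHANGED_DB = "CHANGED_DB"
    FINISHED = "FINISHED"


@dataclass
class Bundle:
    uuid: str
    is_final: bool = True       # bundle is in a final (not running) state
    is_linked: bool = False     # bundle points at external, linked data
    on_disk: bool = True        # original copy still exists on disk
    status: MigrationStatus = MigrationStatus.NOT_STARTED


def upload_to_blob(bundle: Bundle) -> None:
    """Hypothetical stand-in for the real upload step."""


def verify_upload(bundle: Bundle) -> bool:
    """Hypothetical stand-in for the real sanity check."""
    return True


def migrate_bundle(bundle: Bundle, change_db: bool, delete: bool) -> MigrationStatus:
    # 1. Skip bundles that should not be migrated at all.
    if not bundle.is_final:
        bundle.status = MigrationStatus.SKIPPED_NOT_FINAL
        return bundle.status
    if bundle.is_linked:
        bundle.status = MigrationStatus.SKIPPED_LINKED
        return bundle.status

    # 2. Upload the bundle contents to blob storage and verify the copy.
    if bundle.on_disk:
        upload_to_blob(bundle)
        if not verify_upload(bundle):
            raise ValueError(f"Sanity check failed for {bundle.uuid}")
        bundle.status = MigrationStatus.UPLOADED_TO_AZURE

    # 3. Point the database at the new blob location.
    if change_db:
        bundle.status = MigrationStatus.CHANGED_DB

    # 4. Optionally remove the original copy from disk.
    if delete:
        bundle.on_disk = False
        bundle.status = MigrationStatus.FINISHED

    return bundle.status


if __name__ == "__main__":
    print(migrate_bundle(Bundle(uuid="0x123"), change_db=True, delete=True))

Recording a status after each step mirrors how bundle_migration_status is used in the script: a re-run can consult the recorded status (uploaded_to_azure(), changed_db()) and resume from wherever the previous attempt stopped instead of repeating finished work.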