From bc7e2ef749d1923a7b9124e614f1bd6221290a98 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 29 Feb 2024 12:56:08 -0800 Subject: [PATCH] All debug changes --- codalab/lib/bundle_cli.py | 4 + codalab/migration.py | 176 ++++++++++++++++++++++++-------------- 2 files changed, 116 insertions(+), 64 deletions(-) diff --git a/codalab/lib/bundle_cli.py b/codalab/lib/bundle_cli.py index 3a5a93f6a..ac205bc05 100644 --- a/codalab/lib/bundle_cli.py +++ b/codalab/lib/bundle_cli.py @@ -1506,6 +1506,8 @@ def do_download_command(self, args): default_client, default_worksheet_uuid, args.target_spec ) + print(default_worksheet_uuid) + # Figure out where to download. info = client.fetch('bundles', target.bundle_uuid) if args.output_path: @@ -1516,6 +1518,8 @@ def do_download_command(self, args): if target.subpath == '' else os.path.basename(target.subpath) ) + + print(info) final_path = os.path.join(os.getcwd(), local_path) if os.path.exists(final_path): if args.force: diff --git a/codalab/migration.py b/codalab/migration.py index 86b30ea66..d2234f655 100644 --- a/codalab/migration.py +++ b/codalab/migration.py @@ -38,6 +38,7 @@ from codalab.common import ( StorageType, StorageURLScheme, + parse_linked_bundle_url ) from codalab.lib.print_util import FileTransferProgress from codalab.lib import ( @@ -163,7 +164,7 @@ def __init__(self, target_store_name, change_db, delete, proc_id, sanity_check_n self.bundle_migration_statuses = list() if os.path.exists(self.get_bundle_statuses_path()): - self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path()) + self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path(), dtype={'uuid': object, 'status': object, 'error_message': object}) else: self.existing_bundle_migration_statuses = None @@ -340,6 +341,9 @@ def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_lo success, reason = None, None + # print(bundle_location, new_location) + # return True, None + if is_dir: # For dirs, check the folder contains same files with OpenFile(new_location, gzipped=True) as f: @@ -477,15 +481,33 @@ def migrate_bundle(self, bundle_uuid): self.logger.info("Getting Bundle info") bundle = self.get_bundle(bundle_uuid) bundle_location = self.get_bundle_location(bundle_uuid) + print(bundle) + print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid)) + print(bundle_location) + # print("contents.gz" in bundle_location) # This is for handling cases where rm -d was run on the bundle is_bundle_rm = False bundle_info = None + new_bundle_location = bundle_location[:-2] + "tar.gz" + # print(new_bundle_location) + # print(self.get_bundle_info(bundle_uuid, new_bundle_location)) + # linked_info = parse_linked_bundle_url(bundle_location) + try: bundle_info = self.get_bundle_info(bundle_uuid, bundle_location) except Exception as e: + # print(e) if "Path ''" in str(e): - for i in range(0, 10): + # if bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # try: + # if "contents.gz" in bundle_location: + # bundle_info = self.get_bundle_info(bundle_uuid, new_bundle_location) + # print(bundle_info) + # except: + # pass + + for i in range(0, 11): try: bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}') bundle_location = f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}' @@ -498,70 +520,95 @@ def migrate_bundle(self, bundle_uuid): is_bundle_rm = True is_dir = False bundle_migration_status.status = MigrationStatus.NOT_STARTED - bundle_migration_status.error_message = None + bundle_migration_status.error_message = '' else: - raise e - + pass + + # if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # self.bundle_manager._model.update_bundle( + # bundle, + # { + # 'is_dir': bundle_info['type'] == 'directory', + # } + # ) + + print(bundle_info, bundle_location, is_bundle_rm) + # print(bundle_migration_status) + # Normal Migration - if not is_bundle_rm: - is_dir = bundle_info['type'] == 'directory' - target_location = self.blob_target_location(bundle_uuid, is_dir) - disk_location = self.get_bundle_disk_location(bundle_uuid) - - # Don't migrate currently running bundles - if bundle.state not in State.FINAL_STATES: - bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL - return - - # Don't migrate linked bundles - if self.is_linked_bundle(bundle_uuid): - bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED - return - - # if db already changed - # TODO: Check if bundle_location is azure (see other places in code base.) - if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - return - elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): - bundle_migration_status.status = MigrationStatus.CHANGED_DB - elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( - bundle_uuid, bundle_location, bundle_info, is_dir, target_location - )[0]): - bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE - bundle_migration_status.error_message = None - - # Upload to Azure. - # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location)) - if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location): - self.logger.info("Uploading to Azure") - start_time = time.time() - self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir) - self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time) - success, reason = self.sanity_check( - bundle_uuid, bundle_location, bundle_info, is_dir, target_location - ) - if not success: - raise ValueError(f"SanityCheck failed with {reason}") - bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE - bundle_migration_status.error_message = None - - # Change bundle metadata in database to point to the Azure Blob location (not disk) - if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message: - self.logger.info("Changing DB") - start_time = time.time() - self.modify_bundle_data(bundle, bundle_uuid, is_dir) - self.times["modify_bundle_data"].append(time.time() - start_time) - bundle_migration_status.status = MigrationStatus.CHANGED_DB - - # Delete the bundle from disk. - if self.delete and not is_bundle_rm: - self.logger.info("Deleting from disk") - start_time = time.time() - if os.path.lexists(disk_location): - # Delete it. - path_util.remove(disk_bundle_location) - self.times["delete_original_bundle"].append(time.time() - start_time) - bundle_migration_status.status = MigrationStatus.FINISHED + # if not is_bundle_rm: + # is_dir = bundle_info['type'] == 'directory' + # target_location = self.blob_target_location(bundle_uuid, is_dir) + # disk_location = self.get_bundle_disk_location(bundle_uuid) + + # # print(1) + + # # Don't migrate currently running bundles + # if bundle.state not in State.FINAL_STATES: + # bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL + # return + + # # Don't migrate linked bundles + # if self.is_linked_bundle(bundle_uuid): + # bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED + # return + + # # print(bundle_migration_status) + # # print(2) + + # # if db already changed + # # TODO: Check if bundle_location is azure (see other places in code base.) + # if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # bundle_migration_status.error_message = '' + # return + # # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value): + # # bundle_migration_status.status = MigrationStatus.CHANGED_DB + # # bundle_migration_status.error_message = '' + # # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check( + # # bundle_uuid, bundle_location, bundle_info, is_dir, target_location + # # )[0]): + # # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE + # # bundle_migration_status.error_message = '' + + # # Upload to Azure. + # # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location)) + # if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location): + # print("UPLOADING") + # self.logger.info("Uploading to Azure") + # start_time = time.time() + # self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir) + # self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time) + # success, reason = self.sanity_check( + # bundle_uuid, bundle_location, bundle_info, is_dir, target_location + # ) + # if not success: + # raise ValueError(f"SanityCheck failed with {reason}") + # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE + # bundle_migration_status.error_message = None + + # # Change bundle metadata in database to point to the Azure Blob location (not disk) + # if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message: + # print("Changing DB") + # self.logger.info("Changing DB") + # start_time = time.time() + # self.modify_bundle_data(bundle, bundle_uuid, is_dir) + # self.times["modify_bundle_data"].append(time.time() - start_time) + # bundle_migration_status.status = MigrationStatus.CHANGED_DB + # bundle_migration_status.error_message = None + + # # Delete the bundle from disk. + # if self.delete and not is_bundle_rm: + # self.logger.info("Deleting from disk") + # start_time = time.time() + # delete_status = self.delete_original_bundle(bundle_uuid) + # if not delete_status: + # self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}") + # # if os.path.lexists(disk_location): + # # # Delete it. + # # path_util.remove(disk_bundle_location) + # self.times["delete_original_bundle"].append(time.time() - start_time) + # bundle_migration_status.status = MigrationStatus.FINISHED + # bundle_migration_status.error_message = None self.times["migrate_bundle"].append(time.time() - total_start_time) @@ -589,6 +636,7 @@ def log_times(self): self.logger.info(json.dumps(output_dict, sort_keys=True, indent=4)) def write_bundle_statuses(self): + return new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses]) if self.existing_bundle_migration_statuses is None: self.existing_bundle_migration_statuses = new_records