diff --git a/scripts/migrate-disk-to-blob.py b/scripts/migrate-disk-to-blob.py
index 47178c439..b8286b621 100644
--- a/scripts/migrate-disk-to-blob.py
+++ b/scripts/migrate-disk-to-blob.py
@@ -102,13 +102,14 @@ class Migration:
     Performs the migration.
     """
 
-    def __init__(self, migration_states_path, target_store_name, upload, change_db, verify, delete, proc_id):
+    def __init__(self, migration_states_path, target_store_name, upload, change_db, verify, delete_disk, delete_target, proc_id):
         self.migration_states_path = migration_states_path
         self.target_store_name = target_store_name
         self.upload = upload
         self.change_db = change_db
         self.verify = verify
-        self.delete = delete
+        self.delete_disk = delete_disk
+        self.delete_target = delete_target
         self.proc_id = proc_id
 
         self.times = defaultdict(list)
 
@@ -148,15 +149,9 @@ def write_migration_states(self):
         with open(self.migration_states_path, "w") as f:
             print(json.dumps(dict((uuid, asdict(state)) for uuid, state in self.migration_states.items())), file=f)
 
-    def get_bundle_uuids(self, worksheet_uuid, max_bundles):
-        if worksheet_uuid is None:
-            bundle_uuids = self.model.get_all_bundle_uuids(max_bundless=max_bundles)
-        else:
-            bundle_uuids = self.model.get_bundle_uuids(
-                {'name': None, 'worksheet_uuid': worksheet_uuid, 'user_id': self.root_user_id},
-                max_bundless=max_bundles,  # return all bundles in the worksheets
-            )
-        return list(set(bundle_uuids))
+    def get_bundle_uuids(self, max_bundles):
+        bundle_uuids = self.model.get_all_bundle_uuids(max_results=max_bundles)
+        return sorted(set(bundle_uuids))
 
     def is_linked_bundle(self, bundle_uuid):
         bundle_link_url = self.model.get_bundle_metadata(
@@ -204,14 +199,10 @@ def blob_target_location(self, bundle_uuid, is_dir):
     def blob_index_location(self, bundle_uuid):
         return f"{self.target_store_url}/{bundle_uuid}/index.sqlite"
 
-    def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
+    def upload_to_azure_blob(self, bundle_uuid, disk_location, is_dir=False):
         # generate target bundle path
         target_location = self.blob_target_location(bundle_uuid, is_dir)
 
-        # TODO: delete?
-        if FileSystems.exists(target_location):
-            path_util.remove(target_location)
-
         uploader = BlobStorageUploader(
             bundle_model=self.model,
             bundle_store=self.bundle_store,
@@ -220,21 +211,17 @@ def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
         )
 
         if is_dir:
-            source_fileobj = tar_gzip_directory(bundle_location)
+            source_fileobj = tar_gzip_directory(disk_location, exclude_patterns=None)
             source_ext = ".tar.gz"
             unpack = True
         else:
             # If it's a file, change it into GzipStream
-            source_fileobj = open(bundle_location, 'rb')
+            source_fileobj = open(disk_location, 'rb')
             source_ext = ''
             unpack = False
 
-        print(
-            "Uploading from %s to Azure Blob Storage %s, uploaded file size is %s",
-            bundle_location,
-            target_location,
-            path_util.get_path_size(bundle_location),
-        )
+        size = path_util.get_path_size(disk_location)
+        print(f"Uploading {disk_location} to {target_location} (size {size})")
 
         # Upload file content and generate index file
         # NOTE: We added a timeout (using the Timer class) since sometimes bundles just never uploaded
@@ -283,7 +270,6 @@ def sanity_check(self, bundle_uuid, disk_location, is_dir, target_location, inde
         # Check index
         try:
             info = _compute_target_info_blob(target_location, depth=10000)
-            print(info)
         except Exception as e:
             return False, f"Unable to read info: {e}"
 
@@ -299,7 +285,9 @@ def sanity_check(self, bundle_uuid, disk_location, is_dir, target_location, inde
         old_file_list = files + dirs
         old_file_list.sort()
         if old_file_list != new_file_list:
-            return False, "Directory file lists differ."
+            print("OLD", old_file_list)
+            print("NEW", new_file_list)
+            return False, f"Directory file lists differ: {len(old_file_list)} versus {len(new_file_list)}"
 
         return True, f"{len(new_file_list)} directories/files match"
 
@@ -370,16 +358,17 @@ def migrate_bundle(self, bundle_uuid):
         disk_location = self.get_bundle_disk_location(bundle_uuid)
         on_disk = disk_location is not None
-        is_dir = os.path.isdir(disk_location) if on_disk else None
+        if on_disk:
+            is_dir = os.path.isdir(disk_location)
+        else:
+            is_dir = "contents.tar.gz" in bundle_location
         is_link = self.is_linked_bundle(bundle_uuid)
         changed_db = bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value)
 
-        target_location = self.blob_target_location(bundle_uuid, is_dir) if is_dir else None
+        target_location = self.blob_target_location(bundle_uuid, is_dir)
         index_location = self.blob_index_location(bundle_uuid)
-        on_azure = target_location and FileSystems.exists(target_location) and FileSystems.exists(index_location)
-        #print("AAAAAAA", target_location, FileSystems.exists(target_location))
-        #print("BBBBBBB", index_location, FileSystems.exists(index_location))
+        on_azure = FileSystems.exists(target_location) and FileSystems.exists(index_location)
 
         # Create new migration state
         if bundle_uuid in self.migration_states:
@@ -405,13 +394,14 @@ def migrate_bundle(self, bundle_uuid):
         should_upload = self.upload and state.on_disk and not state.on_azure
         should_verify = self.verify and state.on_azure and not state.verified
         should_change_db = self.change_db and state.on_azure and not state.changed_db
-        should_delete = self.delete and state.changed_db and state.on_azure and state.on_disk
+        should_delete_disk = self.delete_disk and state.changed_db and state.on_azure and state.on_disk
+        should_delete_target = self.delete_target and state.on_azure
 
         # Upload to Azure
         if should_upload:
             print(f" PERFORM: upload {bundle_uuid}")
             start_time = time.time()
-            self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
+            self.adjust_quota_and_upload_to_blob(bundle_uuid, disk_location, is_dir)
             self.times["upload"].append(time.time() - start_time)
             state.on_azure = True
 
@@ -433,13 +423,22 @@ def migrate_bundle(self, bundle_uuid):
             state.messages.append(reason)
 
         # Delete from disk
-        if should_delete:
-            print(" PERFORM: delete")
+        if should_delete_disk:
+            print(" PERFORM: delete disk")
             start_time = time.time()
             path_util.remove(disk_location)
-            self.times["delete"].append(time.time() - start_time)
+            self.times["delete_disk"].append(time.time() - start_time)
             state.on_disk = False
 
+        # Delete from target (to undo failed uploads)
+        if should_delete_target:
+            print(" PERFORM: delete target")
+            start_time = time.time()
+            FileSystems.delete([target_location, index_location])
+            self.times["delete_target"].append(time.time() - start_time)
+            state.on_azure = False
+            state.verified = False
+
     except Exception as e:
         print(f"Error for {bundle_uuid}: {traceback.format_exc()}")
         self.migration_states[bundle_uuid].messages.append(str(e))
@@ -466,7 +465,7 @@ def migrate_bundles(self, bundle_uuids, log_interval=100):
         self.write_migration_states()
 
 
-def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids, max_bundles, num_processes, proc_id):
+def run_job(target_store_name, upload, change_db, verify, delete_disk, delete_target, bundle_uuids, max_bundles, num_processes, proc_id):
     """
    NOTE: I know this is bad styling since we re-create the Migration object
     and the bundle_uuids in each process. However, we cannot pass the same Migration object in as
@@ -482,15 +481,14 @@ def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids,
         upload=upload,
         change_db=change_db,
         verify=verify,
-        delete=delete,
+        delete_disk=delete_disk,
+        delete_target=delete_target,
         proc_id=proc_id,
     )
 
     # Get all bundle uuids (if not already provided)
     if not bundle_uuids:
-        bundle_uuids = sorted(
-            migration.get_bundle_uuids(worksheet_uuid=worksheet, max_bundles=max_bundles)
-        )
+        bundle_uuids = migration.get_bundle_uuids(max_bundles=max_bundles)
 
     # Keep only the ones for this process
     selected_bundle_uuids = [uuid for uuid in bundle_uuids if hash(uuid) % num_processes == proc_id]
@@ -500,13 +498,14 @@
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
-    parser.add_argument('--max-bundles', type=int, help='Maximum number of bundles to migrate', default=1e9)
+    parser.add_argument('-m', '--max-bundles', type=int, help='Maximum number of bundles to migrate', default=int(1e9))
     parser.add_argument('-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.')
     parser.add_argument('-t', '--target-store-name', type=str, help='The destination bundle store name', default="blob-prod")
     parser.add_argument('-U', '--upload', help='Upload', action='store_true')
     parser.add_argument('-C', '--change-db', help='Change the db', action='store_true')
     parser.add_argument('-V', '--verify', help='Verify contents', action='store_true')
-    parser.add_argument('-D', '--delete', help='Delete the original database', action='store_true')
+    parser.add_argument('-D', '--delete-disk', help='Delete the disk version', action='store_true')
+    parser.add_argument('--delete-target', help='Delete the target version (to redo failed uploads)', action='store_true')
     parser.add_argument('-p', '--num-processes', type=int, help="Number of processes for multiprocessing", default=1)
 
     args = parser.parse_args()
 
@@ -517,7 +516,8 @@ def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids,
         args.upload,
         args.change_db,
         args.verify,
-        args.delete,
+        args.delete_disk,
+        args.delete_target,
         args.bundle_uuids,
         args.max_bundles,
         args.num_processes,
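
Note on the state file: write_migration_states serializes one JSON object keyed by bundle UUID, with each value produced by dataclasses.asdict. A minimal sketch of that shape, assuming a record mirroring the fields that migrate_bundle reads and writes (the real dataclass sits outside this diff, so the mirror below is hypothetical):

    import json
    from dataclasses import asdict, dataclass, field

    @dataclass
    class MigrationState:
        # Hypothetical mirror: field names are inferred from the should_*
        # predicates and the state.* assignments in migrate_bundle above.
        on_disk: bool = False
        on_azure: bool = False
        changed_db: bool = False
        verified: bool = False
        messages: list = field(default_factory=list)

    states = {"0xabc": MigrationState(on_disk=True)}
    # Same shape as write_migration_states: {uuid: asdict(state), ...}
    print(json.dumps({uuid: asdict(state) for uuid, state in states.items()}))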
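One caveat on the sharding line selected_bundle_uuids = [uuid for uuid in bundle_uuids if hash(uuid) % num_processes == proc_id]: Python salts str hashes per interpreter run (PYTHONHASHSEED), so this partition is only stable when every worker is forked from the same parent process. If shards are ever launched as separate invocations, a deterministic hash avoids skipped or doubly-migrated bundles. A sketch under that assumption (shard_of is a hypothetical helper, not part of this patch):

    import zlib

    def shard_of(bundle_uuid: str, num_processes: int) -> int:
        # zlib.crc32 is stable across interpreter runs and machines,
        # unlike built-in hash(), which is salted per process for str.
        return zlib.crc32(bundle_uuid.encode("utf-8")) % num_processes

    # selected_bundle_uuids = [
    #     u for u in bundle_uuids if shard_of(u, num_processes) == proc_id
    # ]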
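Usage sketch (the invocation style and example UUIDs are illustrative; only the flags come from the argparse block above). Since migrate_bundle computes every should_* predicate from the saved state before acting, work done in one run only becomes eligible for the next stage on the following run, so the phases are cleanest as separate passes:

    # Pass 1: upload disk bundles to the target store (sets on_azure).
    python scripts/migrate-disk-to-blob.py -t blob-prod -U -p 8

    # Pass 2: verify contents and flip the database location.
    python scripts/migrate-disk-to-blob.py -t blob-prod -V -C -p 8

    # Pass 3: should_delete_disk now holds; remove the disk copies.
    python scripts/migrate-disk-to-blob.py -t blob-prod -D -p 8

    # Recover from a bad upload for specific bundles (placeholder UUIDs),
    # clearing on_azure/verified so a later -U -V pass redoes them.
    python scripts/migrate-disk-to-blob.py -t blob-prod --delete-target -u 0xaaa 0xbbb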