diff --git a/codalab/migration.py b/codalab/migration.py
index a010865c8..f8721e723 100644
--- a/codalab/migration.py
+++ b/codalab/migration.py
@@ -1,4 +1,11 @@
 # A script to migrate bundles from disk storage to Azure storage (UploadBundles, MakeBundles, RunBundles?)
+import multiprocessing
+from functools import partial
+import time
+from collections import defaultdict
+import json
+import numpy as np
+import traceback
 import logging
 import argparse
 import os
@@ -7,16 +14,14 @@
 from codalab.common import (
     StorageType,
     StorageURLScheme,
-    parse_linked_bundle_url,
 )
 from codalab.lib import (
     path_util,
-    spec_util,
     zip_util,
 )
 
 from codalab.worker import download_util
-from codalab.worker.download_util import BundleTarget
+from codalab.worker.download_util import BundleTarget, PathException
 from codalab.server.bundle_manager import BundleManager
 from codalab.lib.upload_manager import BlobStorageUploader
 from codalab.lib.codalab_manager import CodaLabManager
@@ -26,24 +31,24 @@
 )
-# Steps:
-# 1. Create a database connection. Connecting to local database to get infomations
-# 2. Get bundle locations in local filesystem using CodaLabManager()
-#    Get bundle information using CodaLabManager
-# 3. Find the proporate target bundle_url ([uuid]/contents.gz, [uuid]/contents.gz)
-# 4. Upload all the bundles from local disk to Azure
-# 5. Update database, pointing to new locations
-# 6. Delete the original data
-
-
 class Migration:
     """
     Base class for BundleManager tests with a CodaLab Manager uses local database.
    ATTENTION: this class will modify real bundle database.
    """
 
-    def __init__(self, target_store_name) -> None:
+    def __init__(self, target_store_name, change_db, delete) -> None:
         self.target_store_name = target_store_name
+        self.change_db = change_db
+        self.delete = delete
+        self.skipped_ready = (
+            self.skipped_link
+        ) = (
+            self.skipped_beam
+        ) = (
+            self.skipped_delete_path_dne
+        ) = self.path_exception_cnt = self.error_cnt = self.success_cnt = 0
+        self.times = defaultdict(list)
 
     def setUp(self):
         self.codalab_manager = CodaLabManager()
@@ -65,7 +70,26 @@ def setUp(self):
         assert StorageType.AZURE_BLOB_STORAGE.value == self.target_store['storage_type']
         self.target_store_type = StorageType.AZURE_BLOB_STORAGE
 
-    def get_bundle_uuids(self, worksheet_uuid, max_result=100):
+        # This log file records the bundles whose locations have been changed in the database.
+        self.logger = self.get_logger()
+
+    def get_logger(self):
+        """
+        Create a logger to log the migration process.
+        """
+        logger = logging.getLogger(__name__)
+        logger.setLevel(logging.INFO)
+        handler = logging.FileHandler(
+            os.path.join(self.codalab_manager.codalab_home, "migration.log")
+        )
+        handler.setLevel(logging.INFO)
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+
+        return logger
+
+    def get_bundle_uuids(self, worksheet_uuid, max_result=int(1e9)):
         if worksheet_uuid is None:
             bundle_uuids = self.bundle_manager._model.get_all_bundle_uuids(max_results=max_result)
         else:
@@ -79,32 +103,48 @@ def is_linked_bundle(self, bundle_uuid):
         bundle_link_url = self.bundle_manager._model.get_bundle_metadata(
             [bundle_uuid], "link_url"
         ).get(bundle_uuid)
-        return bundle_link_url is None
+        # logging.info(f"[migration] bundle {bundle_uuid} bundle_link_url: {bundle_link_url}")
+        if bundle_link_url:
+            return True
+        return False
 
     def get_bundle_location(self, bundle_uuid):
-        # bundle_locations = self.bundle_manager._model.get_bundle_locations(bundle_uuid)  # this is getting locations from
+        # bundle_locations = self.bundle_manager._model.get_bundle_locations(bundle_uuid)  # this is getting locations from database
+        # TODO: why does this automatically change to the Azure URL after the upload to Azure?
         bundle_location = self.bundle_manager._bundle_store.get_bundle_location(bundle_uuid)
         return bundle_location
 
+    def get_bundle_disk_location(self, bundle_uuid):
+        # Get the original bundle location on disk.
+        # NOTE: This is hacky, but it appears to work. That super() function
+        # is in the _MultiDiskBundleStore class, and it basically searches through
+        # all the partitions to find the bundle.
+        # However, if the bundle doesn't exist, it just returns a reasonable path at which
+        # the bundle could be stored on disk, so we must check that the path exists before deleting.
+        return super(
+            type(self.bundle_manager._bundle_store), self.bundle_manager._bundle_store
+        ).get_bundle_location(bundle_uuid)
+
     def get_bundle(self, bundle_uuid):
         return self.bundle_manager._model.get_bundle(bundle_uuid)
 
     def get_bundle_info(self, bundle_uuid, bundle_location):
         target = BundleTarget(bundle_uuid, subpath='')
-        logging.info(f"[migration] {target}")
         try:
             info = download_util.get_target_info(bundle_location, target, depth=0)
-            logging.info(f"[migration] {info}")
         except Exception as e:
             logging.info(f"[migration] Error: {str(e)}")
             raise e
 
         return info
 
+    def blob_target_location(self, bundle_uuid, is_dir=False):
+        file_name = "contents.tar.gz" if is_dir else "contents.gz"
+        return f"{self.target_store_url}/{bundle_uuid}/{file_name}"
+
     def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
         # generate target bundle path
-        file_name = "contents.tar.gz" if is_dir else "contents.gz"
-        target_location = f"{self.target_store_url}/{bundle_uuid}/{file_name}"
+        target_location = self.blob_target_location(bundle_uuid, is_dir)
 
         # TODO: This step might cause repeated upload. Can not check by checking size (Azure blob storage is zipped).
         if FileSystems.exists(target_location):
@@ -113,7 +153,7 @@ def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
         uploader = BlobStorageUploader(
             bundle_model=self.bundle_manager._model,
             bundle_store=self.bundle_manager._bundle_store,
-            destination_bundle_store=None,
+            destination_bundle_store=self.bundle_manager._bundle_store,
             json_api_client=None,
         )
 
@@ -127,18 +167,25 @@ def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
             source_ext = ''
             unpack = False
 
+        logging.info(
+            "[migration] Uploading from %s to Azure Blob Storage %s, uploaded file size is %s",
+            bundle_location,
+            target_location,
+            path_util.get_path_size(bundle_location),
+        )
         # Upload file content and generate index file
         uploader.write_fileobj(source_ext, source_fileobj, target_location, unpack_archive=unpack)
 
         assert FileSystems.exists(target_location)
+        return target_location
 
     def modify_bundle_data(self, bundle, bundle_uuid, is_dir):
         """
        Change the bundle location in the database
        ATTENTION: this function will modify codalab
        """
-
-        # add bundle location
+        logging.info(f"[migration] Modifying bundle info {bundle_uuid} in database")
+        # Add the new bundle location to the database
         self.bundle_manager._model.add_bundle_location(bundle_uuid, self.target_store_uuid)
 
         new_location = self.get_bundle_location(bundle_uuid)
@@ -160,10 +207,11 @@ def modify_bundle_data(self, bundle, bundle_uuid, is_dir):
             },
         )
 
-    def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir):
-        new_location = self.get_bundle_location(bundle_uuid)
+    def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_location=None):
+        if new_location is None:
+            new_location = self.get_bundle_location(bundle_uuid)
         if is_dir:
-            # For dirs, check the folder contains same files
+            # For dirs, check that the folder contains the same files.
             with OpenFile(new_location, gzipped=True) as f:
                 new_file_list = tarfile.open(fileobj=f, mode='r:gz').getnames()
                 new_file_list.sort()
@@ -172,95 +220,245 @@ def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir):
             old_file_list = files + dirs
             old_file_list = [n.replace(bundle_location, '.') for n in old_file_list]
             old_file_list.sort()
+            if old_file_list != new_file_list:
+                return False
 
-            assert old_file_list == new_file_list
         else:
             # For files, check the file has same contents
             old_content = read_file_section(bundle_location, 5, 10)
             new_content = read_file_section(new_location, 5, 10)
-            assert old_content == new_content
-
-    def delete_original_bundle(self, bundle_uuid, bundle_location):
-        # Delete data from orginal bundle store
-        if os.path.exists(bundle_location):
-            deleted_size = path_util.get_path_size(bundle_location)
-            bundle_user_id = self.bundle_manager._model.get_bundle_owner_ids([bundle_uuid])[
-                bundle_uuid
-            ]
-            path_util.remove(bundle_location)
-            # update user's disk usage: reduce original bundle size
+            if old_content != new_content:
+                return False
+
+            old_file_size = path_util.get_path_size(bundle_location)
+            new_file_size = path_util.get_path_size(new_location)
+            if old_file_size != new_file_size:
+                return False
+
+            # Also check the contents of the last 10 bytes.
+            if old_file_size < 10:
+                if read_file_section(bundle_location, 0, 10) != read_file_section(
+                    new_location, 0, 10
+                ):
+                    return False
+            else:
+                if read_file_section(bundle_location, old_file_size - 10, 10) != read_file_section(
+                    new_location, old_file_size - 10, 10
+                ):
+                    return False
+        return True
+
+    def delete_original_bundle(self, uuid):
+        # Get the original bundle location on disk.
+        disk_bundle_location = self.get_bundle_disk_location(uuid)
+        if not os.path.lexists(disk_bundle_location):
+            return False
+
+        # Now, delete the bundle from disk and report success.
+        path_util.remove(disk_bundle_location)
+        return True
+
+    def adjust_quota_and_upload_to_blob(self, bundle_uuid, bundle_location, is_dir):
+        # Get user info
+        bundle_user_id = self.bundle_manager._model.get_bundle_owner_ids([bundle_uuid])[bundle_uuid]
+        user_info = self.bundle_manager._model.get_user_info(bundle_user_id)
+
+        # Update the user's disk quota, making sure it doesn't go negative.
+        deleted_size = path_util.get_path_size(bundle_location)
+        decrement = (
+            deleted_size if user_info['disk_used'] > deleted_size else user_info['disk_used']
+        )
+        new_disk_used = user_info['disk_used'] - decrement
+        self.bundle_manager._model.update_user_info(
+            {'user_id': bundle_user_id, 'disk_used': new_disk_used}
+        )
+
+        try:
+            # If the upload succeeds, the user's disk usage is updated as part of the upload to Azure
+            self.upload_to_azure_blob(bundle_uuid, bundle_location, is_dir)
+        except Exception as e:
+            # If the upload failed, add the user's disk usage back
             user_info = self.bundle_manager._model.get_user_info(bundle_user_id)
-            assert user_info['disk_used'] >= deleted_size
-            new_disk_used = user_info['disk_used'] - deleted_size
+            new_disk_used = user_info['disk_used'] + decrement
             self.bundle_manager._model.update_user_info(
                 {'user_id': bundle_user_id, 'disk_used': new_disk_used}
             )
+            raise e  # still raise the exception to the outer try/except wrapper
+
+    def migrate_bundle(self, bundle_uuid):
+        try:
+            total_start_time = time.time()
+
+            # Get bundle information
+            bundle = self.get_bundle(bundle_uuid)
+            bundle_location = self.get_bundle_location(bundle_uuid)
+            bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
+            is_dir = bundle_info['type'] == 'directory'
+
+            # Only migrate bundles in the ready state (skip currently running bundles)
+            if bundle.state != 'ready':
+                self.skipped_ready += 1
+                return
+
+            # Don't migrate linked bundles
+            if self.is_linked_bundle(bundle_uuid):
+                self.skipped_link += 1
+                return
+
+            # Upload only if the bundle still exists on disk and is either missing from Blob Storage or fails the sanity check
+            target_location = self.blob_target_location(bundle_uuid, is_dir)
+            disk_location = self.get_bundle_disk_location(bundle_uuid)
+            if os.path.lexists(disk_location) and (
+                not FileSystems.exists(target_location)
+                or not self.sanity_check(
+                    bundle_uuid, disk_location, bundle_info, is_dir, target_location
+                )
+            ):
+                start_time = time.time()
+                self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
+                self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
+                if not self.sanity_check(
+                    bundle_uuid, bundle_location, bundle_info, is_dir, target_location
+                ):
+                    raise ValueError("SanityCheck failed")
+                self.success_cnt += 1
+
+            # Change bundle metadata to point to the Azure Blob location (not disk)
+            if self.change_db:
+                start_time = time.time()
+                self.modify_bundle_data(bundle, bundle_uuid, is_dir)
+                self.times["modify_bundle_data"].append(time.time() - start_time)
+
+            # Delete the bundle from disk.
+            if self.delete:
+                start_time = time.time()
+                deleted = self.delete_original_bundle(bundle_uuid)
+                if not deleted:
+                    self.skipped_delete_path_dne += 1
+                self.times["delete_original_bundle"].append(time.time() - start_time)
+
+            self.times["migrate_bundle"].append(time.time() - total_start_time)
+        except PathException:
+            self.path_exception_cnt += 1
+        except Exception:
+            self.error_cnt += 1
+            print(traceback.format_exc())
+
+    def print_times(self):
+        output_dict = dict()
+        for k, v in self.times.items():
+            output_dict[k] = {
+                "mean": np.mean(v),
+                "std": np.std(v),
+                "range": np.ptp(v),
+                "median": np.median(v),
+                "max": np.max(v),
+                "min": np.min(v),
+            }
+        print(json.dumps(output_dict, sort_keys=True, indent=4))
+
+    def migrate_bundles(self, bundle_uuids, log_interval=1000):
+        for i, uuid in enumerate(bundle_uuids):
+            self.migrate_bundle(uuid)
+            if i > 0 and i % log_interval == 0:
+                self.print_times()
+        self.print_times()
+
+
+def job(target_store_name, change_db, delete, worksheet, max_result, num_processes, proc_id):
+    """A function for running the migration in parallel.
+
+    NOTE: I know this is bad styling since we re-create the Migration object and the
+    bundle_uuids in each process. However, we cannot pass the same Migration object in as
+    a parameter to the function given to each process by Pool because the Migration object
+    is not Pickle-able (indeed, it is not even dill-able) due to one of its member objects
+    (BundleManager, CodalabManager, etc.), and so this is the compromise we came up with.
+    """
+    # Set up the Migration object.
+    migration = Migration(target_store_name, change_db, delete)
+    migration.setUp()
+
+    # Get bundle uuids.
+    bundle_uuids = sorted(
+        migration.get_bundle_uuids(worksheet_uuid=worksheet, max_result=max_result)
+    )
+
+    # Take this process's contiguous chunk of the sorted uuids.
+    chunk_size = len(bundle_uuids) // num_processes
+    start_idx = chunk_size * proc_id
+    end_idx = len(bundle_uuids) if proc_id == num_processes - 1 else chunk_size * (proc_id + 1)
+    print(f"[migration] ProcessID{proc_id}\tChunk: {chunk_size}\tstart:{start_idx}\tend:{end_idx}")
+    bundle_uuids = bundle_uuids[start_idx:end_idx]
+
+    # Do the migration.
+    total = len(bundle_uuids)
+    print(f"[migration] Start migrating {total} bundles")
+    migration.migrate_bundles(bundle_uuids)
+
+    print(
+        f"[migration] Migration finished, total {total} bundles migrated, "
+        f"skipped {migration.skipped_ready} (not ready) "
+        f"{migration.skipped_link} (linked bundle) "
+        f"{migration.skipped_beam} (already on Azure) bundles, "
+        f"skipped delete due to path DNE {migration.skipped_delete_path_dne}, "
+        f"PathException {migration.path_exception_cnt}, "
+        f"error {migration.error_cnt} bundles. "
+        f"Succeeded {migration.success_cnt} bundles"
+    )
+    if change_db:
+        print(
+            "[migration][Change DB] Database migration finished, bundle location changed in database."
+        )
+
+    if delete:
+        print("[migration][Deleted] Original bundles deleted from local disk.")
 
 
 if __name__ == '__main__':
-    # Command line parser, parse the worksheet id
+    # Command line args.
     parser = argparse.ArgumentParser(
         description='Manages your local CodaLab Worksheets service deployment'
     )
     parser.add_argument(
-        '-a', '--all', help='Run migration on all worksheets and all bundles', action='store_true',
+        '-k', '--max-result', type=int, help='Maximum number of bundles to migrate', default=int(1e9)
     )
     parser.add_argument(
         '-w', '--worksheet', type=str, help='The worksheet uuid that needs migration'
     )
-    parser.add_argument('--target_store_name', type=str, help='The destination bundle store name')
+    parser.add_argument(
+        '-t',
+        '--target_store_name',
+        type=str,
+        help='The destination bundle store name',
+        default="azure-store-default",
+    )
     parser.add_argument(
         '-c',
         '--change_db',
         help='Change the bundle location in the database',
         action='store_true',
     )
+    parser.add_argument('--disable_logging', help='If set, disable logging', action='store_true')
+    parser.add_argument(
+        '-p',
+        '--num_processes',
+        type=int,
+        help="Number of processes for multiprocessing",
+        default=multiprocessing.cpu_count(),
+    )
     parser.add_argument('-d', '--delete', help='Delete the original database', action='store_true')
 
     args = parser.parse_args()
-    worksheet_uuid = args.worksheet
-    target_store_name = (
-        "azure-store-default" if args.target_store_name is None else args.target_store_name
+
+    # Configure logging
+    logging.getLogger().setLevel(logging.INFO)
+    if args.disable_logging:
+        # Disable all logging output. Comment this out if you want logging.
+        logging.disable(logging.CRITICAL)
+
+    # Run the migration with multiprocessing
+    f = partial(
+        job,
+        args.target_store_name,
+        args.change_db,
+        args.delete,
+        args.worksheet,
+        args.max_result,
+        args.num_processes,
     )
-
-    # TODO: write output to log / log files
-    migration = Migration(target_store_name)
-    migration.setUp()
-
-    if args.all:
-        bundle_uuids = migration.get_bundle_uuids(worksheet_uuid=None)
-    else:
-        # Must specify worksheet uuid
-        if worksheet_uuid is not None and not spec_util.UUID_REGEX.match(worksheet_uuid):
-            raise Exception("Input worksheet uuid has wrong format. ")
") - bundle_uuids = migration.get_bundle_uuids(worksheet_uuid) - - for bundle_uuid in bundle_uuids: - logging.info(bundle_uuid) - - bundle = migration.get_bundle(bundle_uuid) - if bundle.bundle_type != 'dataset' or bundle.state != 'ready': - # only migrate uploaded bundle, and the bundle state needs to be ready - continue - - # Uploaded bundles does not need has dependencies - assert len(bundle.dependencies) == 0 - - if migration.is_linked_bundle(bundle_uuid): - # Do not migrate link bundle - continue - - bundle_location = migration.get_bundle_location(bundle_uuid) - - if parse_linked_bundle_url(bundle_location).uses_beam: - # Do not migrate Azure / GCP bundles - continue - - bundle_info = migration.get_bundle_info(bundle_uuid, bundle_location) - - is_dir = bundle_info['type'] == 'directory' - migration.upload_to_azure_blob(bundle_uuid, bundle_location, is_dir) - - if args.change_db: # If need to change the database, continue to run - migration.modify_bundle_data(bundle, bundle_uuid, is_dir) - migration.sanity_check(bundle_uuid, bundle_location, bundle_info, is_dir) - if args.delete: - migration.delete_original_bundle(bundle_uuid, bundle_location) + with multiprocessing.Pool(processes=args.num_processes) as pool: + pool.map(f, list(range(args.num_processes))) diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index 73b665085..a8265614a 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -744,10 +744,8 @@ def get_bundle_uuids(self, conditions, max_results): return self._execute_query(query) - def get_all_bundle_uuids(self, max_results): - clause = and_(true(), cl_bundle.c.uuid == cl_bundle_metadata.c.bundle_uuid) # Join - query = select([cl_bundle.c.uuid]).where(clause) - query = query.order_by(cl_bundle.c.id.desc()).limit(max_results) + def get_all_bundle_uuids(self, max_results=100): + query = select([cl_bundle.c.uuid]).limit(max_results) return self._execute_query(query) def get_memoized_bundles(self, user_id, command, dependencies): diff --git a/codalab/worker/file_util.py b/codalab/worker/file_util.py index 71b50b04a..aca3a0a45 100644 --- a/codalab/worker/file_util.py +++ b/codalab/worker/file_util.py @@ -654,10 +654,8 @@ def update_file_size(bundle_path, file_size): finfo = dict(finfo) finfo['size'] = file_size new_info = tuple([value for _, value in finfo.items()]) - logging.info(finfo) # get the result of a fi tf._setFileInfo(new_info) tf.sqlConnection.commit() # need to mannually commit here - logging.info(f"tf.index_file_name: {tf.indexFilePath}") # Update the index file stored in blob storage FileSystems.delete([parse_linked_bundle_url(bundle_path).index_path])