Commit: fix
CodaLab committed Oct 25, 2024
1 parent bc3fb4e commit 189ff91
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions scripts/migrate-disk-to-blob.py
@@ -102,13 +102,14 @@ class Migration:
     Performs the migration.
     """
 
-    def __init__(self, migration_states_path, target_store_name, upload, change_db, verify, delete, proc_id):
+    def __init__(self, migration_states_path, target_store_name, upload, change_db, verify, delete_disk, delete_target, proc_id):
         self.migration_states_path = migration_states_path
         self.target_store_name = target_store_name
         self.upload = upload
         self.change_db = change_db
         self.verify = verify
-        self.delete = delete
+        self.delete_disk = delete_disk
+        self.delete_target = delete_target
         self.proc_id = proc_id
 
         self.times = defaultdict(list)
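
As a usage sketch of the new signature (the states path and flag values here are hypothetical, chosen only for illustration):

    migration = Migration(
        migration_states_path="/tmp/migration-states.json",  # hypothetical path
        target_store_name="blob-prod",
        upload=True,
        change_db=False,
        verify=True,
        delete_disk=False,    # replaces the old single `delete` flag
        delete_target=False,  # new: lets a failed upload be undone
        proc_id=0,
    )
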
@@ -148,15 +149,9 @@ def write_migration_states(self):
         with open(self.migration_states_path, "w") as f:
             print(json.dumps(dict((uuid, asdict(state)) for uuid, state in self.migration_states.items())), file=f)
 
-    def get_bundle_uuids(self, worksheet_uuid, max_bundles):
-        if worksheet_uuid is None:
-            bundle_uuids = self.model.get_all_bundle_uuids(max_bundless=max_bundles)
-        else:
-            bundle_uuids = self.model.get_bundle_uuids(
-                {'name': None, 'worksheet_uuid': worksheet_uuid, 'user_id': self.root_user_id},
-                max_bundless=max_bundles,  # return all bundles in the worksheets
-            )
-        return list(set(bundle_uuids))
+    def get_bundle_uuids(self, max_bundles):
+        bundle_uuids = self.model.get_all_bundle_uuids(max_results=max_bundles)
+        return sorted(list(set(bundle_uuids)))
 
     def is_linked_bundle(self, bundle_uuid):
         bundle_link_url = self.model.get_bundle_metadata(
@@ -204,14 +199,10 @@ def blob_target_location(self, bundle_uuid, is_dir):
     def blob_index_location(self, bundle_uuid):
         return f"{self.target_store_url}/{bundle_uuid}/index.sqlite"
 
-    def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
+    def upload_to_azure_blob(self, bundle_uuid, disk_location, is_dir=False):
         # generate target bundle path
         target_location = self.blob_target_location(bundle_uuid, is_dir)
 
-        # TODO: delete?
-        if FileSystems.exists(target_location):
-            path_util.remove(target_location)
-
         uploader = BlobStorageUploader(
             bundle_model=self.model,
             bundle_store=self.bundle_store,
@@ -220,21 +211,17 @@ def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
         )
 
         if is_dir:
-            source_fileobj = tar_gzip_directory(bundle_location)
+            source_fileobj = tar_gzip_directory(disk_location, exclude_patterns=None)
             source_ext = ".tar.gz"
             unpack = True
         else:
             # If it's a file, change it into GzipStream
-            source_fileobj = open(bundle_location, 'rb')
+            source_fileobj = open(disk_location, 'rb')
             source_ext = ''
             unpack = False
 
-        print(
-            "Uploading from %s to Azure Blob Storage %s, uploaded file size is %s",
-            bundle_location,
-            target_location,
-            path_util.get_path_size(bundle_location),
-        )
+        size = path_util.get_path_size(disk_location)
+        print(f"Uploading {disk_location} to {target_location} (size {size})")
 
         # Upload file content and generate index file
         # NOTE: We added a timeout (using the Timer class) since sometimes bundles just never uploaded
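
The Timer class mentioned in that NOTE is CodaLab's own; one minimal way to get the same effect with only the standard library would be a guard like the following (an illustrative sketch, not the script's actual implementation; `upload_timeout` is a made-up name):

    import _thread
    import threading
    from contextlib import contextmanager

    @contextmanager
    def upload_timeout(seconds):
        # Raise KeyboardInterrupt in the main thread if the guarded block
        # runs longer than `seconds`, so a stalled upload fails instead of
        # hanging forever.
        timer = threading.Timer(seconds, _thread.interrupt_main)
        timer.start()
        try:
            yield
        finally:
            timer.cancel()
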
@@ -283,7 +270,6 @@ def sanity_check(self, bundle_uuid, disk_location, is_dir, target_location, index_location):
         # Check index
         try:
             info = _compute_target_info_blob(target_location, depth=10000)
-            print(info)
         except Exception as e:
             return False, f"Unable to read info: {e}"
@@ -299,7 +285,9 @@
             old_file_list = files + dirs
             old_file_list.sort()
             if old_file_list != new_file_list:
-                return False, "Directory file lists differ."
+                print("OLD", old_file_list)
+                print("NEW", new_file_list)
+                return False, f"Directory file lists differ: {len(old_file_list)} versus {len(new_file_list)}"
 
         return True, f"{len(new_file_list)} directories/files match"

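For context, `old_file_list` is assembled from a walk of the on-disk bundle (`files + dirs`); a minimal sketch of that kind of listing, assuming relative paths are the convention used for the comparison:

    import os

    def list_dir_entries(root):
        # Collect the relative path of every file and directory under
        # `root`, sorted so the comparison with the blob index does not
        # depend on traversal order.
        entries = []
        for dirpath, dirnames, filenames in os.walk(root):
            for name in dirnames + filenames:
                entries.append(os.path.relpath(os.path.join(dirpath, name), root))
        return sorted(entries)
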
@@ -370,16 +358,17 @@ def migrate_bundle(self, bundle_uuid):
             disk_location = self.get_bundle_disk_location(bundle_uuid)
             on_disk = disk_location is not None
 
-            is_dir = os.path.isdir(disk_location) if on_disk else None
+            if on_disk:
+                is_dir = os.path.isdir(disk_location)
+            else:
+                is_dir = "contents.tar.gz" in bundle_location
             is_link = self.is_linked_bundle(bundle_uuid)
 
             changed_db = bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value)
 
-            target_location = self.blob_target_location(bundle_uuid, is_dir) if is_dir else None
+            target_location = self.blob_target_location(bundle_uuid, is_dir)
             index_location = self.blob_index_location(bundle_uuid)
-            on_azure = target_location and FileSystems.exists(target_location) and FileSystems.exists(index_location)
-            #print("AAAAAAA", target_location, FileSystems.exists(target_location))
-            #print("BBBBBBB", index_location, FileSystems.exists(index_location))
+            on_azure = FileSystems.exists(target_location) and FileSystems.exists(index_location)
 
             # Create new migration state
             if bundle_uuid in self.migration_states:
@@ -405,13 +394,14 @@
             should_upload = self.upload and state.on_disk and not state.on_azure
             should_verify = self.verify and state.on_azure and not state.verified
             should_change_db = self.change_db and state.on_azure and not state.changed_db
-            should_delete = self.delete and state.changed_db and state.on_azure and state.on_disk
+            should_delete_disk = self.delete_disk and state.changed_db and state.on_azure and state.on_disk
+            should_delete_target = self.delete_target and state.on_azure
 
             # Upload to Azure
             if should_upload:
                 print(f" PERFORM: upload {bundle_uuid}")
                 start_time = time.time()
-                self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
+                self.adjust_quota_and_upload_to_blob(bundle_uuid, disk_location, is_dir)
                 self.times["upload"].append(time.time() - start_time)
                 state.on_azure = True

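The MigrationState object whose fields are read above is defined elsewhere in the script; judging from `asdict(state)` in write_migration_states and the attributes used here, it is presumably a dataclass along these lines (a reconstruction, not the actual definition):

    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class MigrationState:
        on_disk: bool = False      # bundle contents still present on disk
        on_azure: bool = False     # contents and index exist in Blob Storage
        verified: bool = False     # sanity_check passed
        changed_db: bool = False   # bundle_location in the db points at Blob
        messages: List[str] = field(default_factory=list)  # errors and notes
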
@@ -433,13 +423,22 @@
                 state.messages.append(reason)
 
             # Delete from disk
-            if should_delete:
-                print(" PERFORM: delete")
+            if should_delete_disk:
+                print(" PERFORM: delete disk")
                 start_time = time.time()
                 path_util.remove(disk_location)
-                self.times["delete"].append(time.time() - start_time)
+                self.times["delete_disk"].append(time.time() - start_time)
                 state.on_disk = False
 
+            # Delete from target (to undo failed uploads)
+            if should_delete_target:
+                print(" PERFORM: delete target")
+                start_time = time.time()
+                FileSystems.delete([target_location, index_location])
+                self.times["delete_target"].append(time.time() - start_time)
+                state.on_azure = False
+                state.verified = False
+
         except Exception as e:
             print(f"Error for {bundle_uuid}: {traceback.format_exc()}")
             self.migration_states[bundle_uuid].messages.append(str(e))
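
Apache Beam's FileSystems.delete takes a list of paths, so the undo step removes the bundle blob and its index together. A slightly more defensive variant (a sketch, not what the script does) would skip paths that are already gone, which matters when an upload failed before the index was written:

    from apache_beam.io.filesystems import FileSystems

    def delete_target_files(target_location, index_location):
        # Delete only the blob-side files that actually exist, so a bundle
        # whose index was never written does not abort the undo step.
        paths = [p for p in (target_location, index_location) if FileSystems.exists(p)]
        if paths:
            FileSystems.delete(paths)
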
@@ -466,7 +465,7 @@ def migrate_bundles(self, bundle_uuids, log_interval=100):
         self.write_migration_states()
 
 
-def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids, max_bundles, num_processes, proc_id):
+def run_job(target_store_name, upload, change_db, verify, delete_disk, delete_target, bundle_uuids, max_bundles, num_processes, proc_id):
     """
     NOTE: I know this is bad styling since we re-create the Migration object and the
     bundle_uuids in each process. However, we cannot pass the same Migration object in as
@@ -482,15 +481,14 @@ def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids,
         upload=upload,
         change_db=change_db,
         verify=verify,
-        delete=delete,
+        delete_disk=delete_disk,
+        delete_target=delete_target,
         proc_id=proc_id,
     )
 
     # Get all bundle uuids (if not already provided)
     if not bundle_uuids:
-        bundle_uuids = sorted(
-            migration.get_bundle_uuids(worksheet_uuid=worksheet, max_bundles=max_bundles)
-        )
+        bundle_uuids = migration.get_bundle_uuids(max_bundles=max_bundles)
 
     # Keep only the ones for this process
     selected_bundle_uuids = [uuid for uuid in bundle_uuids if hash(uuid) % num_processes == proc_id]
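
One caveat with `hash(uuid) % num_processes`: Python salts `hash()` for strings (PYTHONHASHSEED), so the partition is only consistent if all workers share one interpreter seed, e.g. because they are forked. A seed-independent sketch, should that assumption ever break:

    import hashlib

    def stable_partition(uuid, num_processes):
        # md5 of the uuid yields the same bucket in every interpreter,
        # regardless of hash randomization.
        return int(hashlib.md5(uuid.encode("utf-8")).hexdigest(), 16) % num_processes

    selected_bundle_uuids = [u for u in bundle_uuids if stable_partition(u, num_processes) == proc_id]
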
@@ -500,13 +498,14 @@
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
-    parser.add_argument('--max-bundles', type=int, help='Maximum number of bundles to migrate', default=1e9)
+    parser.add_argument('-m', '--max-bundles', type=int, help='Maximum number of bundles to migrate', default=1e9)
     parser.add_argument('-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.')
     parser.add_argument('-t', '--target-store-name', type=str, help='The destination bundle store name', default="blob-prod")
     parser.add_argument('-U', '--upload', help='Upload', action='store_true')
     parser.add_argument('-C', '--change-db', help='Change the db', action='store_true')
     parser.add_argument('-V', '--verify', help='Verify contents', action='store_true')
-    parser.add_argument('-D', '--delete', help='Delete the original database', action='store_true')
+    parser.add_argument('-D', '--delete-disk', help='Delete the disk version', action='store_true')
+    parser.add_argument('--delete-target', help='Delete the target version (to redo)', action='store_true')
     parser.add_argument('-p', '--num-processes', type=int, help="Number of processes for multiprocessing", default=1)
     args = parser.parse_args()

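With these flags, a cautious first pass might upload and verify without touching the database or disk, e.g. (an illustrative invocation, not from the commit):

    python scripts/migrate-disk-to-blob.py --upload --verify -t blob-prod -p 4 -m 1000
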
@@ -517,7 +516,8 @@ def run_job(target_store_name, upload, change_db, verify, delete, bundle_uuids,
         args.upload,
         args.change_db,
         args.verify,
-        args.delete,
+        args.delete_disk,
+        args.delete_target,
         args.bundle_uuids,
         args.max_bundles,
         args.num_processes,
