Temp changes
dma1dma1 committed May 31, 2024
1 parent 82aa8b4 commit cc15a6c
Showing 1 changed file with 90 additions and 92 deletions.
codalab/migration.py: 182 changes (90 additions & 92 deletions)
@@ -488,11 +488,6 @@ def migrate_bundle(self, bundle_uuid):
# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
bundle_info = None
new_bundle_location = bundle_location[:-2] + "tar.gz"
# print(new_bundle_location)
# print(self.get_bundle_info(bundle_uuid, new_bundle_location))
# linked_info = parse_linked_bundle_url(bundle_location).is_archive_dir
# print(linked_info)

try:
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
@@ -516,96 +511,91 @@ def migrate_bundle(self, bundle_uuid):
else:
pass

if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
self.bundle_manager._model.update_bundle(
bundle,
{
'storage_type': StorageType.DISK_STORAGE.value,
'is_dir': bundle_info['type'] == 'directory',
}
)
self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
bundle_migration_status.error_message = ''
print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))
# if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# self.bundle_manager._model.update_bundle(
# bundle,
# {
# 'storage_type': StorageType.DISK_STORAGE.value,
# 'is_dir': bundle_info['type'] == 'directory',
# }
# )
# self.bundle_manager._model.remove_bundle_location(bundle_uuid, self.target_store_uuid)
# bundle_migration_status.status = MigrationStatus.UPLOADED_NOT_INDEXED
# bundle_migration_status.error_message = ''
# print(self.bundle_manager._model.get_bundle_locations(bundle_uuid))

print(bundle_info, bundle_location, is_bundle_rm)
# print(bundle_info, bundle_location, is_bundle_rm)
# print(bundle_migration_status)

# # Normal Migration
# if not is_bundle_rm:
# is_dir = bundle_info['type'] == 'directory'
# target_location = self.blob_target_location(bundle_uuid, is_dir)
# disk_location = self.get_bundle_disk_location(bundle_uuid)

# # print(1)

# # Don't migrate currently running bundles
# if bundle.state not in State.FINAL_STATES:
# bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
# return

# # Don't migrate linked bundles
# if self.is_linked_bundle(bundle_uuid):
# bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
# return

# # print(bundle_migration_status)
# # print(2)

# # if db already changed
# # TODO: Check if bundle_location is azure (see other places in code base.)
# if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.error_message = ''
# return
# # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# # bundle_migration_status.status = MigrationStatus.CHANGED_DB
# # bundle_migration_status.error_message = ''
# # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# # )[0]):
# # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# # bundle_migration_status.error_message = ''

# # Upload to Azure.
# # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
# if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
# print("UPLOADING")
# self.logger.info("Uploading to Azure")
# start_time = time.time()
# self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
# self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
# success, reason = self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )
# if not success:
# raise ValueError(f"SanityCheck failed with {reason}")
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = None

# # Change bundle metadata in database to point to the Azure Blob location (not disk)
# if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
# print("Changing DB")
# self.logger.info("Changing DB")
# start_time = time.time()
# self.modify_bundle_data(bundle, bundle_uuid, is_dir)
# self.times["modify_bundle_data"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = None

# # Delete the bundle from disk.
# if self.delete and not is_bundle_rm:
# self.logger.info("Deleting from disk")
# start_time = time.time()
# delete_status = self.delete_original_bundle(bundle_uuid)
# if not delete_status:
# self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# # if os.path.lexists(disk_location):
# # # Delete it.
# # path_util.remove(disk_bundle_location)
# self.times["delete_original_bundle"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.FINISHED
# bundle_migration_status.error_message = None
# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.error_message = ''
return
# elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = ''
# elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )[0]):
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = ''

# Upload to Azure.
# print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
print("UPLOADING")
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
print("Changing DB")
self.logger.info("Changing DB")
start_time = time.time()
self.modify_bundle_data(bundle, bundle_uuid, is_dir)
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB
bundle_migration_status.error_message = None

# Delete the bundle from disk.
if self.delete and not is_bundle_rm:
self.logger.info("Deleting from disk")
start_time = time.time()
delete_status = self.delete_original_bundle(bundle_uuid)
if not delete_status:
self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# if os.path.lexists(disk_location):
# # Delete it.
# path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED
bundle_migration_status.error_message = None

self.times["migrate_bundle"].append(time.time() - total_start_time)

@@ -634,6 +624,7 @@ def log_times(self):

def write_bundle_statuses(self):
new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses])
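# The explicit cast to object dtype below is presumably meant to keep column dtypes stable when these records are concatenated with previously saved statuses (e.g. so an all-empty error_message column is not coerced to float/NaN).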
new_records = new_records.astype({'uuid': object, 'status': object, 'error_message': object})
if self.existing_bundle_migration_statuses is None:
self.existing_bundle_migration_statuses = new_records
else:
@@ -645,6 +636,7 @@ def write_bundle_statuses(self):
def migrate_bundles(self, bundle_uuids, log_interval=100):
total = len(bundle_uuids)
for i, uuid in enumerate(bundle_uuids):
print(f"Migrating bundle {i} out of {total}")
self.logger.info(f"migrating {uuid}")
self.migrate_bundle(uuid)
self.logger.info("status: %d / %d", i, total)
@@ -674,6 +666,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
bundle_uuids_df.to_csv(migration.get_bundle_ids_path(), index=False, mode='w')
print(f"[migration] Recorded all bundle ids to be migrated")

# data = pd.read_csv(bundle_uuids)
# bundle_uuids = data["0"].tolist()[100001:]
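# (The two commented-out lines above look like a leftover manual-resume path: re-reading the previously dumped UUID CSV and skipping entries that had already been migrated.)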

# Sort according to what process you are.
chunk_size = len(bundle_uuids) // num_processes
start_idx = chunk_size * proc_id
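# Each process presumably takes the contiguous slice bundle_uuids[start_idx:start_idx + chunk_size], so the UUID list is partitioned across num_processes workers.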
@@ -701,6 +696,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
parser.add_argument(
'-u', '--bundle-uuids', type=str, nargs='*', default=None, help='List of bundle UUIDs to migrate.'
)
# parser.add_argument(
# '-u', '--bundle-uuids', type=str, help='List of bundle UUIDs to migrate.'
# )
parser.add_argument(
'-t',
'--target_store_name',