Skip to content

Commit

Permalink
All debug changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dma1dma1 committed Feb 29, 2024
1 parent 51bbd25 commit bc7e2ef
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 64 deletions.
4 changes: 4 additions & 0 deletions codalab/lib/bundle_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,8 @@ def do_download_command(self, args):
default_client, default_worksheet_uuid, args.target_spec
)

print(default_worksheet_uuid)

# Figure out where to download.
info = client.fetch('bundles', target.bundle_uuid)
if args.output_path:
Expand All @@ -1516,6 +1518,8 @@ def do_download_command(self, args):
if target.subpath == ''
else os.path.basename(target.subpath)
)

print(info)
final_path = os.path.join(os.getcwd(), local_path)
if os.path.exists(final_path):
if args.force:
Expand Down
176 changes: 112 additions & 64 deletions codalab/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from codalab.common import (
StorageType,
StorageURLScheme,
parse_linked_bundle_url
)
from codalab.lib.print_util import FileTransferProgress
from codalab.lib import (
Expand Down Expand Up @@ -163,7 +164,7 @@ def __init__(self, target_store_name, change_db, delete, proc_id, sanity_check_n
self.bundle_migration_statuses = list()

if os.path.exists(self.get_bundle_statuses_path()):
self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path())
self.existing_bundle_migration_statuses = pd.read_csv(self.get_bundle_statuses_path(), dtype={'uuid': object, 'status': object, 'error_message': object})
else:
self.existing_bundle_migration_statuses = None

Expand Down Expand Up @@ -340,6 +341,9 @@ def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_lo

success, reason = None, None

# print(bundle_location, new_location)
# return True, None

if is_dir:
# For dirs, check the folder contains same files
with OpenFile(new_location, gzipped=True) as f:
Expand Down Expand Up @@ -477,15 +481,33 @@ def migrate_bundle(self, bundle_uuid):
self.logger.info("Getting Bundle info")
bundle = self.get_bundle(bundle_uuid)
bundle_location = self.get_bundle_location(bundle_uuid)
print(bundle)
print(self.bundle_manager._model.get_bundle_storage_info(bundle_uuid))
print(bundle_location)
# print("contents.gz" in bundle_location)

# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
bundle_info = None
new_bundle_location = bundle_location[:-2] + "tar.gz"
# print(new_bundle_location)
# print(self.get_bundle_info(bundle_uuid, new_bundle_location))
# linked_info = parse_linked_bundle_url(bundle_location)

try:
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
except Exception as e:
# print(e)
if "Path ''" in str(e):
for i in range(0, 10):
# if bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# try:
# if "contents.gz" in bundle_location:
# bundle_info = self.get_bundle_info(bundle_uuid, new_bundle_location)
# print(bundle_info)
# except:
# pass

for i in range(0, 11):
try:
bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}')
bundle_location = f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}'
Expand All @@ -498,70 +520,95 @@ def migrate_bundle(self, bundle_uuid):
is_bundle_rm = True
is_dir = False
bundle_migration_status.status = MigrationStatus.NOT_STARTED
bundle_migration_status.error_message = None
bundle_migration_status.error_message = ''
else:
raise e

pass

# if not bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# self.bundle_manager._model.update_bundle(
# bundle,
# {
# 'is_dir': bundle_info['type'] == 'directory',
# }
# )

print(bundle_info, bundle_location, is_bundle_rm)
# print(bundle_migration_status)

# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Upload to Azure.
# print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
self.logger.info("Changing DB")
start_time = time.time()
self.modify_bundle_data(bundle, bundle_uuid, is_dir)
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB

# Delete the bundle from disk.
if self.delete and not is_bundle_rm:
self.logger.info("Deleting from disk")
start_time = time.time()
if os.path.lexists(disk_location):
# Delete it.
path_util.remove(disk_bundle_location)
self.times["delete_original_bundle"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.FINISHED
# if not is_bundle_rm:
# is_dir = bundle_info['type'] == 'directory'
# target_location = self.blob_target_location(bundle_uuid, is_dir)
# disk_location = self.get_bundle_disk_location(bundle_uuid)

# # print(1)

# # Don't migrate currently running bundles
# if bundle.state not in State.FINAL_STATES:
# bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
# return

# # Don't migrate linked bundles
# if self.is_linked_bundle(bundle_uuid):
# bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
# return

# # print(bundle_migration_status)
# # print(2)

# # if db already changed
# # TODO: Check if bundle_location is azure (see other places in code base.)
# if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# bundle_migration_status.error_message = ''
# return
# # elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
# # bundle_migration_status.status = MigrationStatus.CHANGED_DB
# # bundle_migration_status.error_message = ''
# # elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
# # bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# # )[0]):
# # bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# # bundle_migration_status.error_message = ''

# # Upload to Azure.
# # print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
# if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
# print("UPLOADING")
# self.logger.info("Uploading to Azure")
# start_time = time.time()
# self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
# self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
# success, reason = self.sanity_check(
# bundle_uuid, bundle_location, bundle_info, is_dir, target_location
# )
# if not success:
# raise ValueError(f"SanityCheck failed with {reason}")
# bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
# bundle_migration_status.error_message = None

# # Change bundle metadata in database to point to the Azure Blob location (not disk)
# if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
# print("Changing DB")
# self.logger.info("Changing DB")
# start_time = time.time()
# self.modify_bundle_data(bundle, bundle_uuid, is_dir)
# self.times["modify_bundle_data"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.CHANGED_DB
# bundle_migration_status.error_message = None

# # Delete the bundle from disk.
# if self.delete and not is_bundle_rm:
# self.logger.info("Deleting from disk")
# start_time = time.time()
# delete_status = self.delete_original_bundle(bundle_uuid)
# if not delete_status:
# self.logger.info(f"Bundle location not on disk or bundle already deleted for bundle {bundle_uuid}")
# # if os.path.lexists(disk_location):
# # # Delete it.
# # path_util.remove(disk_bundle_location)
# self.times["delete_original_bundle"].append(time.time() - start_time)
# bundle_migration_status.status = MigrationStatus.FINISHED
# bundle_migration_status.error_message = None

self.times["migrate_bundle"].append(time.time() - total_start_time)

Expand Down Expand Up @@ -589,6 +636,7 @@ def log_times(self):
self.logger.info(json.dumps(output_dict, sort_keys=True, indent=4))

def write_bundle_statuses(self):
return
new_records = pd.DataFrame.from_records([b_m_s.to_dict() for b_m_s in self.bundle_migration_statuses])
if self.existing_bundle_migration_statuses is None:
self.existing_bundle_migration_statuses = new_records
Expand Down

0 comments on commit bc7e2ef

Please sign in to comment.