From d33626f22cf957cae297942e8e98bae6d39840d2 Mon Sep 17 00:00:00 2001 From: AndrewJGaut <35617203+AndrewJGaut@users.noreply.github.com> Date: Mon, 20 Feb 2023 10:40:11 -0800 Subject: [PATCH 1/6] Fix/4392 remove data hash (#4399) * removing data hash * fix tests for data hash * fix tests * update directory hashing code * minor formatting changes * fix tests * minor changes * Add in database migration --- ...023021501_remove_data_hash_db3ca94867b3.py | 28 +++++++ codalab/bundles/derived_bundle.py | 3 +- codalab/bundles/make_bundle.py | 8 +- codalab/bundles/private_bundle.py | 1 - codalab/bundles/run_bundle.py | 8 +- codalab/bundles/uploaded_bundle.py | 1 - codalab/lib/bundle_cli.py | 17 +---- codalab/lib/bundle_store.py | 46 +---------- codalab/lib/bundle_util.py | 1 - codalab/lib/upload_manager.py | 2 +- codalab/model/bundle_model.py | 20 +---- codalab/model/tables.py | 3 - codalab/objects/bundle.py | 2 - codalab/rest/bundles.py | 7 +- codalab/rest/schemas.py | 3 - codalab/worker/bundle_state.py | 3 - docs/CLI-Reference.md | 4 +- docs/REST-API-Reference.md | 1 - docs/features/bundles/uploading.md | 2 - tests/cli/test_cli.py | 76 +++++++++++++++---- tests/unit/model/bundle_model_test.py | 1 - tests/unit/objects/bundle_test.py | 2 - tests/unit/rest/bundles_test.py | 1 - tests/unit/rest/util_test.py | 1 - tests/unit/worker/bundle_state_test.py | 2 - 25 files changed, 105 insertions(+), 138 deletions(-) create mode 100644 alembic/versions/2023021501_remove_data_hash_db3ca94867b3.py diff --git a/alembic/versions/2023021501_remove_data_hash_db3ca94867b3.py b/alembic/versions/2023021501_remove_data_hash_db3ca94867b3.py new file mode 100644 index 000000000..7312ce174 --- /dev/null +++ b/alembic/versions/2023021501_remove_data_hash_db3ca94867b3.py @@ -0,0 +1,28 @@ +"""remove data hash + +Revision ID: db3ca94867b3 +Revises: 22b6c41cd29c +Create Date: 2023-02-15 01:43:30.027124 + +""" + +# revision identifiers, used by Alembic. +revision = 'db3ca94867b3' +down_revision = '22b6c41cd29c' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('bundle_data_hash_index', table_name='bundle') + op.drop_column('bundle', 'data_hash') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('bundle', sa.Column('data_hash', mysql.VARCHAR(length=63), nullable=True)) + op.create_index('bundle_data_hash_index', 'bundle', ['data_hash'], unique=False) + # ### end Alembic commands ### diff --git a/codalab/bundles/derived_bundle.py b/codalab/bundles/derived_bundle.py index 7c482ada9..f5a2beb7c 100644 --- a/codalab/bundles/derived_bundle.py +++ b/codalab/bundles/derived_bundle.py @@ -23,7 +23,7 @@ class DerivedBundle(NamedBundle): ) @classmethod - def construct(cls, targets, command, metadata, owner_id, uuid, data_hash, state): + def construct(cls, targets, command, metadata, owner_id, uuid, state): if not uuid: uuid = spec_util.generate_uuid() # Check that targets does not include both keyed and anonymous targets. @@ -46,7 +46,6 @@ def construct(cls, targets, command, metadata, owner_id, uuid, data_hash, state) 'uuid': uuid, 'bundle_type': cls.BUNDLE_TYPE, 'command': command, - 'data_hash': data_hash, 'state': state, 'metadata': metadata, 'dependencies': dependencies, diff --git a/codalab/bundles/make_bundle.py b/codalab/bundles/make_bundle.py index 792bca3d2..b0be721fb 100644 --- a/codalab/bundles/make_bundle.py +++ b/codalab/bundles/make_bundle.py @@ -22,9 +22,5 @@ class MakeBundle(DerivedBundle): ) @classmethod - def construct( - cls, targets, command, metadata, owner_id, uuid=None, data_hash=None, state=State.CREATED - ): - return super(MakeBundle, cls).construct( - targets, command, metadata, owner_id, uuid, data_hash, state - ) + def construct(cls, targets, command, metadata, owner_id, uuid=None, state=State.CREATED): + return super(MakeBundle, cls).construct(targets, command, metadata, owner_id, uuid, state) diff --git a/codalab/bundles/private_bundle.py b/codalab/bundles/private_bundle.py index 37debf70e..1f14b8356 100644 --- a/codalab/bundles/private_bundle.py +++ b/codalab/bundles/private_bundle.py @@ -22,7 +22,6 @@ def construct(cls, uuid): 'bundle_type': cls.BUNDLE_TYPE, 'owner_id': None, 'command': None, - 'data_hash': None, 'state': None, 'frozen': None, 'is_anonymous': None, diff --git a/codalab/bundles/run_bundle.py b/codalab/bundles/run_bundle.py index b6b7ae87a..a4cbf79c2 100644 --- a/codalab/bundles/run_bundle.py +++ b/codalab/bundles/run_bundle.py @@ -317,14 +317,10 @@ class RunBundle(DerivedBundle): ) @classmethod - def construct( - cls, targets, command, metadata, owner_id, uuid=None, data_hash=None, state=State.CREATED - ): + def construct(cls, targets, command, metadata, owner_id, uuid=None, state=State.CREATED): if not isinstance(command, str): raise UsageError('%r is not a valid command!' % (command,)) - return super(RunBundle, cls).construct( - targets, command, metadata, owner_id, uuid, data_hash, state - ) + return super(RunBundle, cls).construct(targets, command, metadata, owner_id, uuid, state) def validate(self): super(RunBundle, self).validate() diff --git a/codalab/bundles/uploaded_bundle.py b/codalab/bundles/uploaded_bundle.py index b16a76c8c..fea8d5b8b 100644 --- a/codalab/bundles/uploaded_bundle.py +++ b/codalab/bundles/uploaded_bundle.py @@ -51,7 +51,6 @@ def construct(cls, metadata, owner_id, uuid=None): row = { 'bundle_type': cls.BUNDLE_TYPE, 'command': None, - 'data_hash': None, 'state': State.READY, 'metadata': metadata, 'dependencies': [], diff --git a/codalab/lib/bundle_cli.py b/codalab/lib/bundle_cli.py index 92eeb165e..ab445b428 100644 --- a/codalab/lib/bundle_cli.py +++ b/codalab/lib/bundle_cli.py @@ -2405,7 +2405,6 @@ def print_basic_info(self, client, info, raw): for key in ( 'bundle_type', 'uuid', - 'data_hash', 'state', 'command', 'frozen', @@ -4357,27 +4356,13 @@ def do_ls_partitions_command(self, args): help='Perform all garbage collection and database updates instead of just printing what would happen', action='store_true', ), - Commands.Argument( - '-d', - '--data-hash', - help='Compute the digest for every bundle and compare against data_hash for consistency', - action='store_true', - ), - Commands.Argument( - '-r', - '--repair', - help='When used with --force and --data-hash, repairs incorrect data_hash in existing bundles', - action='store_true', - ), ), ) def do_bs_health_check(self, args): self._fail_if_headless(args) self._fail_if_not_local(args) print('Performing Health Check...', file=sys.stderr) - self.manager.bundle_store().health_check( - self.manager.model(), args.force, args.data_hash, args.repair - ) + self.manager.bundle_store().health_check(self.manager.model(), args.force) def _fail_if_headless(self, args): if self.headless: diff --git a/codalab/lib/bundle_store.py b/codalab/lib/bundle_store.py index c3342cf78..a0f520aaa 100644 --- a/codalab/lib/bundle_store.py +++ b/codalab/lib/bundle_store.py @@ -235,7 +235,7 @@ def cleanup(self, bundle_location, dry_run): return False return path_util.remove(bundle_location) - def health_check(self, model, force=False, compute_data_hash=False, repair_hashes=False): + def health_check(self, model, force=False): """ MultiDiskBundleStore.health_check(): In the MultiDiskBundleStore, bundle contents are stored on disk, and occasionally the disk gets out of sync with the database, in which case we make repairs in the following ways: @@ -247,8 +247,7 @@ def health_check(self, model, force=False, compute_data_hash=False, repair_hashe 5. For bundle marked READY or FAILED, .cid or .status, or the (-internal).sh files should not exist. |force|: Perform any destructive operations on the bundle store the health check determines are necessary. False by default - |compute_data_hash|: If True, compute the data_hash for every single bundle ourselves and see if it's consistent with what's in - the database. False by default. + """ UUID_REGEX = re.compile(r'^(%s)' % spec_util.UUID_STR) @@ -345,52 +344,11 @@ def _check_other_paths(other_paths, db_bundle_by_uuid): trash_count += 1 _delete_path(to_delete) - # Check for each bundle if we need to compute its data_hash - data_hash_recomputed = 0 - - print('Checking data_hash of bundles in partition %s...' % partition, file=sys.stderr) - for bundle_path in bundle_paths: - uuid = _get_uuid(bundle_path) - bundle = db_bundle_by_uuid.get(uuid, None) - if bundle is None: - continue - if compute_data_hash or bundle.data_hash is None: - dirs_and_files = ( - path_util.recursive_ls(bundle_path) - if os.path.isdir(bundle_path) - else ([], [bundle_path]) - ) - data_hash = '0x%s' % path_util.hash_directory(bundle_path, dirs_and_files) - if bundle.data_hash is None: - data_hash_recomputed += 1 - print( - 'Giving bundle %s data_hash %s' % (bundle_path, data_hash), - file=sys.stderr, - ) - if force: - db_update = dict(data_hash=data_hash) - model.update_bundle(bundle, db_update) - elif compute_data_hash and data_hash != bundle.data_hash: - data_hash_recomputed += 1 - print( - 'Bundle %s should have data_hash %s, actual digest is %s' - % (bundle_path, bundle.data_hash, data_hash), - file=sys.stderr, - ) - if repair_hashes and force: - db_update = dict(data_hash=data_hash) - model.update_bundle(bundle, db_update) - if force: print('\tDeleted %d objects from the bundle store' % trash_count, file=sys.stderr) - print('\tRecomputed data_hash for %d bundles' % data_hash_recomputed, file=sys.stderr) else: print('Dry-Run Statistics, re-run with --force to perform updates:', file=sys.stderr) print('\tObjects marked for deletion: %d' % trash_count, file=sys.stderr) - print( - '\tBundles that need data_hash recompute: %d' % data_hash_recomputed, - file=sys.stderr, - ) BundleLocation = TypedDict( diff --git a/codalab/lib/bundle_util.py b/codalab/lib/bundle_util.py index 6a1f3d19b..5f7efa124 100644 --- a/codalab/lib/bundle_util.py +++ b/codalab/lib/bundle_util.py @@ -26,7 +26,6 @@ def bundle_to_bundle_info(model, bundle): bundle.bundle_type, bundle.owner_id, bundle.command, - bundle.data_hash, bundle.state, bundle.frozen, bundle.is_anonymous, diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index 28d9490d9..f2f740312 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -368,7 +368,7 @@ def has_contents(self, bundle): def cleanup_existing_contents(self, bundle): data_size = self._bundle_model.get_bundle_metadata(bundle.uuid, 'data_size')[bundle.uuid] removed = self._bundle_store.cleanup(bundle.uuid, dry_run=False) - bundle_update = {'data_hash': None, 'metadata': {'data_size': 0}} + bundle_update = {'metadata': {'data_size': 0}} self._bundle_model.update_bundle(bundle, bundle_update) if removed: self._bundle_model.increment_user_disk_used(bundle.owner_id, -data_size) diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index 0444e96a2..e907ec0b8 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -497,7 +497,7 @@ def add_join(table: Table, condition: Any, left_outer_join: bool = False): elif key == '.format': format_func = value # Bundle fields - elif key in ('bundle_type', 'id', 'uuid', 'data_hash', 'state', 'command', 'owner_id'): + elif key in ('bundle_type', 'id', 'uuid', 'state', 'command', 'owner_id'): conjunct = make_condition(key, getattr(cl_bundle.c, key), value) elif key == '.shared': # shared with any group I am in with read permission add_join( @@ -1141,7 +1141,6 @@ def update_disk_metadata(self, bundle, bundle_location, enforce_disk_quota=False dirs_and_files = [], [bundle_location] # TODO(Ashwin): make this non-fs specific - data_hash = '0x%s' % (path_util.hash_directory(bundle_location, dirs_and_files)) data_size = path_util.get_size(bundle_location, dirs_and_files) try: if 'data_size' in bundle.metadata.__dict__: @@ -1161,7 +1160,7 @@ def update_disk_metadata(self, bundle, bundle_location, enforce_disk_quota=False % (data_size, disk_left) ) - bundle_update = {'data_hash': data_hash, 'metadata': {'data_size': data_size}} + bundle_update = {'metadata': {'data_size': data_size}} self.update_bundle(bundle, bundle_update) self.increment_user_disk_used(bundle.owner_id, disk_increment) @@ -1358,12 +1357,6 @@ def delete_bundles(self, uuids): connection.execute(cl_worker_run.delete().where(cl_worker_run.c.run_uuid.in_(uuids))) connection.execute(cl_bundle.delete().where(cl_bundle.c.uuid.in_(uuids))) - def remove_data_hash_references(self, uuids): - with self.engine.begin() as connection: - connection.execute( - cl_bundle.update().where(cl_bundle.c.uuid.in_(uuids)).values({'data_hash': None}) - ) - # ========================================================================== # Worksheet-related model methods follow! # ========================================================================== @@ -2769,15 +2762,6 @@ def update_user_last_login(self, user_id): """ self.update_user_info({'user_id': user_id, 'last_login': datetime.datetime.utcnow()}) - def _get_disk_used(self, user_id): - # TODO(Ashwin): don't include linked bundles - return ( - self.search_bundles(user_id, ['size=.sum', 'owner_id=' + user_id, 'data_hash=%'])[ - 'result' - ] - or 0 - ) - def get_user_disk_quota_left(self, user_id, user_info=None): if not user_info: user_info = self.get_user_info(user_id) diff --git a/codalab/model/tables.py b/codalab/model/tables.py index fa328fa76..51dd3ff73 100644 --- a/codalab/model/tables.py +++ b/codalab/model/tables.py @@ -41,8 +41,6 @@ Column('bundle_type', String(63), nullable=False), # The command will be NULL except for run bundles. Column('command', Text, nullable=True), - # The data_hash will be NULL if the bundle's value is still being computed. - Column('data_hash', String(63), nullable=True), Column('state', String(63), nullable=False), Column('owner_id', String(255), nullable=True), Column('frozen', DateTime, nullable=True), # When the bundle was frozen, if it is. @@ -57,7 +55,6 @@ 'is_dir', Boolean, nullable=True, ), # Whether the bundle is a directory or just a single file. If set to null, nothing has been uploaded for the bundle yet. UniqueConstraint('uuid', name='uix_1'), - Index('bundle_data_hash_index', 'data_hash'), Index('state_index', 'state'), # Needed for the bundle manager. mysql_charset=TABLE_DEFAULT_CHARSET, ) diff --git a/codalab/objects/bundle.py b/codalab/objects/bundle.py index 528084037..ee1a6ad88 100644 --- a/codalab/objects/bundle.py +++ b/codalab/objects/bundle.py @@ -23,7 +23,6 @@ class Bundle(ORMObject): 'uuid', 'bundle_type', 'command', - 'data_hash', 'state', 'owner_id', 'frozen', @@ -41,7 +40,6 @@ class Bundle(ORMObject): uuid: str bundle_type: str command: str - data_hash: str state: str owner_id: str frozen: bool diff --git a/codalab/rest/bundles.py b/codalab/rest/bundles.py index 9ef10ce4d..377abc4d0 100644 --- a/codalab/rest/bundles.py +++ b/codalab/rest/bundles.py @@ -1319,11 +1319,8 @@ def delete_bundles(uuids, force, recursive, data_only, dry_run): # Delete the actual bundle if not dry_run: - if data_only: - # Just remove references to the data hashes - local.model.remove_data_hash_references(relevant_uuids) - else: - # Actually delete the bundle + if not data_only: + # Delete bundle metadata. local.model.delete_bundles(relevant_uuids) # Delete the data. diff --git a/codalab/rest/schemas.py b/codalab/rest/schemas.py index bb6eed2f2..77bc30188 100644 --- a/codalab/rest/schemas.py +++ b/codalab/rest/schemas.py @@ -201,7 +201,6 @@ class BundleSchema(Schema): validate=validate.OneOf({bsc.BUNDLE_TYPE for bsc in BUNDLE_SUBCLASSES}) ) command = fields.String(allow_none=True, validate=validate_ascii) - data_hash = fields.String() state = fields.String() state_details = fields.String() owner = fields.Relationship(include_resource_linkage=True, type_='users', attribute='owner_id') @@ -270,7 +269,6 @@ class Meta: # restrictions differ depending on the action BUNDLE_CREATE_RESTRICTED_FIELDS = ( - 'data_hash', 'state', 'owner', 'children', @@ -284,7 +282,6 @@ class Meta: BUNDLE_UPDATE_RESTRICTED_FIELDS = ( 'command', - 'data_hash', 'state', 'dependencies', 'children', diff --git a/codalab/worker/bundle_state.py b/codalab/worker/bundle_state.py index 53792dc1e..0e89bb604 100644 --- a/codalab/worker/bundle_state.py +++ b/codalab/worker/bundle_state.py @@ -65,7 +65,6 @@ def __init__( bundle_type, # type: str owner_id, # type: str command, # type: str - data_hash, # type: str state, # type: State frozen, # type: Optional[str] is_anonymous, # type: bool @@ -78,7 +77,6 @@ def __init__( self.bundle_type = bundle_type self.owner_id = owner_id self.command = command - self.data_hash = data_hash self.state = state self.frozen = frozen self.is_anonymous = is_anonymous @@ -114,7 +112,6 @@ def from_dict(cls, dct): bundle_type=dct["bundle_type"], owner_id=dct["owner_id"], command=dct["command"], - data_hash=dct["data_hash"], state=dct["state"], frozen=dct.get("frozen"), is_anonymous=dct["is_anonymous"], diff --git a/docs/CLI-Reference.md b/docs/CLI-Reference.md index b357f796f..c272d798d 100644 --- a/docs/CLI-Reference.md +++ b/docs/CLI-Reference.md @@ -512,9 +512,7 @@ Usage: `cl ` ### bs-health-check Perform a health check on the bundle store, garbage collecting bad files in the store. Performs a dry run by default, use -f to force removal. Arguments: - -f, --force Perform all garbage collection and database updates instead of just printing what would happen - -d, --data-hash Compute the digest for every bundle and compare against data_hash for consistency - -r, --repair When used with --force and --data-hash, repairs incorrect data_hash in existing bundles + -f, --force Perform all garbage collection and database updates instead of just printing what would happen ## Other commands diff --git a/docs/REST-API-Reference.md b/docs/REST-API-Reference.md index f58e21315..aba2242d7 100644 --- a/docs/REST-API-Reference.md +++ b/docs/REST-API-Reference.md @@ -240,7 +240,6 @@ Name | Type `uuid` | String `bundle_type` | String `command` | String -`data_hash` | String `state` | String `state_details` | String `owner` | Relationship([users](#users)) diff --git a/docs/features/bundles/uploading.md b/docs/features/bundles/uploading.md index 730915b18..d7337dd54 100644 --- a/docs/features/bundles/uploading.md +++ b/docs/features/bundles/uploading.md @@ -27,7 +27,6 @@ Now, one can also view the information of the uploaded bundle using the `cl info % cl info 0xdc74e4ef29b64ab19e1e26e94ea22811 bundle_type : dataset uuid : 0xdc74e4ef29b64ab19e1e26e94ea22811 -data_hash : 0x28bf5b81a526b5c049d9a3e5e8cdce5541182506 state : ready command : frozen : @@ -93,7 +92,6 @@ You can tell if a bundle is a linked bundle by checking to see if the `link_url` % cl info 0x3013b2c4519c4c99a622b07af078a5ff bundle_type : dataset uuid : 0x3013b2c4519c4c99a622b07af078a5ff -data_hash : state : ready command : frozen : diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 26c8b4b0f..2bbb1cbaa 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -30,6 +30,7 @@ from scripts.test_util import Colorizer, run_command import argparse +import hashlib import json import multiprocessing import os @@ -268,6 +269,58 @@ def wait_until_substring(fp, substr): return +""" +Hash helpers for mimic and copy +""" + + +def recursive_ls(path): + """ + Return a (list of directories, list of files) in the given directory and + all of its nested subdirectories. All paths returned are absolute. + Symlinks are returned in the list of files, even if they point to directories. + This makes it possible to distinguish between real and symlinked directories + when computing the hash of a directory. This function will NOT descend into + symlinked directories. + """ + (directories, files) = ([], []) + for (root, _, file_names) in os.walk(path): + directories.append(root) + for file_name in file_names: + files.append(os.path.join(root, file_name)) + # os.walk ignores symlinks to directories, but we should count them as files. + # However, we can't used the followlinks parameter, because a) we don't want + # to descend into directories and b) we could end up in an infinite loop if + # we were to pass that flag. Instead, we handle symlinks here: + for subpath in os.listdir(root): + full_subpath = os.path.join(root, subpath) + if os.path.islink(full_subpath) and os.path.isdir(full_subpath): + files.append(full_subpath) + return (directories, files) + + +def data_hash(uuid, worksheet=None): + """ + Temporarily download bundle contents. + Return a hash of those contents. + """ + path = temp_path(uuid) + if not os.path.exists(path): + # Download the bundle to that path. + command = [cl, 'download', uuid, '-o', path] + if worksheet is not None: + command += ['-w', worksheet] + _run_command(command) + sha1 = hashlib.sha1() + files = recursive_ls(path)[1] + for f in files: + try: + sha1.update(open(f, 'r').read().encode()) + except Exception as e: + raise Exception("file name: {}. exception: {}".format(f, e)) + return sha1.hexdigest() + + def _run_command( args, expected_exit_code=0, @@ -673,9 +726,7 @@ def test_basic(ctx): # rm _run_command([cl, 'rm', '--dry-run', uuid]) - check_contains('0x', get_info(uuid, 'data_hash')) _run_command([cl, 'rm', '--data-only', uuid]) - check_equals('None', get_info(uuid, 'data_hash')) _run_command([cl, 'rm', uuid]) @@ -1510,7 +1561,6 @@ def test_worksheet(ctx): _run_command([cl, 'add', 'text', '// comment']) _run_command([cl, 'add', 'text', '% schema foo']) _run_command([cl, 'add', 'text', '% add uuid']) - _run_command([cl, 'add', 'text', '% add data_hash data_hash s/0x/HEAD']) _run_command([cl, 'add', 'text', '% add CREATE created "date | [0:5]"']) _run_command([cl, 'add', 'text', '% display table foo']) _run_command([cl, 'add', 'bundle', uuid]) @@ -1889,7 +1939,6 @@ def test_run(ctx): name = random_name() uuid = _run_command([cl, 'run', 'echo hello', '-n', name]) wait(uuid) - check_contains('0x', get_info(uuid, 'data_hash')) check_not_equals('0s', get_info(uuid, 'time_preparing')) check_not_equals('0s', get_info(uuid, 'time_running')) check_not_equals('0s', get_info(uuid, 'time_cleaning_up')) @@ -2220,18 +2269,19 @@ def test_write(ctx): check_equals(str(['write\tmessage\thello world']), get_info(uuid, 'actions')) +""" +This we'll have ot think about how to migrate... +""" + + @TestModule.register('mimic') def test_mimic(ctx): - def data_hash(uuid): - _run_command([cl, 'wait', uuid]) - return get_info(uuid, 'data_hash') - simple_name = random_name() - input_uuid = _run_command([cl, 'upload', test_path('a.txt'), '-n', simple_name + '-in1']) + input_uuid = _run_command([cl, 'upload', test_path('dir2'), '-n', simple_name + '-in1']) simple_out_uuid = _run_command([cl, 'make', input_uuid, '-n', simple_name + '-out']) - new_input_uuid = _run_command([cl, 'upload', test_path('a.txt')]) + new_input_uuid = _run_command([cl, 'upload', test_path('dir2')]) # Try three ways of mimicing, should all produce the same answer input_mimic_uuid = _run_command([cl, 'mimic', input_uuid, new_input_uuid, '-n', 'new']) @@ -2444,9 +2494,9 @@ def compare_output_across_instances(command): # Upload to original worksheet, transfer to remote _run_command([cl, 'work', source_worksheet]) - uuid = _run_command([cl, 'upload', test_path('')]) + uuid = _run_command([cl, 'upload', test_path('a.txt')]) _run_command([cl, 'add', 'bundle', uuid, '--dest-worksheet', remote_worksheet]) - compare_output_across_instances([cl, 'info', '-f', 'data_hash,name', uuid]) + compare_output_across_instances([cl, 'info', '-f', 'name', uuid]) # TODO: `cl cat` is not working even with the bundle available # compare_output_across_instances([cl, 'cat', uuid]) @@ -2454,7 +2504,7 @@ def compare_output_across_instances(command): _run_command([cl, 'work', remote_worksheet]) uuid = _run_command([cl, 'upload', test_path('')]) _run_command([cl, 'add', 'bundle', uuid, '--dest-worksheet', source_worksheet]) - compare_output_across_instances([cl, 'info', '-f', 'data_hash,name', uuid]) + compare_output_across_instances([cl, 'info', '-f', 'name', uuid]) # compare_output_across_instances([cl, 'cat', uuid]) # Upload to remote, transfer to local (metadata only) diff --git a/tests/unit/model/bundle_model_test.py b/tests/unit/model/bundle_model_test.py index 01362165a..ffcce4221 100644 --- a/tests/unit/model/bundle_model_test.py +++ b/tests/unit/model/bundle_model_test.py @@ -66,7 +66,6 @@ class MockBundle(object): _fields = { 'uuid': 'my_uuid', 'bundle_type': 'my_bundle_type', - 'data_hash': 'my_data_hash', 'state': 'my_state', 'metadata': {'key_1': 'value_1', 'key_2': 'value_2'}, 'dependencies': [MockDependency().to_dict()], diff --git a/tests/unit/objects/bundle_test.py b/tests/unit/objects/bundle_test.py index 83a22a080..438e47fcd 100644 --- a/tests/unit/objects/bundle_test.py +++ b/tests/unit/objects/bundle_test.py @@ -30,7 +30,6 @@ class BundleTest(unittest.TestCase): bundle_type = MockBundle.BUNDLE_TYPE command = 'my_command' - data_hash = 'my_data_hash' state = 'my_state' # worker_command = None @@ -42,7 +41,6 @@ def construct_mock_bundle(self): } return MockBundle.construct( command=self.command, - data_hash=self.data_hash, state=self.state, metadata=metadata, dependencies=[], diff --git a/tests/unit/rest/bundles_test.py b/tests/unit/rest/bundles_test.py index 7c32b92fd..462f857a0 100644 --- a/tests/unit/rest/bundles_test.py +++ b/tests/unit/rest/bundles_test.py @@ -53,7 +53,6 @@ def test_create(self): "type": "bundles", "attributes": { "permission": 2, - "data_hash": None, "uuid": bundle_id, "args": "run \"echo TEST\" --request-cpus 1 --request-docker-image codalab/default-cpu:latest --request-memory 4g", "metadata": { diff --git a/tests/unit/rest/util_test.py b/tests/unit/rest/util_test.py index 01d599a29..830b1cbeb 100644 --- a/tests/unit/rest/util_test.py +++ b/tests/unit/rest/util_test.py @@ -31,7 +31,6 @@ def _mock_bundle(self, uuid): bundle_type='run', owner_id=None, command=None, - data_hash=None, state='created', frozen=None, is_anonymous=False, diff --git a/tests/unit/worker/bundle_state_test.py b/tests/unit/worker/bundle_state_test.py index a8b712f8f..6cd4d68c0 100644 --- a/tests/unit/worker/bundle_state_test.py +++ b/tests/unit/worker/bundle_state_test.py @@ -10,7 +10,6 @@ def setUp(self) -> None: 'bundle_type': 'bundle_type', 'owner_id': 'owner_id', 'command': 'command', - 'data_hash': 'data_hash', 'state': State.RUNNING, 'frozen': None, 'is_anonymous': False, @@ -44,7 +43,6 @@ def test_bundle_info_serialization(self): bundle_type=self.bundle_info_fields['bundle_type'], owner_id=self.bundle_info_fields['owner_id'], command=self.bundle_info_fields['command'], - data_hash=self.bundle_info_fields['data_hash'], state=self.bundle_info_fields['state'], frozen=self.bundle_info_fields['frozen'], is_anonymous=self.bundle_info_fields['is_anonymous'], From dd9c4425897439a4c3f81bd00f421802b3d0d0c1 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Wed, 1 Mar 2023 11:03:52 -0500 Subject: [PATCH 2/6] Fix division by zero error when time quota is set to 0 (#4375) Co-authored-by: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- codalab/lib/formatting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codalab/lib/formatting.py b/codalab/lib/formatting.py index e1caabfdc..44f9f7f8c 100644 --- a/codalab/lib/formatting.py +++ b/codalab/lib/formatting.py @@ -102,7 +102,7 @@ def ratio_str(to_str, a, b): """ Example: to_str = duration_str, a = 60, b = 120 => "1m / 2m (50%)" """ - return '%s / %s (%.1f%%)' % (to_str(a), to_str(b), 100.0 * a / b) + return '%s / %s (%.1f%%)' % (to_str(a), to_str(b), 100.0 * a / b if b != 0 else 100.0) def parse_size(s): From 7f2442cb3e29ab5eb65c5de34b33708edab4863f Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Wed, 1 Mar 2023 11:53:22 -0500 Subject: [PATCH 3/6] Bump ratarmountcore to 0.3.2 (#4063) * Bump ratarmountcore to 0.3.1 Speeds up things, can potentially fix https://github.com/codalab/codalab-worksheets/issues/3771 * Update requirements.txt --------- Co-authored-by: Jiani Wang <40016222+wwwjn@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 4782f8cc8..30ffbdaeb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,7 @@ marshmallow==2.15.1 setuptools>=40.0.0 argcomplete==1.12.3 indexed_gzip==1.6.3 -ratarmountcore==0.1.3 +ratarmountcore==0.3.2 PyYAML==5.4 psutil==5.7.2 six==1.15.0 From 5955017a477ccb18009fd82440ee5b3b0564a3b1 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Wed, 1 Mar 2023 16:40:36 -0500 Subject: [PATCH 4/6] Clean up worker comments (#4390) --- codalab/worker/worker.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/codalab/worker/worker.py b/codalab/worker/worker.py index 58138f77d..792a2beb5 100644 --- a/codalab/worker/worker.py +++ b/codalab/worker/worker.py @@ -37,8 +37,8 @@ Codalab Worker Workers handle communications with the Codalab server. Their main role in Codalab execution is syncing the job states with the server and passing on job-related commands from the server -to architecture-specific RunManagers that run the jobs. Workers are execution platform antagonistic -but they expect the platform specific RunManagers they use to implement a common interface +to architecture-specific BundleRuntimes that run the jobs. Workers are execution platform agnostic +but they expect the platform specific BundleRuntimes they use to implement a common interface """ NOOP = 'noop' @@ -318,7 +318,7 @@ async def receive_msg(): while not self.terminate: try: await receive_msg() - except asyncio.futures.TimeoutError: + except asyncio.TimeoutError: pass except websockets.exceptions.ConnectionClosed: logger.warning("Websocket connection closed, starting a new one...") @@ -625,7 +625,7 @@ def propose_set(resource_set, request_count): @property def all_runs(self): """ - Returns a list of all the runs managed by this RunManager + Returns a list of all the runs managed by this worker """ return [ BundleCheckinState( @@ -650,7 +650,7 @@ def all_runs(self): @property def free_disk_bytes(self): """ - Available disk space by bytes of this RunManager. + Available disk space by bytes of this worker. """ error_msg = "Failed to run command {}".format("df -k" + self.work_dir) try: @@ -681,7 +681,7 @@ def initialize_run(self, bundle, resources): If not, returns immediately. Then, checks in with the bundle service and sees if the bundle is still assigned to this worker. If not, returns immediately. - Otherwise, tell RunManager to create the run. + Otherwise, creates the run. """ if self.exit_after_num_runs == self.num_runs: print( From 7833e8e981268ebabd7e9219e917d87f6c672270 Mon Sep 17 00:00:00 2001 From: AndrewJGaut <35617203+AndrewJGaut@users.noreply.github.com> Date: Wed, 1 Mar 2023 20:19:23 -0800 Subject: [PATCH 5/6] Fix Stress Tests (#4406) * bump version to 1.6.1 * fix mysql db deadlock * do upload first so we can check for deadlock * minor update * Modified the code to use better iterations per disk check to be more effiicnet * add changes to increment time used as well * revert chnages from rc1.6.1 since I acttually branched off from that rather than matster * minor change for debugging stress tests * minor changes * update stress tests to get more fine-grained information about what might be goign wrong * update worker manager to log image name * small changes to try and debug stress test issue * get remaining tests to pass * Add in better error logging for send_json_message * minor update to help with finalization issue * Address comments --------- Co-authored-by: Jiani Wang --- codalab/lib/upload_manager.py | 4 +- codalab/model/bundle_model.py | 40 ++++-- codalab/model/worker_model.py | 13 +- codalab/server/bundle_manager.py | 2 +- codalab/worker/upload_util.py | 2 +- .../azure_batch_worker_manager.py | 2 +- scripts/test_util.py | 47 +++++++ tests/stress/stress_test.py | 129 +++++++++--------- 8 files changed, 152 insertions(+), 87 deletions(-) diff --git a/codalab/lib/upload_manager.py b/codalab/lib/upload_manager.py index f2f740312..e57092436 100644 --- a/codalab/lib/upload_manager.py +++ b/codalab/lib/upload_manager.py @@ -244,13 +244,12 @@ def write_fileobj( try: bytes_uploaded = 0 CHUNK_SIZE = 16 * 1024 - ITERATIONS_PER_DISK_CHECK = 1 + ITERATIONS_PER_DISK_CHECK = 2000 iteration = 0 with FileSystems.create( bundle_path, compression_type=CompressionTypes.UNCOMPRESSED ) as out: while True: - iteration += 1 to_send = output_fileobj.read(CHUNK_SIZE) if not to_send: break @@ -276,6 +275,7 @@ def write_fileobj( should_resume = progress_callback(bytes_uploaded) if not should_resume: raise Exception('Upload aborted by client') + iteration += 1 with FileSystems.open( bundle_path, compression_type=CompressionTypes.UNCOMPRESSED ) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file: diff --git a/codalab/model/bundle_model.py b/codalab/model/bundle_model.py index e907ec0b8..36f1f80ac 100644 --- a/codalab/model/bundle_model.py +++ b/codalab/model/bundle_model.py @@ -2714,21 +2714,41 @@ def update_user_info(self, user_info): cl_user.update().where(cl_user.c.user_id == user_info['user_id']).values(user_info) ) - def increment_user_time_used(self, user_id, amount): + def increment_user_disk_used(self, user_id: str, amount: int): """ - User used some time. + Increment disk_used for user by amount. + When incrementing values, we have to use a special query to ensure that we avoid + race conditions or deadlock arising from multiple threads calling functions + concurrently. We do this using with_for_update() and commit(). """ - user_info = self.get_user_info(user_id) - user_info['time_used'] += amount - self.update_user_info(user_info) + with self.engine.begin() as connection: + rows = connection.execute( + select([cl_user.c.disk_used]).where(cl_user.c.user_id == user_id).with_for_update() + ) + if not rows: + raise NotFoundError("User with ID %s not found" % user_id) + disk_used = rows.first()[0] + amount + connection.execute( + cl_user.update().where(cl_user.c.user_id == user_id).values(disk_used=disk_used) + ) + connection.commit() - def increment_user_disk_used(self, user_id: str, amount: int): + def increment_user_time_used(self, user_id: str, amount: int): """ - Increment disk_used for user by amount + User used some time. + See comment for increment_user_disk_used. """ - user_info = self.get_user_info(user_id) - user_info['disk_used'] += amount - self.update_user_info(user_info) + with self.engine.begin() as connection: + rows = connection.execute( + select([cl_user.c.time_used]).where(cl_user.c.user_id == user_id).with_for_update() + ) + if not rows: + raise NotFoundError("User with ID %s not found" % user_id) + time_used = rows.first()[0] + amount + connection.execute( + cl_user.update().where(cl_user.c.user_id == user_id).values(time_used=time_used) + ) + connection.commit() def get_user_time_quota_left(self, user_id, user_info=None): if not user_info: diff --git a/codalab/model/worker_model.py b/codalab/model/worker_model.py index 95e3d98e1..6fe1ecbc4 100644 --- a/codalab/model/worker_model.py +++ b/codalab/model/worker_model.py @@ -385,8 +385,8 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret Note, only the worker should call this method with autoretry set to False. See comments below. """ - start_time = time.time() self._ping_worker_ws(worker_id) + start_time = time.time() while time.time() - start_time < timeout_secs: with closing(socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)) as sock: sock.settimeout(timeout_secs) @@ -407,15 +407,14 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret success = sock.recv(len(WorkerModel.ACK)) == WorkerModel.ACK else: success = True - except socket.error: - logging.debug("socket error when calling send_json_message") + except socket.error as e: + logging.error(f"socket error when calling send_json_message: {e}") if not success: # Shouldn't be too expensive just to keep retrying. # TODO: maybe exponential backoff - time.sleep( - 0.3 - ) # changed from 0.003 to keep from rate-limiting due to dead workers + logging.error("Sleeping for 0.1 seconds.") + time.sleep(0.3) continue if not autoretry: @@ -430,7 +429,7 @@ def send_json_message(self, socket_id, worker_id, message, timeout_secs, autoret sock.sendall(json.dumps(message).encode()) return True - + logging.info("Socket message timeout.") return False def has_reply_permission(self, user_id, worker_id, socket_id): diff --git a/codalab/server/bundle_manager.py b/codalab/server/bundle_manager.py index 9a1cc549c..64ed682bd 100644 --- a/codalab/server/bundle_manager.py +++ b/codalab/server/bundle_manager.py @@ -712,7 +712,7 @@ def _try_start_bundle(self, workers, worker, bundle, bundle_resources): worker['socket_id'], worker['worker_id'], self._construct_run_message(worker['shared_file_system'], bundle, bundle_resources), - 0.2, + 1, ): logger.info( 'Starting run bundle {} on worker {}'.format(bundle.uuid, worker['worker_id']) diff --git a/codalab/worker/upload_util.py b/codalab/worker/upload_util.py index aa9548753..d73cbaaa2 100644 --- a/codalab/worker/upload_util.py +++ b/codalab/worker/upload_util.py @@ -65,7 +65,7 @@ def upload_with_chunked_encoding( # Use chunked transfer encoding to send the data through. bytes_uploaded = 0 - ITERATIONS_PER_DISK_CHECK = 1 + ITERATIONS_PER_DISK_CHECK = 2000 iteration = 0 while True: to_send = fileobj.read(CHUNK_SIZE) diff --git a/codalab/worker_manager/azure_batch_worker_manager.py b/codalab/worker_manager/azure_batch_worker_manager.py index fad705dad..9a61aa2d0 100644 --- a/codalab/worker_manager/azure_batch_worker_manager.py +++ b/codalab/worker_manager/azure_batch_worker_manager.py @@ -97,7 +97,7 @@ def get_worker_jobs(self) -> List[WorkerJob]: def start_worker_job(self) -> None: worker_image: str = 'codalab/worker:' + os.environ.get('CODALAB_VERSION', 'latest') worker_id: str = uuid.uuid4().hex - logger.debug('Starting worker {} with image {}'.format(worker_id, worker_image)) + logger.info('Starting worker {} with image {}'.format(worker_id, worker_image)) work_dir: str = ( self.args.worker_work_dir_prefix if self.args.worker_work_dir_prefix else "/tmp/" ) diff --git a/scripts/test_util.py b/scripts/test_util.py index f750cb93b..dccc5c038 100644 --- a/scripts/test_util.py +++ b/scripts/test_util.py @@ -1,6 +1,8 @@ import io +import signal import subprocess import sys +import time import traceback global cl @@ -179,3 +181,48 @@ def cleanup(cl, tag, should_wait=True): run_command([cl, 'wrm', uuid, '--force']) worksheets_removed += 1 print('Removed {} bundles and {} worksheets.'.format(bundles_removed, worksheets_removed)) + + +class Timer: + """ + Class that uses signal to interrupt functions while they're running + if they run for longer than timeout_seconds. + Can also be used to time how long functions take within its context manager. + Used for the timing tests. + """ + + def __init__(self, timeout_seconds=1, handle_timeouts=True, uuid=None): + """ + A class that can be used as a context manager to ensure that code within that context manager times out + after timeout_seconds time and which times the execution of code within the context manager. + Parameters: + timeout_seconds (float): Amount of time before execution in context manager is interrupted for timeout + handle_timeouts (bool): If True, do not timeout, only return the time taken for execution in context manager. + uuid (str): Uuid of bundles running within context manager. + """ + self.handle_timeouts = handle_timeouts + self.timeout_seconds = timeout_seconds + self.uuid = None + + def handle_timeout(self, signum, frame): + timeout_message = "Timeout ocurred" + if self.uuid: + timeout_message += " while waiting for %s to run" % self.uuid + raise TimeoutError(timeout_message) + + def time_elapsed(self): + return time.time() - self.start_time + + def __enter__(self): + self.start_time = time.time() + if self.handle_timeouts: + signal.signal(signal.SIGALRM, self.handle_timeout) + signal.setitimer(signal.ITIMER_REAL, self.timeout_seconds, self.timeout_seconds) + + # now, reset itimer. + signal.setitimer(signal.ITIMER_REAL, 0, 0) + + def __exit__(self, type, value, traceback): + self.time_elapsed = time.time() - self.start_time + if self.handle_timeouts: + signal.alarm(0) diff --git a/tests/stress/stress_test.py b/tests/stress/stress_test.py index 37f681e98..5e47ea7df 100644 --- a/tests/stress/stress_test.py +++ b/tests/stress/stress_test.py @@ -1,4 +1,6 @@ import argparse +from collections import defaultdict +import json import os import random import string @@ -9,7 +11,13 @@ from multiprocessing import cpu_count, Pool from threading import Thread -from scripts.test_util import cleanup, run_command +from scripts.test_util import cleanup, run_command, Timer + + +def temp_path(file_name): + root = '/tmp' + return os.path.join(root, file_name) + """ Script to stress test CodaLab's backend. The following is a list of what's being tested: @@ -38,6 +46,7 @@ class TestFile: def __init__(self, file_name, size_mb=1, content=None): self._file_name = file_name + self._file_path = temp_path(file_name) if content is None: self._size_mb = size_mb self._make_random_file() @@ -45,27 +54,30 @@ def __init__(self, file_name, size_mb=1, content=None): self._write_content(content) def _make_random_file(self): - with open(self._file_name, 'wb') as file: - file.seek(self._size_mb * 1024 * 1024) # Seek takes in file size in terms of bytes + with open(self._file_path, 'wb') as file: + file.seek(int(self._size_mb * 1024 * 1024)) # Seek takes in file size in terms of bytes file.write(b'0') - print('Created file {} of size {} MB.'.format(self._file_name, self._size_mb)) + print('Created file {} of size {} MB.'.format(self._file_path, self._size_mb)) def _write_content(self, content): - with open(self._file_name, 'w') as file: + with open(self._file_path, 'w') as file: file.write(content) def name(self): return self._file_name + def path(self): + return self._file_path + def delete(self): ''' Removes the file. ''' - if os.path.exists(self._file_name): - os.remove(self._file_name) - print('Deleted file {}.'.format(self._file_name)) + if os.path.exists(self._file_path): + os.remove(self._file_path) + print('Deleted file {}.'.format(self._file_path)) else: - print('File {} has already been deleted.'.format(self._file_name)) + print('File {} has already been deleted.'.format(self._file_path)) class StressTestRunner: @@ -106,65 +118,51 @@ class StressTestRunner: def __init__(self, cl, args): self._cl = cl self._args = args + self._runs = defaultdict(list) # Connect to the instance the stress tests will run on print('Connecting to instance %s...' % args.instance) subprocess.call([self._cl, 'work', '%s::' % args.instance]) + def time_function(self, fn): + t = Timer(handle_timeouts=False) + with t: + fn() + self.cleanup() + print(f'{fn.__name__} finished in {t.time_elapsed}') + self._runs[fn.__name__].append(t.time_elapsed) + + def test_function(self, fn): + try: + self.time_function(fn) + except Exception as e: + print(f"Exception for function {fn.__name__}: {e}") + self._runs[fn.__name__].append(str(e)) + def run(self): print('Cleaning up stress test files from other runs...') - cleanup(self._cl, StressTestRunner._TAG, should_wait=False) + cleanup(self._cl, StressTestRunner._TAG, should_wait=True) print('Running stress tests...') self._start_heartbeat() - self._test_large_bundle_result() - print('_test_large_bundle_result finished') - self.cleanup() - - self._test_large_bundle_upload() - print('_test_large_bundle_upload finished') - self.cleanup() - - self._test_many_gpu_runs() - print('_test_many_gpu_runs finished') - self.cleanup() - - self._test_multiple_cpus_runs_count() - print('_test_multiple_cpus_runs_count finished') - self.cleanup() - - self._test_many_bundle_uploads() - print('_test_many_bundle_uploads finished') - self.cleanup() - - self._test_many_worksheet_copies() - print('_test_many_worksheet_copies finished') - self.cleanup() - - self._test_parallel_runs() - print('_test_parallel_runs finished') - self.cleanup() - - self._test_many_docker_runs() - print('_test_many_docker_runs finished') - self.cleanup() - - self._test_infinite_memory() - print('_test_infinite_memory finished') - self.cleanup() - - self._test_infinite_gpu() - print('_test_infinite_gpu finished') - self.cleanup() - - self._test_infinite_disk() - print('_test_infinite_disk finished') - self.cleanup() - - self._test_many_disk_writes() - print('_test_many_disk_writes finished') - self.cleanup() + functions = [ + self._test_large_bundle_upload, + self._test_large_bundle_result, + self._test_many_gpu_runs, + self._test_multiple_cpus_runs_count, + self._test_many_bundle_uploads, + self._test_many_worksheet_copies, + self._test_parallel_runs, + self._test_many_docker_runs, + self._test_infinite_memory, + self._test_infinite_gpu, + self._test_infinite_disk, + self._test_many_disk_writes, + ] + + for fn in functions: + self.test_function(fn) print('Done.') def _start_heartbeat(self): @@ -182,8 +180,8 @@ def _heartbeat(self): if t.is_alive(): print('Heartbeat failed. Exiting...') sys.exit(1) - # Have heartbeat run every 30 seconds - time.sleep(30) + # Have heartbeat run every 10 minutes + time.sleep(600) def _test_large_bundle_result(self) -> None: def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile: @@ -194,7 +192,7 @@ def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile: self._set_worksheet('large_bundle_result') file: TestFile = create_large_file_in_bundle(self._args.large_dependency_size_gb) - self._run_bundle([self._cl, 'upload', file.name()]) + self._run_bundle([self._cl, 'upload', file.path()]) file.delete() dependency_uuid: str = self._run_bundle( @@ -214,7 +212,7 @@ def create_large_file_in_bundle(large_file_size_gb: int) -> TestFile: def _test_large_bundle_upload(self) -> None: self._set_worksheet('large_bundle_upload') large_file: TestFile = TestFile('large_file', self._args.large_file_size_gb * 1000) - dependency_uuid: str = self._run_bundle([self._cl, 'upload', large_file.name()]) + dependency_uuid: str = self._run_bundle([self._cl, 'upload', large_file.path()]) large_file.delete() uuid: str = self._run_bundle( [ @@ -241,7 +239,7 @@ def _test_many_bundle_uploads(self): self._set_worksheet('many_bundle_uploads') file = TestFile('small_file', 1) for _ in range(self._args.bundle_upload_count): - self._run_bundle([self._cl, 'upload', file.name()]) + self._run_bundle([self._cl, 'upload', file.path()]) file.delete() def _test_many_worksheet_copies(self): @@ -249,7 +247,7 @@ def _test_many_worksheet_copies(self): worksheet_uuid = self._set_worksheet('many_worksheet_copies') file = TestFile('copy_file', 1) for _ in range(10): - self._run_bundle([self._cl, 'upload', file.name()]) + self._run_bundle([self._cl, 'upload', file.path()]) file.delete() # Create many worksheets with current worksheet's content copied over @@ -284,7 +282,7 @@ def _test_infinite_memory(self): return self._set_worksheet('infinite_memory') file = self._create_infinite_memory_script() - self._run_bundle([self._cl, 'upload', file.name()]) + self._run_bundle([self._cl, 'upload', file.path()]) self._run_bundle( [self._cl, 'run', ':' + file.name(), 'python ' + file.name()], expected_exit_code=1 ) @@ -299,7 +297,7 @@ def _test_infinite_gpu(self): return self._set_worksheet('infinite_gpu') file = self._create_infinite_memory_script() - self._run_bundle([self._cl, 'upload', file.name()]) + self._run_bundle([self._cl, 'upload', file.path()]) for _ in range(self._args.infinite_gpu_runs_count): self._run_bundle( [self._cl, 'run', ':' + file.name(), 'python ' + file.name(), '--request-gpus=1'], @@ -393,6 +391,7 @@ def main(): runner.run() duration_seconds = time.time() - start_time print("--- Completion Time: {} minutes---".format(duration_seconds / 60)) + print(json.dumps(runner._runs)) if __name__ == '__main__': From 1c634d46d87553239a9b317797876cdbf6ca7f2d Mon Sep 17 00:00:00 2001 From: AndrewJGaut <35617203+AndrewJGaut@users.noreply.github.com> Date: Sat, 4 Mar 2023 17:37:10 -0800 Subject: [PATCH 6/6] increase timeouts range in both worker and bundle-manager to help alleviate the syncrhonization issue (#4421) --- codalab/rest/workers.py | 2 +- codalab/server/bundle_manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/codalab/rest/workers.py b/codalab/rest/workers.py index 1cab14bf4..7ef6f1537 100644 --- a/codalab/rest/workers.py +++ b/codalab/rest/workers.py @@ -25,7 +25,7 @@ def checkin(worker_id): Waits for a message for the worker for WAIT_TIME_SECS seconds. Returns the message or None if there isn't one. """ - WAIT_TIME_SECS = 3.0 + WAIT_TIME_SECS = 5.0 # Old workers might not have all the fields, so allow subsets to be missing. socket_id = local.worker_model.worker_checkin( diff --git a/codalab/server/bundle_manager.py b/codalab/server/bundle_manager.py index 64ed682bd..f39beba7f 100644 --- a/codalab/server/bundle_manager.py +++ b/codalab/server/bundle_manager.py @@ -355,7 +355,7 @@ def _acknowledge_recently_finished_bundles(self, workers): worker['socket_id'], worker['worker_id'], {'type': 'mark_finalized', 'uuid': bundle.uuid}, - 0.2, + 1, ): logger.info( 'Acknowledged finalization of run bundle {} on worker {}'.format(