Skip to content

Commit

Permalink
Merge branch 'master' into rc1.6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewJGaut authored Mar 2, 2023
2 parents 887ca29 + 7833e8e commit 51a8e73
Show file tree
Hide file tree
Showing 34 changed files with 265 additions and 233 deletions.
28 changes: 28 additions & 0 deletions alembic/versions/2023021501_remove_data_hash_db3ca94867b3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""remove data hash
Revision ID: db3ca94867b3
Revises: 22b6c41cd29c
Create Date: 2023-02-15 01:43:30.027124
"""

# revision identifiers, used by Alembic.
revision = 'db3ca94867b3'
down_revision = '22b6c41cd29c'

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('bundle_data_hash_index', table_name='bundle')
op.drop_column('bundle', 'data_hash')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('bundle', sa.Column('data_hash', mysql.VARCHAR(length=63), nullable=True))
op.create_index('bundle_data_hash_index', 'bundle', ['data_hash'], unique=False)
# ### end Alembic commands ###
3 changes: 1 addition & 2 deletions codalab/bundles/derived_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DerivedBundle(NamedBundle):
)

@classmethod
def construct(cls, targets, command, metadata, owner_id, uuid, data_hash, state):
def construct(cls, targets, command, metadata, owner_id, uuid, state):
if not uuid:
uuid = spec_util.generate_uuid()
# Check that targets does not include both keyed and anonymous targets.
Expand All @@ -46,7 +46,6 @@ def construct(cls, targets, command, metadata, owner_id, uuid, data_hash, state)
'uuid': uuid,
'bundle_type': cls.BUNDLE_TYPE,
'command': command,
'data_hash': data_hash,
'state': state,
'metadata': metadata,
'dependencies': dependencies,
Expand Down
8 changes: 2 additions & 6 deletions codalab/bundles/make_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,5 @@ class MakeBundle(DerivedBundle):
)

@classmethod
def construct(
cls, targets, command, metadata, owner_id, uuid=None, data_hash=None, state=State.CREATED
):
return super(MakeBundle, cls).construct(
targets, command, metadata, owner_id, uuid, data_hash, state
)
def construct(cls, targets, command, metadata, owner_id, uuid=None, state=State.CREATED):
return super(MakeBundle, cls).construct(targets, command, metadata, owner_id, uuid, state)
1 change: 0 additions & 1 deletion codalab/bundles/private_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def construct(cls, uuid):
'bundle_type': cls.BUNDLE_TYPE,
'owner_id': None,
'command': None,
'data_hash': None,
'state': None,
'frozen': None,
'is_anonymous': None,
Expand Down
8 changes: 2 additions & 6 deletions codalab/bundles/run_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,10 @@ class RunBundle(DerivedBundle):
)

@classmethod
def construct(
cls, targets, command, metadata, owner_id, uuid=None, data_hash=None, state=State.CREATED
):
def construct(cls, targets, command, metadata, owner_id, uuid=None, state=State.CREATED):
if not isinstance(command, str):
raise UsageError('%r is not a valid command!' % (command,))
return super(RunBundle, cls).construct(
targets, command, metadata, owner_id, uuid, data_hash, state
)
return super(RunBundle, cls).construct(targets, command, metadata, owner_id, uuid, state)

def validate(self):
super(RunBundle, self).validate()
Expand Down
1 change: 0 additions & 1 deletion codalab/bundles/uploaded_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def construct(cls, metadata, owner_id, uuid=None):
row = {
'bundle_type': cls.BUNDLE_TYPE,
'command': None,
'data_hash': None,
'state': State.READY,
'metadata': metadata,
'dependencies': [],
Expand Down
17 changes: 1 addition & 16 deletions codalab/lib/bundle_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2405,7 +2405,6 @@ def print_basic_info(self, client, info, raw):
for key in (
'bundle_type',
'uuid',
'data_hash',
'state',
'command',
'frozen',
Expand Down Expand Up @@ -4357,27 +4356,13 @@ def do_ls_partitions_command(self, args):
help='Perform all garbage collection and database updates instead of just printing what would happen',
action='store_true',
),
Commands.Argument(
'-d',
'--data-hash',
help='Compute the digest for every bundle and compare against data_hash for consistency',
action='store_true',
),
Commands.Argument(
'-r',
'--repair',
help='When used with --force and --data-hash, repairs incorrect data_hash in existing bundles',
action='store_true',
),
),
)
def do_bs_health_check(self, args):
self._fail_if_headless(args)
self._fail_if_not_local(args)
print('Performing Health Check...', file=sys.stderr)
self.manager.bundle_store().health_check(
self.manager.model(), args.force, args.data_hash, args.repair
)
self.manager.bundle_store().health_check(self.manager.model(), args.force)

def _fail_if_headless(self, args):
if self.headless:
Expand Down
46 changes: 2 additions & 44 deletions codalab/lib/bundle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def cleanup(self, bundle_location, dry_run):
return False
return path_util.remove(bundle_location)

def health_check(self, model, force=False, compute_data_hash=False, repair_hashes=False):
def health_check(self, model, force=False):
"""
MultiDiskBundleStore.health_check(): In the MultiDiskBundleStore, bundle contents are stored on disk, and
occasionally the disk gets out of sync with the database, in which case we make repairs in the following ways:
Expand All @@ -247,8 +247,7 @@ def health_check(self, model, force=False, compute_data_hash=False, repair_hashe
5. For bundle <UUID> marked READY or FAILED, <UUID>.cid or <UUID>.status, or the <UUID>(-internal).sh files
should not exist.
|force|: Perform any destructive operations on the bundle store the health check determines are necessary. False by default
|compute_data_hash|: If True, compute the data_hash for every single bundle ourselves and see if it's consistent with what's in
the database. False by default.
"""
UUID_REGEX = re.compile(r'^(%s)' % spec_util.UUID_STR)

Expand Down Expand Up @@ -345,52 +344,11 @@ def _check_other_paths(other_paths, db_bundle_by_uuid):
trash_count += 1
_delete_path(to_delete)

# Check for each bundle if we need to compute its data_hash
data_hash_recomputed = 0

print('Checking data_hash of bundles in partition %s...' % partition, file=sys.stderr)
for bundle_path in bundle_paths:
uuid = _get_uuid(bundle_path)
bundle = db_bundle_by_uuid.get(uuid, None)
if bundle is None:
continue
if compute_data_hash or bundle.data_hash is None:
dirs_and_files = (
path_util.recursive_ls(bundle_path)
if os.path.isdir(bundle_path)
else ([], [bundle_path])
)
data_hash = '0x%s' % path_util.hash_directory(bundle_path, dirs_and_files)
if bundle.data_hash is None:
data_hash_recomputed += 1
print(
'Giving bundle %s data_hash %s' % (bundle_path, data_hash),
file=sys.stderr,
)
if force:
db_update = dict(data_hash=data_hash)
model.update_bundle(bundle, db_update)
elif compute_data_hash and data_hash != bundle.data_hash:
data_hash_recomputed += 1
print(
'Bundle %s should have data_hash %s, actual digest is %s'
% (bundle_path, bundle.data_hash, data_hash),
file=sys.stderr,
)
if repair_hashes and force:
db_update = dict(data_hash=data_hash)
model.update_bundle(bundle, db_update)

if force:
print('\tDeleted %d objects from the bundle store' % trash_count, file=sys.stderr)
print('\tRecomputed data_hash for %d bundles' % data_hash_recomputed, file=sys.stderr)
else:
print('Dry-Run Statistics, re-run with --force to perform updates:', file=sys.stderr)
print('\tObjects marked for deletion: %d' % trash_count, file=sys.stderr)
print(
'\tBundles that need data_hash recompute: %d' % data_hash_recomputed,
file=sys.stderr,
)


BundleLocation = TypedDict(
Expand Down
1 change: 0 additions & 1 deletion codalab/lib/bundle_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def bundle_to_bundle_info(model, bundle):
bundle.bundle_type,
bundle.owner_id,
bundle.command,
bundle.data_hash,
bundle.state,
bundle.frozen,
bundle.is_anonymous,
Expand Down
2 changes: 1 addition & 1 deletion codalab/lib/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def ratio_str(to_str, a, b):
"""
Example: to_str = duration_str, a = 60, b = 120 => "1m / 2m (50%)"
"""
return '%s / %s (%.1f%%)' % (to_str(a), to_str(b), 100.0 * a / b)
return '%s / %s (%.1f%%)' % (to_str(a), to_str(b), 100.0 * a / b if b != 0 else 100.0)


def parse_size(s):
Expand Down
6 changes: 3 additions & 3 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,12 @@ def write_fileobj(
try:
bytes_uploaded = 0
CHUNK_SIZE = 16 * 1024
ITERATIONS_PER_DISK_CHECK = 1
ITERATIONS_PER_DISK_CHECK = 2000
iteration = 0
with FileSystems.create(
bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
) as out:
while True:
iteration += 1
to_send = output_fileobj.read(CHUNK_SIZE)
if not to_send:
break
Expand All @@ -276,6 +275,7 @@ def write_fileobj(
should_resume = progress_callback(bytes_uploaded)
if not should_resume:
raise Exception('Upload aborted by client')
iteration += 1
with FileSystems.open(
bundle_path, compression_type=CompressionTypes.UNCOMPRESSED
) as ttf, tempfile.NamedTemporaryFile(suffix=".sqlite") as tmp_index_file:
Expand Down Expand Up @@ -368,7 +368,7 @@ def has_contents(self, bundle):
def cleanup_existing_contents(self, bundle):
data_size = self._bundle_model.get_bundle_metadata(bundle.uuid, 'data_size')[bundle.uuid]
removed = self._bundle_store.cleanup(bundle.uuid, dry_run=False)
bundle_update = {'data_hash': None, 'metadata': {'data_size': 0}}
bundle_update = {'metadata': {'data_size': 0}}
self._bundle_model.update_bundle(bundle, bundle_update)
if removed:
self._bundle_model.increment_user_disk_used(bundle.owner_id, -data_size)
Expand Down
60 changes: 32 additions & 28 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def add_join(table: Table, condition: Any, left_outer_join: bool = False):
elif key == '.format':
format_func = value
# Bundle fields
elif key in ('bundle_type', 'id', 'uuid', 'data_hash', 'state', 'command', 'owner_id'):
elif key in ('bundle_type', 'id', 'uuid', 'state', 'command', 'owner_id'):
conjunct = make_condition(key, getattr(cl_bundle.c, key), value)
elif key == '.shared': # shared with any group I am in with read permission
add_join(
Expand Down Expand Up @@ -1141,7 +1141,6 @@ def update_disk_metadata(self, bundle, bundle_location, enforce_disk_quota=False
dirs_and_files = [], [bundle_location]

# TODO(Ashwin): make this non-fs specific
data_hash = '0x%s' % (path_util.hash_directory(bundle_location, dirs_and_files))
data_size = path_util.get_size(bundle_location, dirs_and_files)
try:
if 'data_size' in bundle.metadata.__dict__:
Expand All @@ -1161,7 +1160,7 @@ def update_disk_metadata(self, bundle, bundle_location, enforce_disk_quota=False
% (data_size, disk_left)
)

bundle_update = {'data_hash': data_hash, 'metadata': {'data_size': data_size}}
bundle_update = {'metadata': {'data_size': data_size}}
self.update_bundle(bundle, bundle_update)
self.increment_user_disk_used(bundle.owner_id, disk_increment)

Expand Down Expand Up @@ -1358,12 +1357,6 @@ def delete_bundles(self, uuids):
connection.execute(cl_worker_run.delete().where(cl_worker_run.c.run_uuid.in_(uuids)))
connection.execute(cl_bundle.delete().where(cl_bundle.c.uuid.in_(uuids)))

def remove_data_hash_references(self, uuids):
with self.engine.begin() as connection:
connection.execute(
cl_bundle.update().where(cl_bundle.c.uuid.in_(uuids)).values({'data_hash': None})
)

# ==========================================================================
# Worksheet-related model methods follow!
# ==========================================================================
Expand Down Expand Up @@ -2721,21 +2714,41 @@ def update_user_info(self, user_info):
cl_user.update().where(cl_user.c.user_id == user_info['user_id']).values(user_info)
)

def increment_user_time_used(self, user_id, amount):
def increment_user_disk_used(self, user_id: str, amount: int):
"""
User used some time.
Increment disk_used for user by amount.
When incrementing values, we have to use a special query to ensure that we avoid
race conditions or deadlock arising from multiple threads calling functions
concurrently. We do this using with_for_update() and commit().
"""
user_info = self.get_user_info(user_id)
user_info['time_used'] += amount
self.update_user_info(user_info)
with self.engine.begin() as connection:
rows = connection.execute(
select([cl_user.c.disk_used]).where(cl_user.c.user_id == user_id).with_for_update()
)
if not rows:
raise NotFoundError("User with ID %s not found" % user_id)
disk_used = rows.first()[0] + amount
connection.execute(
cl_user.update().where(cl_user.c.user_id == user_id).values(disk_used=disk_used)
)
connection.commit()

def increment_user_disk_used(self, user_id: str, amount: int):
def increment_user_time_used(self, user_id: str, amount: int):
"""
Increment disk_used for user by amount
User used some time.
See comment for increment_user_disk_used.
"""
user_info = self.get_user_info(user_id)
user_info['disk_used'] += amount
self.update_user_info(user_info)
with self.engine.begin() as connection:
rows = connection.execute(
select([cl_user.c.time_used]).where(cl_user.c.user_id == user_id).with_for_update()
)
if not rows:
raise NotFoundError("User with ID %s not found" % user_id)
time_used = rows.first()[0] + amount
connection.execute(
cl_user.update().where(cl_user.c.user_id == user_id).values(time_used=time_used)
)
connection.commit()

def get_user_time_quota_left(self, user_id, user_info=None):
if not user_info:
Expand Down Expand Up @@ -2769,15 +2782,6 @@ def update_user_last_login(self, user_id):
"""
self.update_user_info({'user_id': user_id, 'last_login': datetime.datetime.utcnow()})

def _get_disk_used(self, user_id):
# TODO(Ashwin): don't include linked bundles
return (
self.search_bundles(user_id, ['size=.sum', 'owner_id=' + user_id, 'data_hash=%'])[
'result'
]
or 0
)

def get_user_disk_quota_left(self, user_id, user_info=None):
if not user_info:
user_info = self.get_user_info(user_id)
Expand Down
3 changes: 0 additions & 3 deletions codalab/model/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
Column('bundle_type', String(63), nullable=False),
# The command will be NULL except for run bundles.
Column('command', Text, nullable=True),
# The data_hash will be NULL if the bundle's value is still being computed.
Column('data_hash', String(63), nullable=True),
Column('state', String(63), nullable=False),
Column('owner_id', String(255), nullable=True),
Column('frozen', DateTime, nullable=True), # When the bundle was frozen, if it is.
Expand All @@ -57,7 +55,6 @@
'is_dir', Boolean, nullable=True,
), # Whether the bundle is a directory or just a single file. If set to null, nothing has been uploaded for the bundle yet.
UniqueConstraint('uuid', name='uix_1'),
Index('bundle_data_hash_index', 'data_hash'),
Index('state_index', 'state'), # Needed for the bundle manager.
mysql_charset=TABLE_DEFAULT_CHARSET,
)
Expand Down
Loading

0 comments on commit 51a8e73

Please sign in to comment.