Dependency manager hotfixes (#942)
* Fix dict sorting in the local dependency manager to get rid of a key error (see the sketch after this list), and drastically increase the serialized-length limit on the dependency cache, since the limit fires too often and grinds the system to a halt
* Fix the key error in the Docker image cleanup as well
* Actually remove the dependency's path on disk when cleaning up
* Use more granular locking in the dependency manager
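The dict-sorting fix in the first two bullets boils down to picking the least-recently-used entry from a dict of dependency or image states. A minimal, version-agnostic sketch of that selection (the State type here is a toy stand-in, not the worker's DependencyState):

    from collections import namedtuple

    State = namedtuple('State', ['last_used'])
    states = {'dep-a': State(last_used=200.0), 'dep-b': State(last_used=50.0)}

    # Evict the entry with the smallest last_used timestamp first:
    to_remove = min(states.items(), key=lambda item: item[1].last_used)[0]
    assert to_remove == 'dep-b'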
Kerem Goksel authored Aug 21, 2018
1 parent 251ba23 commit 609eefe
Showing 3 changed files with 118 additions and 77 deletions.
8 changes: 4 additions & 4 deletions worker/codalabworker/local_run/docker_image_manager.py
@@ -117,12 +117,12 @@ def _cleanup(self):
             }
             if failed_images:
                 digest_to_remove = min(
-                    failed_images, key=lambda i: failed_images[i].last_used
-                )
+                    failed_images.iteritems(), key=lambda (image, state): state.last_used
+                )[0]
             elif ready_images:
                 digest_to_remove = min(
-                    ready_images, key=lambda i: ready_images[i].last_used
-                )
+                    ready_images.iteritems(), key=lambda (image, state): state.last_used
+                )[0]
             else:
                 logger.debug(
                     'Docker image manager disk quota is full but there are only downloading images. Waiting for downloads to finish before cleanup.'
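Note that the new selection relies on Python 2-only constructs: dict.iteritems() and tuple-parameter lambdas. If this were ported forward, a version-agnostic equivalent of the same pick would look roughly like this (a sketch, not part of the commit):

    def pick_lru_digest(images):
        # images maps digest -> state object carrying a last_used timestamp
        return min(images.items(), key=lambda item: item[1].last_used)[0]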
182 changes: 112 additions & 70 deletions worker/codalabworker/local_run/local_dependency_manager.py
@@ -7,7 +7,7 @@
 import time
 import shutil
 
-from codalabworker.file_util import un_tar_directory
+from codalabworker.file_util import remove_path, un_tar_directory
 from codalabworker.formatting import size_str
 from codalabworker.fsm import BaseDependencyManager, DependencyStage, StateTransitioner
 import codalabworker.pyjson
@@ -45,7 +45,6 @@ class LocalFileSystemDependencyManager(StateTransitioner, BaseDependencyManager)
     def __init__(
         self, commit_file, bundle_service, worker_dir, max_cache_size_bytes, max_serialized_length
     ):
-
         super(LocalFileSystemDependencyManager, self).__init__()
         self.add_transition(DependencyStage.DOWNLOADING, self._transition_from_DOWNLOADING)
         self.add_terminal(DependencyStage.READY)
@@ -62,7 +61,10 @@ def __init__(
             logger.info('{} doesn\'t exist, creating.'.format(self.dependencies_dir))
             os.makedirs(self.dependencies_dir, 0o770)
 
-        self._lock = threading.RLock()
+        # Locks for concurrency
+        self._dependency_locks = dict()  # (parent_uuid, parent_path) -> threading.RLock
+        self._global_lock = threading.RLock()  # Used for add/remove actions
+        self._paths_lock = threading.RLock()  # Used for path name computations
 
         # File paths that are currently being used to store dependencies. Used to prevent conflicts
         self._paths = set()
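The coarse self._lock is split into three kinds of locks here. A condensed sketch of how the pieces fit together (the class scaffold is illustrative; only the locking pattern mirrors the diff below):

    import threading

    class LockedRegistry(object):
        def __init__(self):
            self._global_lock = threading.RLock()  # guards the lock table itself
            self._entry_locks = {}                 # key -> threading.RLock, one per entry

        def acquire_if_exists(self, key):
            # Hold the global lock only long enough to find and take the per-entry
            # lock, so slow per-entry work never serializes unrelated entries.
            with self._global_lock:
                if key in self._entry_locks:
                    self._entry_locks[key].acquire()
                    return True
                return False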
@@ -76,49 +78,48 @@ def __init__(
         self._main_thread = None
 
     def _save_state(self):
-        with self._lock:
+        with self._global_lock, self._paths_lock:
             self._state_committer.commit({'dependencies': self._dependencies, 'paths': self._paths})
 
     def _load_state(self):
-        with self._lock:
-            state = self._state_committer.load(default={'dependencies': {}, 'paths': set()})
-            dependencies = {}
-            paths = set()
-            for dep, dep_state in state['dependencies'].items():
-                full_path = os.path.join(self.dependencies_dir, dep_state.path)
-                if os.path.exists(full_path):
-                    dependencies[dep] = dep_state
-                else:
-                    logger.info(
-                        "Dependency {} in loaded state but its path {} doesn't exist in the filesystem".format(
-                            dep, full_path
-                        )
-                    )
-                if dep_state.path not in state['paths']:
-                    state['paths'].add(dep_state.path)
-                    logger.info(
-                        "Dependency {} in loaded state but its path {} is not in the loaded paths {}".format(
-                            dep, dep_state.path, state['paths']
-                        )
-                    )
-            for path in state['paths']:
-                full_path = os.path.join(self.dependencies_dir, path)
-                if os.path.exists(full_path):
-                    paths.add(path)
-                else:
-                    logger.info(
-                        "Path {} in loaded state but doesn't exist in the filesystem".format(
-                            full_path
-                        )
-                    )
+        state = self._state_committer.load(default={'dependencies': {}, 'paths': set()})
+        dependencies = {}
+        dependency_locks = {}
+        paths = set()
+        for dep, dep_state in state['dependencies'].items():
+            full_path = os.path.join(self.dependencies_dir, dep_state.path)
+            if os.path.exists(full_path):
+                dependencies[dep] = dep_state
+                dependency_locks[dep] = threading.RLock()
+            else:
+                logger.info(
+                    "Dependency {} in loaded state but its path {} doesn't exist in the filesystem".format(
+                        dep, full_path
+                    )
+                )
+            if dep_state.path not in state['paths']:
+                state['paths'].add(dep_state.path)
+                logger.info(
+                    "Dependency {} in loaded state but its path {} is not in the loaded paths {}".format(
+                        dep, dep_state.path, state['paths']
+                    )
+                )
+        for path in state['paths']:
+            full_path = os.path.join(self.dependencies_dir, path)
+            if os.path.exists(full_path):
+                paths.add(path)
+            else:
+                logger.info(
+                    "Path {} in loaded state but doesn't exist in the filesystem".format(full_path)
+                )
 
-            self._dependencies = dependencies
-            self._paths = paths
-            logger.info(
-                '{} dependencies, {} paths in cache.'.format(
-                    len(self._dependencies), len(self._paths)
-                )
-            )
+        with self._global_lock, self._paths_lock:
+            self._dependencies = dependencies
+            self._dependency_locks = dependency_locks
+            self._paths = paths
+        logger.info(
+            '{} dependencies, {} paths in cache.'.format(len(self._dependencies), len(self._paths))
+        )
 
     def start(self):
         def loop(self):
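The reworked _load_state builds everything in local variables and only takes the locks for the final swap. The reconciliation idea, reduced to a hypothetical helper (names and signature are illustrative):

    import os
    import threading

    def reconcile(persisted, dependencies_dir):
        # Keep only persisted entries whose files still exist on disk, and give
        # each survivor a fresh RLock; the caller publishes the result under locks.
        dependencies, locks = {}, {}
        for dep, dep_state in persisted['dependencies'].items():
            if os.path.exists(os.path.join(dependencies_dir, dep_state.path)):
                dependencies[dep] = dep_state
                locks[dep] = threading.RLock()
        paths = {p for p in persisted['paths']
                 if os.path.exists(os.path.join(dependencies_dir, p))}
        return dependencies, locks, paths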
@@ -142,18 +143,20 @@ def stop(self):
         logger.info("Stopped local dependency manager. Exiting.")
 
     def _process_dependencies(self):
-        with self._lock:
-            for entry, state in self._dependencies.items():
+        for entry, state in self._dependencies.items():
+            with self._dependency_locks[entry]:
                 self._dependencies[entry] = self.transition(state)
 
     def _cleanup(self):
         """
-        Limit the disk usage of the dependencies (both the bundle files and the serialied state file size)
+        Limit the disk usage of the dependencies (both the bundle files and the serialized state file size)
         Deletes oldest failed dependencies first and then oldest finished dependencies.
         Doesn't touch downloading dependencies.
         """
+        # With all the locks (should be fast if no cleanup needed, otherwise make sure nothing is corrupted)
         while True:
-            with self._lock:
+            with self._global_lock:
+                self._acquire_all_locks()
                 bytes_used = sum(dep.size_bytes for dep in self._dependencies.values())
                 serialized_length = len(codalabworker.pyjson.dumps(self._dependencies))
                 if (
@@ -179,37 +182,47 @@ def _cleanup(self):
                     if state.stage == DependencyStage.READY and not state.dependents
                 }
                 if failed_deps:
-                    dep_to_remove = min(failed_deps, key=lambda i: failed_deps[i].last_used)
+                    dep_to_remove = min(
+                        failed_deps.iteritems(), key=lambda (dep, state): state.last_used
+                    )[0]
                 elif ready_deps:
-                    dep_to_remove = min(ready_deps, key=lambda i: ready_deps[i].last_used)
+                    dep_to_remove = min(
+                        ready_deps.iteritems(), key=lambda (dep, state): state.last_used
+                    )[0]
                 else:
                     logger.info(
                         'Dependency quota full but there are only downloading dependencies, not cleaning up until downloads are over'
                     )
                     break
                 try:
-                    self._paths.remove(self._dependencies[dep_to_remove].path)
+                    path_to_remove = self._dependencies[dep_to_remove].path
+                    self._paths.remove(path_to_remove)
+                    remove_path(path_to_remove)
                 finally:
                     if dep_to_remove:
                         del self._dependencies[dep_to_remove]
+                        self._release_all_locks()
                     else:
+                        self._release_all_locks()
                         break
 
     def has(self, dependency):
         """
         Takes a dependency = (parent_uuid, parent_path)
         Returns true if the manager has processed this dependency
         """
-        with self._lock:
+        with self._global_lock:
             return dependency in self._dependencies
 
     def get(self, uuid, dependency):
         """
         Request the dependency for the run with uuid, registering uuid as a dependent of this dependency
         """
         now = time.time()
-        with self._lock:
-            if not self.has(dependency):  # add dependency state if it does not exist
-                self._dependencies[dependency] = DependencyState(
-                    stage=DependencyStage.DOWNLOADING,
-                    dependency=dependency,
+        if not self._acquire_if_exists(dependency):  # add dependency state if it does not exist
+            with self._global_lock:
+                self._dependency_locks[dependency] = threading.RLock()
+            self._dependency_locks[dependency].acquire()
+            self._dependencies[dependency] = DependencyState(
+                stage=DependencyStage.DOWNLOADING,
+                dependency=dependency,
@@ -221,31 +234,59 @@ def get(self, uuid, dependency):
                 killed=False,
             )
 
-            # update last_used as long as it isn't in FAILED
-            if self._dependencies[dependency].stage != DependencyStage.FAILED:
-                self._dependencies[dependency].dependents.add(uuid)
-                self._dependencies[dependency] = self._dependencies[dependency]._replace(
-                    last_used=now
-                )
-            return self._dependencies[dependency]
+        # update last_used as long as it isn't in FAILED
+        if self._dependencies[dependency].stage != DependencyStage.FAILED:
+            self._dependencies[dependency].dependents.add(uuid)
+            self._dependencies[dependency] = self._dependencies[dependency]._replace(last_used=now)
+        self._dependency_locks[dependency].release()
+        return self._dependencies[dependency]
 
     def release(self, uuid, dependency):
        """
        Register that the run with uuid is no longer dependent on this dependency
        If no more runs are dependent on this dependency, kill it
        """
-        with self._lock:
-            if self.has(dependency):
-                dep_state = self._dependencies[dependency]
-                if uuid in dep_state.dependents:
-                    dep_state.dependents.remove(uuid)
-                    if not dep_state.dependents:
-                        dep_state = dep_state._replace(killed=True)
-                    self._dependencies[dependency] = dep_state
+        if self._acquire_if_exists(dependency):
+            dep_state = self._dependencies[dependency]
+            if uuid in dep_state.dependents:
+                dep_state.dependents.remove(uuid)
+                if not dep_state.dependents:
+                    dep_state = dep_state._replace(killed=True)
+                self._dependencies[dependency] = dep_state
+            self._dependency_locks[dependency].release()
 
+    def _acquire_if_exists(self, dependency):
+        """
+        Safely acquires a lock for the given dependency if it exists
+        Returns True if the dependency exists, False otherwise
+        Callers should remember to release the lock
+        """
+        with self._global_lock:
+            if dependency in self._dependencies:
+                self._dependency_locks[dependency].acquire()
+                return True
+            else:
+                return False
+
+    def _acquire_all_locks(self):
+        """
+        Acquires all dependency locks in the thread it's called from
+        """
+        with self._global_lock:
+            for dependency, lock in self._dependency_locks.iteritems():
+                lock.acquire()
+
+    def _release_all_locks(self):
+        """
+        Releases all dependency locks in the thread it's called from
+        """
+        with self._global_lock:
+            for dependency, lock in self._dependency_locks.iteritems():
+                lock.release()
+
     def _assign_path(self, dependency):
         """
-        Normalize the path for the dependency by replacing / with _, aboiding conflicts
+        Normalize the path for the dependency by replacing / with _, avoiding conflicts
         """
         parent_uuid, parent_path = dependency
         if parent_path:
@@ -256,7 +297,7 @@
 
         # You could have a conflict between, for example a/b_c and
         # a_b/c. We have to avoid those.
-        with self._lock:
+        with self._paths_lock:
             while path in self._paths:
                 path = path + '_'
             self._paths.add(path)
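The trailing-underscore loop exists because flattening alone is ambiguous: distinct dependencies such as a/b_c and a_b/c both flatten to a_b_c. A self-contained sketch of the scheme (a hypothetical helper, not the module's code):

    import os

    def assign_path(taken, parent_uuid, parent_path):
        path = os.path.join(parent_uuid, parent_path) if parent_path else parent_uuid
        path = path.replace(os.path.sep, '_')  # flatten directory separators
        while path in taken:                   # disambiguate collisions
            path += '_'
        taken.add(path)
        return path

    taken = set()
    print(assign_path(taken, 'a', 'b_c'))  # a_b_c
    print(assign_path(taken, 'a_b', 'c'))  # a_b_c_ (collision avoided)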
@@ -278,7 +319,7 @@ def _store_dependency(self, dependency_path, fileobj, target_type):
 
     @property
     def all_dependencies(self):
-        with self._lock:
+        with self._global_lock:
             return list(self._dependencies.keys())
 
     def _transition_from_DOWNLOADING(self, dependency_state):
@@ -288,7 +329,7 @@ def update_state_and_check_killed(bytes_downloaded):
             Callback method for bundle service client updates dependency state and
             raises DownloadAbortedException if download is killed by dep. manager
             """
-            with self._lock:
+            with self._dependency_locks[dependency]:
                 state = self._dependencies[dependency]
                 if state.killed:
                     raise DownloadAbortedException("Aborted by user")
@@ -329,11 +370,11 @@ def interruptable_read(*args, **kwargs):
                     parent_path,
                     dependency_path,
                 )
-                with self._lock:
+                with self._dependency_locks[dependency]:
                     self._downloading[dependency]['success'] = True
 
             except Exception as e:
-                with self._lock:
+                with self._dependency_locks[dependency]:
                     self._downloading[dependency]['success'] = False
                     self._downloading[dependency][
                         'failure_message'
@@ -355,5 +396,6 @@ def interruptable_read(*args, **kwargs):
                 stage=DependencyStage.READY, message="Download complete"
             )
         else:
-            self._paths.remove(dependency_state.path)
+            with self._paths_lock:
+                self._paths.remove(dependency_state.path)
         return dependency_state._replace(stage=DependencyStage.FAILED, message=failure_message)
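The download-thread changes above all follow one pattern: the progress callback takes only its own dependency's lock to poll for kill requests and record progress. Reduced to a sketch (the factory and names are illustrative; DownloadAbortedException is the module's own exception):

    def make_progress_callback(lock, state_table, dependency):
        def update_state_and_check_killed(bytes_downloaded):
            # Take just this dependency's lock per tick; other dependencies'
            # transitions proceed concurrently.
            with lock:
                state = state_table[dependency]
                if state.killed:
                    raise DownloadAbortedException("Aborted by user")
                state_table[dependency] = state._replace(size_bytes=bytes_downloaded)
        return update_state_and_check_killed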
5 changes: 2 additions & 3 deletions worker/codalabworker/main.py
@@ -70,9 +70,8 @@ def main():
     parser.add_argument(
         '--max-dependencies-serialized-length',
         type=int,
-        default=60000,
-        help='Maximum length of serialized json of dependency list of worker '
-        '(e.g., 50, 30000, 60000).',
+        default=10000000,
+        help='Maximum length of serialized json of dependency list of worker in bytes',
     )
     parser.add_argument(
         '--max-image-cache-size',
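For scale, a back-of-the-envelope reading of the default change (the per-entry size is an assumption for illustration, not a measurement):

    ENTRY_BYTES = 300  # assumed average serialized size of one dependency entry
    print(60000 // ENTRY_BYTES)      # old default: cleanup fired after ~200 entries
    print(10000000 // ENTRY_BYTES)   # new default: roughly 33,000 entries fit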
