Skip to content

Commit

Permalink
Add documentation and make convert_files function import greedily
Browse files Browse the repository at this point in the history
  • Loading branch information
stxue1 committed Sep 6, 2024
1 parent f5f3eea commit 37efd55
Showing 1 changed file with 82 additions and 38 deletions.
120 changes: 82 additions & 38 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@

logger = logging.getLogger(__name__)

# WDL options to pass into the WDL jobs and standard libraries
# task_path: Dotted WDL name of the part of the workflow this library is working for.
# execution_dir: Directory to use as the working directory for workflow code.
# enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through
# share_files_with: If set to an existing standard library instance, use the same file upload and download paths as it.
WDL_OPTIONS = TypedDict('WDL_OPTIONS', {"execution_dir": NotRequired[str], "container": NotRequired[str],
"share_files_with": NotRequired["ToilWDLStdLibBase"], "task_path": str})

Expand Down Expand Up @@ -456,6 +461,12 @@ async def toil_read_source(uri: str, path: List[str], importer: Optional[WDL.Tre


def virtualized_equal(file1: WDL.Value.Base, file2: WDL.Value.Base) -> bool:
"""
Check if two WDL values are equal when taking account file virtualization.
:param file1: WDL value
:param file2: WDL value
:return: Whether the two file values are true
"""
def f(file: WDL.Value.File) -> WDL.Value.File:
file.value = getattr(file, "virtualized_value", file.value)
return file
Expand Down Expand Up @@ -504,11 +515,6 @@ def combine_bindings(all_bindings: Sequence[WDLBindings]) -> WDLBindings:

return merged

def revert_file_to_original(file: WDL.Value.File) -> Optional[WDL.Value.File]:
original_value = getattr(file, "original_value", None)
if original_value is not None:
file.value = original_value
return file

# TODO: Develop a Protocol that can match the logging function type more closely
def log_bindings(log_function: Callable[..., None], message: str, all_bindings: Sequence[Promised[WDLBindings]]) -> None:
Expand Down Expand Up @@ -801,54 +807,69 @@ def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', T
return True
return False

def convert_remote_files(environment: WDLBindings, file_source: Union[AbstractFileStore, Toil], search_paths: Optional[List[str]] = None) -> None:
def convert_remote_files(environment: WDLBindings, file_source: Toil, task_path: str, search_paths: Optional[List[str]] = None, skip_remote: bool = False) -> None:
"""
Iterate through the environment and convert all files that reference a remote file from the possible URIs to that URI.
If the file references a local file, leave it unchanged.
:param environment: Bindings to evaluate on
:param file_source: Context to search for remote files with
:param search_paths: List of paths to search through
"""
:param task_path: Dotted WDL name of the user-level code doing the
importing (probably the workflow name).
:param search_paths: If set, try resolving input location relative to the URLs or
directories in this list.
:param skip_remote: If set, don't try to import files from remote
locations. Leave them as URIs.
"""
path_to_id: Dict[str, uuid.UUID] = {}
@memoize
def convert_file_to_url(file: WDL.Value.File) -> WDL.Value.File:
"""
Detect if any potential URI exist and convert a file's value to a URI
Detect if any potential URI exists. Will convert a file's value to a URI and import it.
"""
# Search through any input search paths passed in and download it if found
filename = file.value
tried = []
for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else []):
tried.append(candidate_uri)
try:
# Detect if the
found = False
if isinstance(file_source, AbstractJobStore):
found = file_source.url_exists(candidate_uri)
elif isinstance(file_source, Toil):
found = file_source.url_exists(candidate_uri)
if found is False:
# Wasn't found there
continue
if skip_remote and is_url(candidate_uri):
# Use remote URIs in place. But we need to find the one that exists.
if not file_source.url_exists(candidate_uri):
# Wasn't found there
continue

# Now we know this exists, so pass it through
file.value = candidate_uri
return file
else:
# Actually import
# Try to import the file. Don't raise if we can't find it, just
# return None!
imported = file_source.import_file(candidate_uri, check_existence=False)
if imported is None:
# Wasn't found there
continue
except UnimplementedURLException as e:
# We can't find anything that can even support this URL scheme.
# Report to the user, they are probably missing an extra.
logger.critical('Error: ' + str(e))
raise
except HTTPError as e:
# Something went wrong looking for it there.
logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code)
# Try the next location.
continue
except Exception:
# Something went wrong besides the file not being found. Maybe
# we have no auth.
logger.error("Something went wrong when testing for existence of %s", candidate_uri)
raise

if found is False:
if imported is None:
# Wasn't found there
# Mostly to satisfy mypy
continue

if candidate_uri.startswith("file:") or filename == candidate_uri:
# Don't replace if the original file was already found
# or if the replacement value is the same as the original
return file
logger.info('Converting input file path %s to %s', filename, candidate_uri)

# Work out what the basename for the file was
file_basename = os.path.basename(urlsplit(candidate_uri).path)

Expand All @@ -857,12 +878,42 @@ def convert_file_to_url(file: WDL.Value.File) -> WDL.Value.File:
# download them at that basename later.
raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File")

if candidate_uri.startswith("file:") or filename == candidate_uri:
# Don't replace if the original file was already found
# or if the replacement value is the same as the original
return file

# Was actually found
if is_url(candidate_uri):
# Might be a file URI or other URI.
# We need to make sure file URIs and local paths that point to
# the same place are treated the same.
parsed = urlsplit(candidate_uri)
if parsed.scheme == "file:":
# This is a local file URI. Convert to a path for source directory tracking.
parent_dir = os.path.dirname(unquote(parsed.path))
else:
# This is some other URL. Get the URL to the parent directory and use that.
parent_dir = urljoin(candidate_uri, ".")
else:
# Must be a local path
parent_dir = os.path.dirname(candidate_uri)

# Pack a UUID of the parent directory
dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4())

toil_uri = pack_toil_uri(imported, task_path, dir_id, file_basename)

logger.info('Converting input file path %s to %s', filename, candidate_uri)

# Was actually found
file.value = candidate_uri
setattr(file, "virtualized_value", toil_uri)
return file

# If we get here we tried all the candidates
raise RuntimeError(f"Could not find {filename} at any of: {tried}")

map_over_typed_files_in_bindings(environment, convert_file_to_url)


Expand Down Expand Up @@ -899,10 +950,6 @@ def __init__(self, file_store: AbstractFileStore, wdl_options: WDL_OPTIONS):
"""
Set up the standard library.
:param wdl_options: Options to pass into the standard library to use.
Ex: task_path: Dotted WDL name of the part of the workflow this library is working for.
execution_dir: Directory to use as the working directory for workflow code.
enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through
share_files_with: If set to an existing standard library instance, use the same file upload and download paths as it.
"""
# TODO: Just always be the 1.2 standard library.
wdl_version = "1.2"
Expand Down Expand Up @@ -1019,12 +1066,7 @@ def _devirtualize_file(self, file: WDL.Value.File) -> WDL.Value.File:
return file
virtualized_filename = getattr(file, "virtualized_value", None)
if virtualized_filename is not None:
devirtualized = self._devirtualize_filename(virtualized_filename)
if file.value != devirtualized:
# Remember the original value for when we merge bindings from finished jobs to future jobs as the devirtualized name
# is job specific, but the merge requires the file names to be the same
setattr(file, "original_value", file.value)
file.value = devirtualized
file.value = self._devirtualize_filename(virtualized_filename)
return file

def _virtualize_file(self, file: WDL.Value.File, enforce_existence: bool = True) -> WDL.Value.File:
Expand Down Expand Up @@ -1066,6 +1108,9 @@ def _devirtualize_filename(self, filename: str) -> str:

@staticmethod
def _devirtualize_uri(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], state: DirectoryNamingStateDict) -> str:
"""
Given a filename, either return the devirtualized path or the filename itself if not a virtualized URI.
"""
if filename.startswith(TOIL_URI_SCHEME):
# This is a reference to the Toil filestore.
# Deserialize the FileID
Expand Down Expand Up @@ -1831,7 +1876,6 @@ def ensure_null_files_are_nullable(value: WDL.Value.Base, original_value: WDL.Va
:param value: WDL base value to check. This is the WDL value that has been transformed and has the null elements
:param original_value: The original WDL base value prior to the transformation. Only used for error messages
:param expected_type: The WDL type of the value
:return:
"""
if isinstance(value, WDL.Value.File):
pass
Expand Down Expand Up @@ -2369,7 +2413,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
# Set up the WDL standard library
# UUID to use for virtualizing files
# We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them
standard_library = ToilWDLStdLibBase(file_store, wdl_options = {"task_path": self._wdl_options["task_path"]})
standard_library = ToilWDLStdLibBase(file_store, wdl_options={"task_path": self._wdl_options["task_path"]})

# Get the bindings from after the input section
bindings = unwrap(self._task_internal_bindings)
Expand Down Expand Up @@ -3700,7 +3744,7 @@ def main() -> None:
# Get the execution directory
execution_dir = os.getcwd()

convert_remote_files(input_bindings, toil, inputs_search_path)
convert_remote_files(input_bindings, toil, task_path=target.name, search_paths=inputs_search_path, skip_remote=options.reference_inputs)

# Configure workflow interpreter options
wdl_options: WDL_OPTIONS = {"execution_dir": execution_dir, "container": options.container, "task_path": target.name}
Expand Down

0 comments on commit 37efd55

Please sign in to comment.