diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index d8514d67ce..340543da0f 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -98,6 +98,11 @@ logger = logging.getLogger(__name__) +# WDL options to pass into the WDL jobs and standard libraries +# task_path: Dotted WDL name of the part of the workflow this library is working for. +# execution_dir: Directory to use as the working directory for workflow code. +# enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through +# share_files_with: If set to an existing standard library instance, use the same file upload and download paths as it. WDL_OPTIONS = TypedDict('WDL_OPTIONS', {"execution_dir": NotRequired[str], "container": NotRequired[str], "share_files_with": NotRequired["ToilWDLStdLibBase"], "task_path": str}) @@ -456,6 +461,12 @@ async def toil_read_source(uri: str, path: List[str], importer: Optional[WDL.Tre def virtualized_equal(file1: WDL.Value.Base, file2: WDL.Value.Base) -> bool: + """ + Check if two WDL values are equal when taking account file virtualization. + :param file1: WDL value + :param file2: WDL value + :return: Whether the two file values are true + """ def f(file: WDL.Value.File) -> WDL.Value.File: file.value = getattr(file, "virtualized_value", file.value) return file @@ -504,11 +515,6 @@ def combine_bindings(all_bindings: Sequence[WDLBindings]) -> WDLBindings: return merged -def revert_file_to_original(file: WDL.Value.File) -> Optional[WDL.Value.File]: - original_value = getattr(file, "original_value", None) - if original_value is not None: - file.value = original_value - return file # TODO: Develop a Protocol that can match the logging function type more closely def log_bindings(log_function: Callable[..., None], message: str, all_bindings: Sequence[Promised[WDLBindings]]) -> None: @@ -801,16 +807,24 @@ def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', T return True return False -def convert_remote_files(environment: WDLBindings, file_source: Union[AbstractFileStore, Toil], search_paths: Optional[List[str]] = None) -> None: +def convert_remote_files(environment: WDLBindings, file_source: Toil, task_path: str, search_paths: Optional[List[str]] = None, skip_remote: bool = False) -> None: """ Iterate through the environment and convert all files that reference a remote file from the possible URIs to that URI. + If the file references a local file, leave it unchanged. :param environment: Bindings to evaluate on :param file_source: Context to search for remote files with - :param search_paths: List of paths to search through - """ + :param task_path: Dotted WDL name of the user-level code doing the + importing (probably the workflow name). + :param search_paths: If set, try resolving input location relative to the URLs or + directories in this list. + :param skip_remote: If set, don't try to import files from remote + locations. Leave them as URIs. + """ + path_to_id: Dict[str, uuid.UUID] = {} + @memoize def convert_file_to_url(file: WDL.Value.File) -> WDL.Value.File: """ - Detect if any potential URI exist and convert a file's value to a URI + Detect if any potential URI exists. Will convert a file's value to a URI and import it. """ # Search through any input search paths passed in and download it if found filename = file.value @@ -818,37 +832,44 @@ def convert_file_to_url(file: WDL.Value.File) -> WDL.Value.File: for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else []): tried.append(candidate_uri) try: - # Detect if the - found = False - if isinstance(file_source, AbstractJobStore): - found = file_source.url_exists(candidate_uri) - elif isinstance(file_source, Toil): - found = file_source.url_exists(candidate_uri) - if found is False: - # Wasn't found there - continue + if skip_remote and is_url(candidate_uri): + # Use remote URIs in place. But we need to find the one that exists. + if not file_source.url_exists(candidate_uri): + # Wasn't found there + continue + + # Now we know this exists, so pass it through + file.value = candidate_uri + return file + else: + # Actually import + # Try to import the file. Don't raise if we can't find it, just + # return None! + imported = file_source.import_file(candidate_uri, check_existence=False) + if imported is None: + # Wasn't found there + continue except UnimplementedURLException as e: # We can't find anything that can even support this URL scheme. # Report to the user, they are probably missing an extra. logger.critical('Error: ' + str(e)) raise + except HTTPError as e: + # Something went wrong looking for it there. + logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code) + # Try the next location. + continue except Exception: # Something went wrong besides the file not being found. Maybe # we have no auth. logger.error("Something went wrong when testing for existence of %s", candidate_uri) raise - if found is False: + if imported is None: # Wasn't found there # Mostly to satisfy mypy continue - if candidate_uri.startswith("file:") or filename == candidate_uri: - # Don't replace if the original file was already found - # or if the replacement value is the same as the original - return file - logger.info('Converting input file path %s to %s', filename, candidate_uri) - # Work out what the basename for the file was file_basename = os.path.basename(urlsplit(candidate_uri).path) @@ -857,12 +878,42 @@ def convert_file_to_url(file: WDL.Value.File) -> WDL.Value.File: # download them at that basename later. raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File") + if candidate_uri.startswith("file:") or filename == candidate_uri: + # Don't replace if the original file was already found + # or if the replacement value is the same as the original + return file + + # Was actually found + if is_url(candidate_uri): + # Might be a file URI or other URI. + # We need to make sure file URIs and local paths that point to + # the same place are treated the same. + parsed = urlsplit(candidate_uri) + if parsed.scheme == "file:": + # This is a local file URI. Convert to a path for source directory tracking. + parent_dir = os.path.dirname(unquote(parsed.path)) + else: + # This is some other URL. Get the URL to the parent directory and use that. + parent_dir = urljoin(candidate_uri, ".") + else: + # Must be a local path + parent_dir = os.path.dirname(candidate_uri) + + # Pack a UUID of the parent directory + dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4()) + + toil_uri = pack_toil_uri(imported, task_path, dir_id, file_basename) + + logger.info('Converting input file path %s to %s', filename, candidate_uri) + # Was actually found file.value = candidate_uri + setattr(file, "virtualized_value", toil_uri) return file # If we get here we tried all the candidates raise RuntimeError(f"Could not find {filename} at any of: {tried}") + map_over_typed_files_in_bindings(environment, convert_file_to_url) @@ -899,10 +950,6 @@ def __init__(self, file_store: AbstractFileStore, wdl_options: WDL_OPTIONS): """ Set up the standard library. :param wdl_options: Options to pass into the standard library to use. - Ex: task_path: Dotted WDL name of the part of the workflow this library is working for. - execution_dir: Directory to use as the working directory for workflow code. - enforce_existence: If true, then if a file is detected as nonexistent, raise an error. Else, let it pass through - share_files_with: If set to an existing standard library instance, use the same file upload and download paths as it. """ # TODO: Just always be the 1.2 standard library. wdl_version = "1.2" @@ -1019,12 +1066,7 @@ def _devirtualize_file(self, file: WDL.Value.File) -> WDL.Value.File: return file virtualized_filename = getattr(file, "virtualized_value", None) if virtualized_filename is not None: - devirtualized = self._devirtualize_filename(virtualized_filename) - if file.value != devirtualized: - # Remember the original value for when we merge bindings from finished jobs to future jobs as the devirtualized name - # is job specific, but the merge requires the file names to be the same - setattr(file, "original_value", file.value) - file.value = devirtualized + file.value = self._devirtualize_filename(virtualized_filename) return file def _virtualize_file(self, file: WDL.Value.File, enforce_existence: bool = True) -> WDL.Value.File: @@ -1066,6 +1108,9 @@ def _devirtualize_filename(self, filename: str) -> str: @staticmethod def _devirtualize_uri(filename: str, dest_dir: str, file_source: Union[AbstractFileStore, Toil], state: DirectoryNamingStateDict) -> str: + """ + Given a filename, either return the devirtualized path or the filename itself if not a virtualized URI. + """ if filename.startswith(TOIL_URI_SCHEME): # This is a reference to the Toil filestore. # Deserialize the FileID @@ -1831,7 +1876,6 @@ def ensure_null_files_are_nullable(value: WDL.Value.Base, original_value: WDL.Va :param value: WDL base value to check. This is the WDL value that has been transformed and has the null elements :param original_value: The original WDL base value prior to the transformation. Only used for error messages :param expected_type: The WDL type of the value - :return: """ if isinstance(value, WDL.Value.File): pass @@ -2369,7 +2413,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Set up the WDL standard library # UUID to use for virtualizing files # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them - standard_library = ToilWDLStdLibBase(file_store, wdl_options = {"task_path": self._wdl_options["task_path"]}) + standard_library = ToilWDLStdLibBase(file_store, wdl_options={"task_path": self._wdl_options["task_path"]}) # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) @@ -3700,7 +3744,7 @@ def main() -> None: # Get the execution directory execution_dir = os.getcwd() - convert_remote_files(input_bindings, toil, inputs_search_path) + convert_remote_files(input_bindings, toil, task_path=target.name, search_paths=inputs_search_path, skip_remote=options.reference_inputs) # Configure workflow interpreter options wdl_options: WDL_OPTIONS = {"execution_dir": execution_dir, "container": options.container, "task_path": target.name}