From c3ea3f1443cf4209a86bb341bc6624745aba5a40 Mon Sep 17 00:00:00 2001 From: Bo Peng Date: Thu, 8 Feb 2024 23:41:29 -0600 Subject: [PATCH] Remove resolve remote --- setup.py | 1 - src/sos/targets.py | 665 ++++++++++----------------------------- src/sos/task_executor.py | 3 +- 3 files changed, 173 insertions(+), 496 deletions(-) diff --git a/setup.py b/setup.py index a5b648d9b..21a4fca3c 100644 --- a/setup.py +++ b/setup.py @@ -112,7 +112,6 @@ def run(self): file_target = sos.targets:file_target sos_tempfile = sos.targets:sos_tempfile dynamic = sos.targets:dynamic -remote = sos.targets:remote executable = sos.targets:executable sos_variable = sos.targets:sos_variable sos_step = sos.targets:sos_step diff --git a/src/sos/targets.py b/src/sos/targets.py index 34f66f18d..020ba585b 100644 --- a/src/sos/targets.py +++ b/src/sos/targets.py @@ -21,12 +21,10 @@ import fasteners import pkg_resources -from .controller import (request_answer_from_controller, - send_message_to_controller) +from .controller import (request_answer_from_controller, send_message_to_controller) from .eval import get_config, interpolate from .pattern import extract_pattern -from .utils import (Error, env, fileMD5, objectMD5, pickleable, short_repr, - stable_repr, textMD5) +from .utils import (Error, env, fileMD5, objectMD5, pickleable, short_repr, stable_repr, textMD5) __all__ = ["dynamic", "executable", "env_variable", "sos_variable"] @@ -39,8 +37,7 @@ def is_basic_type(obj): if isinstance(obj, (tuple, list, set)): return all(is_basic_type(x) for x in obj) if isinstance(obj, dict): - return all(is_basic_type(x) for x in obj.keys()) and all( - is_basic_type(x) for x in obj.values()) + return all(is_basic_type(x) for x in obj.keys()) and all(is_basic_type(x) for x in obj.values()) if isinstance(obj, (file_target, path, paths)): return True # we support types defined in numpy and pandas, but not others @@ -103,30 +100,23 @@ def set_traced(self): def set(self, *args, **kwargs): if args: if len(args) != 2: - raise ValueError( - "set(name, value) or set(name=value) is expected.") + raise ValueError("set(name, value) or set(name=value) is expected.") if not is_basic_type(args[1]): - env.logger.warning( - f"Failed to set attribute: {args[1]} is or contains unsupported data type." - ) + env.logger.warning(f"Failed to set attribute: {args[1]} is or contains unsupported data type.") return self if hasattr(self, args[0]): raise ValueError( - f"Attribute {args[0]} conflicts with another attribute of type {self.__class__.__name__}." - ) + f"Attribute {args[0]} conflicts with another attribute of type {self.__class__.__name__}.") self._dict[args[0]] = args[1] # if kwargs: for name, value in kwargs.items(): if not is_basic_type(value): - env.logger.warning( - f"Failed to set attribute: {value} is or contains unsupported data type." - ) + env.logger.warning(f"Failed to set attribute: {value} is or contains unsupported data type.") return self if hasattr(self, name): raise ValueError( - f"Attribute {name} conflicts with another attribute of {value.__class__.__name__}." 
- ) + f"Attribute {name} conflicts with another attribute of {value.__class__.__name__}.") self._dict.update(kwargs) return self @@ -139,8 +129,7 @@ def __getattr__(self, name): except Exception as e: # if name in self._dict: # return self._dict.get(name) - raise AttributeError( - f"{self.__class__.__name__} object has no attribute {name}") from e + raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}") from e def target_exists(self, mode="any"): # mode should be 'any', 'target', or 'signature' @@ -169,8 +158,7 @@ def validate(self, sig): return self.target_signature() == sig def __eq__(self, obj): - return (isinstance(obj, self.__class__) and - self.target_signature() == obj.target_signature()) + return (isinstance(obj, self.__class__) and self.target_signature() == obj.target_signature()) class sos_variable(BaseTarget): @@ -306,8 +294,7 @@ def target_signature(self, mode="any"): return textMD5(f"sos_step({self._step_name})") def __eq__(self, other): - return isinstance(other, - sos_step) and self._step_name == other._step_name + return isinstance(other, sos_step) and self._step_name == other._step_name def __hash__(self): return hash(repr(self)) @@ -337,8 +324,7 @@ def target_signature(self, mode="any"): return textMD5(f"named_output({self._output_name})") def __eq__(self, other): - return (isinstance(other, named_output) and - self._output_name == other._output_name) + return (isinstance(other, named_output) and self._output_name == other._output_name) def __hash__(self): return hash(repr(self)) @@ -363,18 +349,12 @@ def target_signature(self, mode="any"): def resolve(self): if isinstance(self._target, str): - return sorted( - [x for x in glob.glob(self._target) if os.path.isfile(x)]) - if isinstance(self._target, Sequence) and all( - isinstance(x, str) for x in self._target): - return sorted( - sum( - [[x - for x in glob.glob(t) - if os.path.isfile(x)] - for t in self._target], - [], - )) + return sorted([x for x in glob.glob(self._target) if os.path.isfile(x)]) + if isinstance(self._target, Sequence) and all(isinstance(x, str) for x in self._target): + return sorted(sum( + [[x for x in glob.glob(t) if os.path.isfile(x)] for t in self._target], + [], + )) return self._target def __format__(self, format_spec): @@ -382,67 +362,6 @@ def __format__(self, format_spec): return str(self).__format__(format_spec) -class remote(BaseTarget): - """A remote target is not tracked and not translated during task execution""" - - def __init__(self, *targets, host=None): - super().__init__() - self.__unresolvable_object__ = True - self._host = host - if len(targets) == 1: - self._target = targets[0] - else: - # multi-item targets - self._target = targets - if isinstance(self._target, - Sequence) and not isinstance(self._target, str): - self.__flattenable__ = True - - def target_name(self): - if isinstance(self._target, str): - return file_target(self._target).target_name() - if isinstance(self._target, BaseTarget): - return self._target.target_name() - return repr(self._target) - - def target_exists(self, mode="any"): - if not self._host and not env.config["default_queue"]: - return True - try: - from .hosts import Host - - h = Host(self._host if self._host else env.config["default_queue"]) - return h.target_exists(sos_targets(self._target)) - except Exception as e: - env.logger.debug( - f'Failed to check existence of {self._target} on {self._host if self._host else env.config["default_queue"]}: {e}' - ) - return True - - def target_signature(self, mode="any"): - if not 
self._host and not env.config["default_queue"]: - return textMD5(self.target_name()) - try: - from .hosts import Host - - h = Host(self._host if self._host else env.config["default_queue"]) - return h.target_signature(sos_targets(self._target)) - except Exception as e: - env.logger.debug( - f'Failed to check existence of {self._target} on {self._host if self._host else env.config["default_queue"]}: {e}' - ) - return textMD5(self.target_name()) - - def resolve(self): - return self._target - - def flatten(self): - return [remote(x) for x in self._target] - - def __format__(self, format_spec): - return str(self).__format__(format_spec) - - class executable(BaseTarget): """A target for an executable command.""" @@ -458,19 +377,14 @@ def __init__(self, cmd, version=None): self._version = tuple(version) def __eq__(self, other): - return (isinstance(other, executable) and self._cmd == other._cmd and - self._version == other._version) + return (isinstance(other, executable) and self._cmd == other._cmd and self._version == other._version) def target_exists(self, mode="any"): - if mode in ("any", "target") and shutil.which( - shlex.split(self._cmd)[0]): + if mode in ("any", "target") and shutil.which(shlex.split(self._cmd)[0]): if self._version: try: output = subprocess.check_output( - self._cmd, - stderr=subprocess.STDOUT, - shell=True, - timeout=5).decode() + self._cmd, stderr=subprocess.STDOUT, shell=True, timeout=5).decode() except subprocess.TimeoutExpired as e: env.logger.warning(str(e)) return False @@ -533,11 +447,10 @@ class path(type(Path())): lambda x: os.path.splitext(x)[0], "x": lambda x: os.path.splitext(x)[1], - "q": (lambda x: subprocess.list2cmdline([x])) - if sys.platform == "win32" else quote, + "q": (lambda x: subprocess.list2cmdline([x])) if sys.platform == "win32" else quote, "p": - lambda x: ("/" if len(x) > 1 and x[1] == ":" else "") + x.replace( - "\\", "/").replace(":/", "/").replace(":", "/"), + lambda x: + ("/" if len(x) > 1 and x[1] == ":" else "") + x.replace("\\", "/").replace(":/", "/").replace(":", "/"), "r": repr, "s": @@ -567,16 +480,12 @@ def is_relative_to(self, *other): def names(host=None): if host is None: if "__host__" not in env.sos_dict: - env.logger.debug( - 'Incomplete sos environment: missing __host__ definition, assuming "localhost".' 
- ) + env.logger.debug('Incomplete sos environment: missing __host__ definition, assuming "localhost".') host = env.sos_dict.get("__host__", "localhost") if "CONFIG" not in env.sos_dict or "hosts" not in env.sos_dict["CONFIG"]: - raise RuntimeError( - "Incomplete sos environment: missing hosts definition.") + raise RuntimeError("Incomplete sos environment: missing hosts definition.") if host not in env.sos_dict["CONFIG"]["hosts"]: - raise RuntimeError( - "Incomplete sos environment: undefined host {host}") + raise RuntimeError("Incomplete sos environment: undefined host {host}") if "paths" not in env.sos_dict["CONFIG"]["hosts"][host]: return [] return list(get_config(["hosts", host, "paths"]).keys()) @@ -595,38 +504,24 @@ def to_named_path(self, host=None): return self # p = self if self.is_absolute() else self.resolve() - host = env.sos_dict.get("__host__", - "localhost") if host is None else host - cfg = get_config( - "hosts", - host, - expected_type=dict, - allowed_keys=["shared", "paths"]) + host = env.sos_dict.get("__host__", "localhost") if host is None else host + cfg = get_config("hosts", host, expected_type=dict, allowed_keys=["shared", "paths"]) if "paths" not in cfg: raise ValueError("No path is defined in host defintion.") - relative_paths = [(name, path) - for name, path in cfg["paths"].items() - if p.is_relative_to(path)] + relative_paths = [(name, path) for name, path in cfg["paths"].items() if p.is_relative_to(path)] if not relative_paths: - raise ValueError( - f"{self} is not relative to any of the pre-defined paths for host {host}." - ) + raise ValueError(f"{self} is not relative to any of the pre-defined paths for host {host}.") if len(relative_paths) > 1: - env.logger.debug( - f"{self} is relative to more than one pre-defined paths for host {host}" - ) + env.logger.debug(f"{self} is relative to more than one pre-defined paths for host {host}") max_length = max(len(str(x[1])) for x in relative_paths) - relative_paths = [ - x for x in relative_paths if len(str(x[1])) == max_length - ] + relative_paths = [x for x in relative_paths if len(str(x[1])) == max_length] # return the anchored related = str(p)[len(relative_paths[0][1]):] if related and not related.startswith("/"): related = "/" + related return "#" + relative_paths[0][0] + related except Exception as e: - raise ValueError( - f"Failed to relate {self} with any of the named paths: {e}") from e + raise ValueError(f"Failed to relate {self} with any of the named paths: {e}") from e def expandname(self, host=None): if not self._parts or self._parts[0][:1] != "#": @@ -634,23 +529,19 @@ def expandname(self, host=None): try: # this is the case for task execution where paths is directly specified in # _runtime. 
- if "_runtime" in env.sos_dict and "localhost" in env.sos_dict[ - "_runtime"]: + if "_runtime" in env.sos_dict and "localhost" in env.sos_dict["_runtime"]: cfg = env.sos_dict["_runtime"]["localhost"] # this is the case for the main program, or when the task is executed else: cfg = get_config( "hosts", - env.sos_dict.get("__host__", "localhost") - if host is None else host, + env.sos_dict.get("__host__", "localhost") if host is None else host, expected_type=dict, ) try: - return self._from_parts([cfg["paths"][self._parts[0][1:]]] + - self._parts[1:]) + return self._from_parts([cfg["paths"][self._parts[0][1:]]] + self._parts[1:]) except KeyError: - return self._from_parts([cfg["shared"][self._parts[0][1:]]] + - self._parts[1:]) + return self._from_parts([cfg["shared"][self._parts[0][1:]]] + self._parts[1:]) except Exception as e: # if self._parts[0] == '#cwd': # return self._from_parts( @@ -661,27 +552,20 @@ def expandname(self, host=None): # raise RuntimeError( # "Incomplete sos environment: missing __host__ definition." # ) - if "CONFIG" not in env.sos_dict or "hosts" not in env.sos_dict[ - "CONFIG"]: - raise RuntimeError( - "Incomplete sos environment: missing hosts definition.") from e + if "CONFIG" not in env.sos_dict or "hosts" not in env.sos_dict["CONFIG"]: + raise RuntimeError("Incomplete sos environment: missing hosts definition.") from e if host is not None and host not in env.sos_dict["CONFIG"]["hosts"]: + raise RuntimeError(f"Incomplete sos environment: undefined host {host}") from e + if (env.sos_dict.get("__host__", "localhost") not in env.sos_dict["CONFIG"]["hosts"]): raise RuntimeError( - f"Incomplete sos environment: undefined host {host}") from e - if (env.sos_dict.get("__host__", "localhost") - not in env.sos_dict["CONFIG"]["hosts"]): - raise RuntimeError( - f'Incomplete sos environment: undefined host {env.sos_dict.get("__host__", "locahost")}' - ) from e - if ("paths" not in env.sos_dict["CONFIG"]["hosts"][env.sos_dict.get( - "__host__", "localhost")]): + f'Incomplete sos environment: undefined host {env.sos_dict.get("__host__", "locahost")}') from e + if ("paths" not in env.sos_dict["CONFIG"]["hosts"][env.sos_dict.get("__host__", "localhost")]): raise RuntimeError( f'Incomplete sos environment: paths not defined for host {env.sos_dict.get("__host__", "localhost")}' ) from e name = self._parts[0][1:] if (name not in env.sos_dict["CONFIG"]["hosts"] - [env.sos_dict.get("__host__", "localhost" - ) if host is None else host]["paths"]): + [env.sos_dict.get("__host__", "localhost") if host is None else host]["paths"]): raise ValueError( f'Named path "{name}" not defined for host {env.sos_dict.get("__host__", "localhost") if host is None else host}' ) from e @@ -691,15 +575,12 @@ def fullname(self): def __eq__(self, other): return os.path.abspath(self.fullname()) == os.path.abspath( - (other - if isinstance(other, file_target) else path(other)).fullname()) + (other if isinstance(other, file_target) else path(other)).fullname()) def __add__(self, part): if isinstance(part, (str, path)): return self.__class__(str(self) + str(part)) - raise ValueError( - f"Cannot concatenate path to {part} of type {type(part).__name__}: expect a string or path" - ) + raise ValueError(f"Cannot concatenate path to {part} of type {type(part).__name__}: expect a string or path") def __format__(self, format_spec): # handling special !q conversion flag @@ -725,9 +606,7 @@ def zap(self): if not self.exists() or not self.is_file(): raise FileNotFoundError(str(self)) with open(zap_file, "w") as md5: - 
md5.write( - f"{self.resolve()}\t{os.path.getmtime(self)}\t{os.path.getsize(self)}\t{fileMD5(self)}\n" - ) + md5.write(f"{self.resolve()}\t{os.path.getmtime(self)}\t{os.path.getsize(self)}\t{fileMD5(self)}\n") self.unlink() @@ -748,13 +627,10 @@ def _init(self, template=None): def create_placeholder(self): # create an empty placeholder file - if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config[ - "SOS_DEBUG"]: + if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config["SOS_DEBUG"]: env.log_to_file("TARGET", f"Create placeholder target {self}") self.touch() - send_message_to_controller( - ["workflow_sig", "placeholder", "file_target", - str(self)]) + send_message_to_controller(["workflow_sig", "placeholder", "file_target", str(self)]) def target_exists(self, mode="any"): try: @@ -786,8 +662,7 @@ def target_signature(self): if self.exists(): if full_md5: md5_file = self + '.md5' - if not md5_file.exists( - ) or os.path.getmtime(self) > os.path.getmtime(md5_file): + if not md5_file.exists() or os.path.getmtime(self) > os.path.getmtime(md5_file): self._md5, md5 = fileMD5(self, sig_type='both') # write md5 file with open(md5_file, 'w') as mfile: @@ -807,8 +682,7 @@ def target_signature(self): def sig_file(self): # self.resolve() does not resolve non-existing file and cannot be used here return os.path.join( - os.path.expanduser('~'), '.sos', 'signatures', - f"{textMD5(os.path.abspath(self))}.file_info") + os.path.expanduser('~'), '.sos', 'signatures', f"{textMD5(os.path.abspath(self))}.file_info") def validate(self, sig=None): """Check if file matches its signature""" @@ -845,9 +719,7 @@ def write_sig(self): if not self._md5: self._md5 = fileMD5(self) with open(self.sig_file(), "w") as sig: - sig.write( - f"{os.path.getmtime(self)}\t{os.path.getsize(self)}\t{self._md5}" - ) + sig.write(f"{os.path.getmtime(self)}\t{os.path.getsize(self)}\t{self._md5}") def __hash__(self): return hash(repr(self)) @@ -857,8 +729,7 @@ def __fspath__(self): return super().__fspath__() def __eq__(self, obj): - return isinstance( - obj, file_target) and os.path.abspath(self) == os.path.abspath(obj) + return isinstance(obj, file_target) and os.path.abspath(self) == os.path.abspath(obj) def __deepcopy__(self, memo): ft = file_target(self) @@ -881,8 +752,7 @@ class sos_tempfile(file_target): def __new__(cls, path=None, name=None, suffix=None, prefix=None, dir=None): if cls is Path: cls = WindowsPath if os.name == "nt" else PosixPath - filename = request_answer_from_controller( - ["sos_tempfile", path, name, suffix, prefix, dir]) + filename = request_answer_from_controller(["sos_tempfile", path, name, suffix, prefix, dir]) return cls._from_parts([filename]) @@ -899,8 +769,7 @@ def __init__(self, *args): for t in self._paths: if isinstance(t, paths): - raise RuntimeError( - f"Nested paths {t} were introduced by {args}") + raise RuntimeError(f"Nested paths {t} were introduced by {args}") if not isinstance(t, path): raise RuntimeError(f"Unrecognized path {t}") @@ -921,9 +790,7 @@ def __append__(self, arg): self._paths.append(path(arg)) elif isinstance(arg, sos_targets): if not all(isinstance(x, file_target) for x in arg._targets): - raise ValueError( - f"Cannot convert a sos_targets object {arg} with non-file target to paths" - ) + raise ValueError(f"Cannot convert a sos_targets object {arg} with non-file target to paths") self._paths.extend([path(str(x)) for x in arg._targets]) elif isinstance(arg, file_target): self._paths.append(path(str(arg))) @@ -954,9 +821,7 @@ def __fspath__(self): return 
self._paths[0].__fspath__() if not self._paths: raise ValueError("Cannot treat an empty paths as single path") - raise ValueError( - f"Cannot treat an paths object {self} with more than one paths as a single path" - ) + raise ValueError(f"Cannot treat an paths object {self} with more than one paths as a single path") def __format__(self, format_spec): if "," in format_spec: @@ -971,18 +836,14 @@ def __getattr__(self, key): if len(self._paths) == 1: return getattr(self._paths[0], key) if len(self._paths) == 0: - raise AttributeError( - f"Cannot get attribute {key} from empty target list") - raise AttributeError( - f"Cannot get attribute {key} from group of {len(self)} targets {self!r}" - ) + raise AttributeError(f"Cannot get attribute {key} from empty target list") + raise AttributeError(f"Cannot get attribute {key} from group of {len(self)} targets {self!r}") def __hash__(self): return hash(repr(self)) def __eq__(self, other): - return self._paths == other._paths if isinstance(other, - paths) else other + return self._paths == other._paths if isinstance(other, paths) else other def __repr__(self): return "[" + ", ".join(repr(x) for x in self._paths) + "]" @@ -1012,10 +873,8 @@ def __init__(self, indexes, labels=None, parent=None): def add_last(self, n, parent): # add the last n elements of parent to group # this has to be called after the elements have been appended - self._indexes.extend( - range(len(parent._targets) - n, len(parent._targets))) - self._labels.extend(parent._labels[len(parent._targets) - - n:len(parent._targets)]) + self._indexes.extend(range(len(parent._targets) - n, len(parent._targets))) + self._labels.extend(parent._labels[len(parent._targets) - n:len(parent._targets)]) return self def extend(self, grp, start, parent): @@ -1035,11 +894,7 @@ def idx_to_targets(self, parent): return ret def __getstate__(self): - return { - 'indexes': self._indexes, - 'labels': self._labels, - 'properties': self._dict - } + return {'indexes': self._indexes, 'labels': self._labels, 'properties': self._dict} def __setstate__(self, sdict): self._indexes = sdict["indexes"] @@ -1078,15 +933,12 @@ def __init__( else: self._undetermined = not bool(args) for arg in args: - self.__append__( - arg, default_source=_source, verify_existence=_verify_existence) + self.__append__(arg, default_source=_source, verify_existence=_verify_existence) for src, value in kwargs.items(): - self.__append__( - value, source=src, verify_existence=_verify_existence) + self.__append__(value, source=src, verify_existence=_verify_existence) for t in self._targets: if isinstance(t, sos_targets): - raise RuntimeError( - f"Nested sos_targets {t} were introduced by {args}") + raise RuntimeError(f"Nested sos_targets {t} were introduced by {args}") if not isinstance(t, BaseTarget): raise RuntimeError(f"Unrecognized target {t}") if _verify_existence: @@ -1127,11 +979,7 @@ def touch(self): def valid(self): return self._targets or self._undetermined is False - def __append__(self, - arg, - source="", - default_source="", - verify_existence=False): + def __append__(self, arg, source="", default_source="", verify_existence=False): src = source if source else default_source if isinstance(arg, paths): self._targets.extend([file_target(x) for x in arg._paths]) @@ -1185,8 +1033,7 @@ def __append__(self, for t in list(arg): self.__append__(t, source=src) elif arg is not None: - raise RuntimeError( - f"Unrecognized targets {arg} of type {arg.__class__.__name__}") + raise RuntimeError(f"Unrecognized targets {arg} of type 
{arg.__class__.__name__}") def set_labels(self, source): if isinstance(source, str): @@ -1194,16 +1041,13 @@ def set_labels(self, source): elif len(source) == len(self._targets): self._labels = source else: - raise ValueError( - f"Invalid source {source} for sos_target with {len(self)} targets." - ) + raise ValueError(f"Invalid source {source} for sos_target with {len(self)} targets.") labels = property(lambda self: self._labels, set_labels) targets = property(lambda self: self._targets) - groups = property( - lambda self: [x.idx_to_targets(self) for x in self._groups]) + groups = property(lambda self: [x.idx_to_targets(self) for x in self._groups]) def _get_group(self, index): return self._groups[index].idx_to_targets(self) @@ -1222,8 +1066,7 @@ def later_than(self, other): if not file_lhs: return False # now we have both. - return min(x.stat().st_mtime for x in file_lhs) > max( - x.stat().st_mtime for x in file_rhs) + return min(x.stat().st_mtime for x in file_lhs) > max(x.stat().st_mtime for x in file_rhs) def extend(self, another, source="", keep_groups=False): if isinstance(another, sos_targets): @@ -1253,15 +1096,11 @@ def extend(self, another, source="", keep_groups=False): if arg._groups: # if source is specified, it will override labels of all groups if not self._groups: - self._groups = [ - _sos_group(range(n_old), parent=self) for g in arg._groups - ] + self._groups = [_sos_group(range(n_old), parent=self) for g in arg._groups] if len(self._groups) == 1 and len(arg._groups) > 1: # 1 vs more, we duplicate itself self._groups = [ - _sos_group( - self._groups[0]._indexes, - self._groups[0]._labels).set(**self._groups[0]._dict) + _sos_group(self._groups[0]._indexes, self._groups[0]._labels).set(**self._groups[0]._dict) for ag in arg._groups ] for g, ag in zip(self._groups, arg._groups): @@ -1342,9 +1181,7 @@ def select(self, i): if isinstance(i, str): ret = sos_targets() ret._undetermined = self._undetermined - ret._targets = [ - x for x, y in zip(self._targets, self._labels) if y == i - ] + ret._targets = [x for x, y in zip(self._targets, self._labels) if y == i] index_map = { o_idx: n_idx for n_idx, o_idx in zip( range(len(ret._targets)), @@ -1356,11 +1193,7 @@ def select(self, i): for grp in self._groups: ret._groups.append( _sos_group( - [ - index_map[x] - for x, y in zip(grp._indexes, grp._labels) - if y == i - ], + [index_map[x] for x, y in zip(grp._indexes, grp._labels) if y == i], labels=i, ).set(**grp._dict)) return ret @@ -1382,27 +1215,19 @@ def select(self, i): ret._groups = [] if not self._groups: return ret - index_map = { - o_idx: n_idx - for n_idx, o_idx in zip(range(len(ret._targets)), kept) - } + index_map = {o_idx: n_idx for n_idx, o_idx in zip(range(len(ret._targets)), kept)} kept = set(kept) for grp in self._groups: ret._groups.append( _sos_group( [index_map[x] for x in grp._indexes if x in kept], - [ - y for x, y in zip(grp._indexes, grp._labels) - if x in kept - ], + [y for x, y in zip(grp._indexes, grp._labels) if x in kept], ).set(**grp._dict)) return ret ret = sos_targets() ret._undetermined = self._undetermined - ret._targets = ([self._targets[i]] - if isinstance(i, int) else self._targets[i]) - ret._labels = [self._labels[i]] if isinstance( - i, int) else self._labels[i] + ret._targets = ([self._targets[i]] if isinstance(i, int) else self._targets[i]) + ret._labels = [self._labels[i]] if isinstance(i, int) else self._labels[i] ret._groups = [] return ret @@ -1410,9 +1235,7 @@ def __getitem__(self, i): if isinstance(i, str): ret = sos_targets() 
ret._undetermined = self._undetermined - ret._targets = [ - x for x, y in zip(self._targets, self._labels) if y == i - ] + ret._targets = [x for x, y in zip(self._targets, self._labels) if y == i] index_map = { o_idx: n_idx for n_idx, o_idx in zip( range(len(ret._targets)), @@ -1424,11 +1247,7 @@ def __getitem__(self, i): for grp in self._groups: ret._groups.append( _sos_group( - [ - index_map[x] - for x, y in zip(grp._indexes, grp._labels) - if y == i - ], + [index_map[x] for x, y in zip(grp._indexes, grp._labels) if y == i], labels=i, ).set(**grp._dict)) if not ret._targets: @@ -1437,14 +1256,11 @@ def __getitem__(self, i): return self._targets[i] def target_signature(self): - return tuple((x.target_signature(), y) - for x, y in zip(self._targets, self._labels)) + return tuple((x.target_signature(), y) for x, y in zip(self._targets, self._labels)) def validate(self, sig): return (isinstance(sig, tuple) and len(sig) == len(self._targets) and - all( - x.validate(sig[0]) and src == sig[1] - for x, src, sig in zip(self._targets, self._labels, sig))) + all(x.validate(sig[0]) and src == sig[1] for x, src, sig in zip(self._targets, self._labels, sig))) def target_exists(self, mode="any"): return all(x.target_exists(mode) for x in self._targets) @@ -1458,12 +1274,10 @@ def __getattr__(self, name): return getattr(self._targets[0], name) except Exception as e: raise AttributeError( - f"{self.__class__.__name__} object or its first child has no attribute {name}" - ) from e + f"{self.__class__.__name__} object or its first child has no attribute {name}") from e else: raise AttributeError( - f"{self.__class__.__name__} object of length {len(self)} has no attribute {name}" - ) from e + f"{self.__class__.__name__} object of length {len(self)} has no attribute {name}") from e def target_name(self): return f"sos_targets([{','.join(x.target_name() for x in self._targets)}],_labels=[{','.join(self._labels)}])" @@ -1479,11 +1293,8 @@ def _dedup(self): def paired_with(self, name, properties): # can pair with sos_targets - if not isinstance(properties, - sos_targets) and not is_basic_type(properties): - env.logger.warning( - f'Failed to paired_with with value "{properties}" as it contains unsupported data type' - ) + if not isinstance(properties, sos_targets) and not is_basic_type(properties): + env.logger.warning(f'Failed to paired_with with value "{properties}" as it contains unsupported data type') return self if isinstance(properties, (bool, int, float, str, bytes)): for target in self._targets: @@ -1496,28 +1307,20 @@ def paired_with(self, name, properties): for target, property in zip(self._targets, properties): target.set(name, property) else: - raise ValueError( - f"Unacceptable properties {properties} for function paired_with" - ) + raise ValueError(f"Unacceptable properties {properties} for function paired_with") return self def remove_targets(self, type, kept=None): """Remove targets of certain type""" if kept is None: - kept = [ - i for i, x in enumerate(self._targets) - if not isinstance(x, type) - ] + kept = [i for i, x in enumerate(self._targets) if not isinstance(x, type)] if len(kept) == len(self._targets): return self self._targets = [self._targets[x] for x in kept] self._labels = [self._labels[x] for x in kept] if not self._groups: return self - index_map = { - o_idx: n_idx - for n_idx, o_idx in zip(range(len(self._targets)), kept) - } + index_map = {o_idx: n_idx for n_idx, o_idx in zip(range(len(self._targets)), kept)} kept = set(kept) for idx, grp in enumerate(self._groups): 
self._groups[idx] = _sos_group( @@ -1526,23 +1329,12 @@ def remove_targets(self, type, kept=None): ).set(**grp._dict) return self - def resolve_remote(self): - """If target is of remote type, resolve it""" - for idx, target in enumerate(self._targets): - if isinstance(target, remote): - resolved = target.resolve() - if isinstance(resolved, str): - resolved = interpolate(resolved, env.sos_dict.dict()) - self._targets[idx] = file_target(resolved).set(**target._dict) - return self def group_with(self, name, properties): if not self._groups: self._group(by="all") if not is_basic_type(properties): - env.logger.warning( - f"Failed to set {properties} as it is or contains unsupported data type" - ) + env.logger.warning(f"Failed to set {properties} as it is or contains unsupported data type") return self if isinstance(properties, (bool, int, float, str, bytes)): for group in self._groups: @@ -1556,8 +1348,7 @@ def group_with(self, name, properties): group.set(name, property) else: raise ValueError( - f"Unacceptable properties {properties} of type {properties.__class__.__name__} for function group_with" - ) + f"Unacceptable properties {properties} of type {properties.__class__.__name__} for function group_with") return self def get(self, name, default=None): @@ -1573,16 +1364,12 @@ def _add_groups(self, grps): for grp in grps: if not isinstance(grp, sos_targets): raise RuntimeError( - f"_output should be of type sos_targets: {grp} of type {grp.__class__.__name__} returned." - ) + f"_output should be of type sos_targets: {grp} of type {grp.__class__.__name__} returned.") start_idx = len(self._targets) grp_size = len(grp) self._targets.extend(grp._targets) self._labels.extend(grp._labels) - self._groups.append( - _sos_group( - range(start_idx, start_idx + grp_size), - labels=grp._labels).set(**grp._dict)) + self._groups.append(_sos_group(range(start_idx, start_idx + grp_size), labels=grp._labels).set(**grp._dict)) # in theory the groups should not overlap but in rare cases when # output is for example dynamic, they could overlap. 
return self._dedup() @@ -1595,8 +1382,7 @@ def _duplicate_groups(self, n): n_grps = len(self._groups) for _ in range(n - 1): for grp in self._groups[:n_grps]: - self._groups.append( - _sos_group(grp._indexes, grp._labels).set(**grp._dict)) + self._groups.append(_sos_group(grp._indexes, grp._labels).set(**grp._dict)) return self def _num_groups(self): @@ -1610,14 +1396,11 @@ def _group(self, by): self._groups = [] if by == "single": - self._groups = [ - _sos_group([x], parent=self) for x in range(len(self)) - ] + self._groups = [_sos_group([x], parent=self) for x in range(len(self))] elif by == "all": # default option self._groups = [_sos_group(range(len(self)), self._labels)] - elif isinstance(by, str) and (by.startswith("pairsource") or - by.startswith("pairlabel")): + elif isinstance(by, str) and (by.startswith("pairsource") or by.startswith("pairlabel")): labels = list(dict.fromkeys(self.labels)) if len(labels) == 1: raise ValueError("Cannot pairlabel input with a single label.") @@ -1633,15 +1416,11 @@ def _group(self, by): raise ValueError(f"Invalid pairsource option {by}") from e src_sizes = {s: self.labels.count(s) for s in labels} if max(src_sizes.values()) % grp_size != 0: - raise ValueError( - f"Cannot use group size {grp_size} (option {by}) for source of size {src_sizes}" - ) + raise ValueError(f"Cannot use group size {grp_size} (option {by}) for source of size {src_sizes}") n_groups = max(src_sizes.values()) // grp_size indexes = [[] for x in range(n_groups)] for s in labels: - lookup = [ - idx for idx, src in enumerate(self.labels) if src == s - ] + lookup = [idx for idx, src in enumerate(self.labels) if src == s] if src_sizes[s] > n_groups and src_sizes[s] % n_groups == 0: gs = src_sizes[s] // n_groups for i in range(n_groups): @@ -1650,20 +1429,13 @@ def _group(self, by): elif n_groups >= src_sizes[s] and n_groups % src_sizes[s] == 0: for i in range(n_groups): # (0 ), (0, ), (1, ), (1, ) ... 
- indexes[i].append(lookup[i // - (n_groups // src_sizes[s])]) + indexes[i].append(lookup[i // (n_groups // src_sizes[s])]) else: - raise ValueError( - f'Cannot use group size {grp_size} (by="{by}") for source of size {src_sizes}' - ) - self._groups = [ - _sos_group(indexes[x], parent=self) for x in range(n_groups) - ] + raise ValueError(f'Cannot use group size {grp_size} (by="{by}") for source of size {src_sizes}') + self._groups = [_sos_group(indexes[x], parent=self) for x in range(n_groups)] elif isinstance(by, str) and by.startswith("pairs"): if len(self) % 2 != 0: - raise ValueError( - f"Paired by has to have even number of input files: {len(self)} provided" - ) + raise ValueError(f"Paired by has to have even number of input files: {len(self)} provided") if by == "pairs": grp_size = 1 else: @@ -1673,19 +1445,16 @@ def _group(self, by): raise ValueError(f"Invalid pairs option {by}") from e if grp_size == 1: self._groups = [ - _sos_group(x, parent=self) for x in zip( - range(0, - len(self) // 2), range(len(self) // 2, len(self))) + _sos_group(x, parent=self) for x in zip(range(0, + len(self) // 2), range(len(self) // 2, len(self))) ] else: if len(self) % grp_size != 0: raise ValueError( - f"Paired by with group size {grp_size} is not possible with input of size {len(self)}" - ) + f"Paired by with group size {grp_size} is not possible with input of size {len(self)}") self._groups = [ _sos_group( - list(range(x[0], x[0] + grp_size)) + - list(range(x[1], x[1] + grp_size)), + list(range(x[0], x[0] + grp_size)) + list(range(x[1], x[1] + grp_size)), parent=self, ) for x in zip( range(0, @@ -1708,14 +1477,13 @@ def _group(self, by): else: if len(self) % grp_size != 0: raise ValueError( - f"Paired by with group size {grp_size} is not possible with input of size {len(self)}" - ) + f"Paired by with group size {grp_size} is not possible with input of size {len(self)}") f1, f2 = tee(range(len(self) // grp_size)) next(f2, None) self._groups = [ _sos_group( - list(range(x[0] * grp_size, (x[0] + 1) * grp_size)) + - list(range(x[1] * grp_size, (x[1] + 1) * grp_size)), + list(range(x[0] * grp_size, + (x[0] + 1) * grp_size)) + list(range(x[1] * grp_size, (x[1] + 1) * grp_size)), parent=self, ) for x in zip(f1, f2) ] @@ -1727,15 +1495,11 @@ def _group(self, by): grp_size = int(by[12:]) except Exception as e: raise ValueError(f"Invalid pairs option {by}") from e - self._groups = [ - _sos_group(x, parent=self) - for x in combinations(range(len(self)), grp_size) - ] + self._groups = [_sos_group(x, parent=self) for x in combinations(range(len(self)), grp_size)] elif by == "source" or by == "label": labels = list(dict.fromkeys(self.labels)) self._groups = [ - _sos_group([i for i, x in enumerate(self._labels) if x == src], - parent=self) for src in labels + _sos_group([i for i, x in enumerate(self._labels) if x == src], parent=self) for src in labels ] elif isinstance(by, int) or (isinstance(by, str) and by.isdigit()): by = int(by) @@ -1744,12 +1508,8 @@ def _group(self, by): f"Number of samples ({len(self)}) is not a multiple of by ({by}). The last group would have less files than the other groups." 
) if by < 1: - raise ValueError( - "Value of paramter by should be a positive number.") - self._groups = [ - _sos_group(range(i, min(i + by, len(self))), parent=self) - for i in range(0, len(self), by) - ] + raise ValueError("Value of paramter by should be a positive number.") + self._groups = [_sos_group(range(i, min(i + by, len(self))), parent=self) for i in range(0, len(self), by)] elif callable(by): try: self._groups = [] @@ -1762,28 +1522,22 @@ def _group(self, by): ) from e _target_index = None for grp in by(self): - if isinstance(grp, Sequence) and all( - isinstance(x, int) for x in grp): + if isinstance(grp, Sequence) and all(isinstance(x, int) for x in grp): if any(x < 0 or x >= len(self._targets) for x in grp): - raise ValueError( - f"Index out of range (< {len(self._targets)}): {grp}" - ) + raise ValueError(f"Index out of range (< {len(self._targets)}): {grp}") self._groups.append(_sos_group(grp, parent=self)) else: index = [] for x in sos_targets(grp): try: if _target_index is None: - _target_index = {v:k for k,v in enumerate(self._targets)} + _target_index = {v: k for k, v in enumerate(self._targets)} index.append(_target_index[x]) except Exception as e: - raise ValueError( - f"Returned target is not one of the targets. {x}" - ) from e + raise ValueError(f"Returned target is not one of the targets. {x}") from e self._groups.append(_sos_group(index, parent=self)) except Exception as e: - raise ValueError( - f"Failed to apply customized grouping method: {e}") from e + raise ValueError(f"Failed to apply customized grouping method: {e}") from e else: raise ValueError(f"Unsupported by option ``{by}``!") return self @@ -1808,16 +1562,14 @@ def _handle_paired_with(self, paired_with): try: var_name = ["_" + x for x in paired_with] except Exception as e: - raise ValueError( - f"Invalid value for option paired_with {paired_with}") from e + raise ValueError(f"Invalid value for option paired_with {paired_with}") from e var_value = [] for vn in var_name: if vn[1:] not in env.sos_dict: raise ValueError(f"Variable {vn[1:]} does not exist.") var_value.append(env.sos_dict[vn[1:]]) else: - raise ValueError( - f"Unacceptable value for parameter paired_with: {paired_with}") + raise ValueError(f"Unacceptable value for parameter paired_with: {paired_with}") # for vn, vv in zip(var_name, var_value): # set paired with values to step_input @@ -1843,16 +1595,14 @@ def _handle_group_with(self, group_with): try: var_name = ["_" + x for x in group_with] except Exception as e: - raise ValueError( - f"Invalud value for option group_with {group_with}") from e + raise ValueError(f"Invalud value for option group_with {group_with}") from e var_value = [] for vn in var_name: if vn[1:] not in env.sos_dict: raise ValueError(f"Variable {vn[1:]} does not exist.") var_value.append(env.sos_dict[vn[1:]]) else: - raise ValueError( - f"Unacceptable value for parameter group_with: {group_with}") + raise ValueError(f"Unacceptable value for parameter group_with: {group_with}") # for vn, vv in zip(var_name, var_value): self.group_with(vn, vv) @@ -1866,8 +1616,7 @@ def _handle_extract_pattern(self, pattern): elif isinstance(pattern, Iterable): patterns = pattern else: - raise ValueError( - f"Unacceptable value for parameter pattern: {pattern}") + raise ValueError(f"Unacceptable value for parameter pattern: {pattern}") # for pattern in patterns: res = extract_pattern(pattern, self._targets) @@ -1885,20 +1634,14 @@ def _handle_for_each(self, for_each): if all(isinstance(x, dict) for x in for_each): keys = 
[tuple(sorted(x.keys())) for x in for_each] # the keys should be all the same, or all different. - if len(set(keys)) == 1 and all( - isinstance(x, str) for x in keys[0]): + if len(set(keys)) == 1 and all(isinstance(x, str) for x in keys[0]): # this is a special case for specified contexts. #1403 - for_each = [{ - ",".join(keys[0]): - [[x[key] for key in keys[0]] for x in for_each] - }] + for_each = [{",".join(keys[0]): [[x[key] for key in keys[0]] for x in for_each]}] elif len(set(keys)) != len(keys): raise ValueError( - "List of dictionaries for parameter for_each should have all different, or all the same keys." - ) + "List of dictionaries for parameter for_each should have all different, or all the same keys.") else: - raise ValueError( - f"Unacceptable value for parameter for_each: {for_each}") + raise ValueError(f"Unacceptable value for parameter for_each: {for_each}") # for fe_all in for_each: if isinstance(fe_all, dict): @@ -1912,8 +1655,7 @@ def _handle_for_each(self, for_each): v = list(v) if any(len(_v) != len(names) for _v in v): raise ValueError( - f"Unable to unpack object {short_repr(v)} for variables {k} (of length {len(names)})" - ) + f"Unable to unpack object {short_repr(v)} for variables {k} (of length {len(names)})") fe_iter_names.extend(names) fe_values.extend(list(zip(*v))) else: @@ -1936,9 +1678,7 @@ def _handle_for_each(self, for_each): if name.split(".")[0] not in env.sos_dict: raise ValueError(f"Variable {name} does not exist.") if "." in name: - fe_values.append( - getattr(env.sos_dict[name.split(".")[0]], - name.split(".", 1)[-1])) + fe_values.append(getattr(env.sos_dict[name.split(".")[0]], name.split(".", 1)[-1])) else: fe_values.append(env.sos_dict[name]) @@ -1949,14 +1689,10 @@ def _handle_for_each(self, for_each): try: import pandas as pd - if not isinstance(values, - (pd.DataFrame, pd.Series, pd.Index)): - raise ValueError( - f"Unacceptable for_each data type {values.__class__.__name__}" - ) + if not isinstance(values, (pd.DataFrame, pd.Series, pd.Index)): + raise ValueError(f"Unacceptable for_each data type {values.__class__.__name__}") except Exception as e: - raise ValueError( - f"Cannot iterate through variable {name}: {e}") from e + raise ValueError(f"Cannot iterate through variable {name}: {e}") from e if loop_size is None: loop_size = len(values) elif loop_size != len(values): @@ -1974,21 +1710,15 @@ def _handle_for_each(self, for_each): for idx in range(n_grps): for var_name, values in zip(fe_iter_names, fe_values): if isinstance(values, Sequence): - self._groups[n_grps * vidx + idx].set( - var_name, values[vidx]) + self._groups[n_grps * vidx + idx].set(var_name, values[vidx]) elif isinstance(values, pd.DataFrame): - self._groups[n_grps * vidx + idx].set( - var_name, values.iloc[vidx]) + self._groups[n_grps * vidx + idx].set(var_name, values.iloc[vidx]) elif isinstance(values, pd.Series): - self._groups[n_grps * vidx + idx].set( - var_name, values.iloc[vidx]) + self._groups[n_grps * vidx + idx].set(var_name, values.iloc[vidx]) elif isinstance(values, pd.Index): - self._groups[n_grps * vidx + idx].set( - var_name, values[vidx]) + self._groups[n_grps * vidx + idx].set(var_name, values[vidx]) else: - raise ValueError( - f"Failed to iterate through for_each variable {short_repr(values)}" - ) + raise ValueError(f"Failed to iterate through for_each variable {short_repr(values)}") def __hash__(self): return hash(repr(self)) @@ -1996,9 +1726,7 @@ def __hash__(self): def __eq__(self, other): try: # allow compare to any object as long as it can be converted 
to sos_targets - return self._targets == ( - other._targets if isinstance(other, sos_targets) else - sos_targets(other)._targets) + return self._targets == (other._targets if isinstance(other, sos_targets) else sos_targets(other)._targets) except Exception: return False @@ -2007,31 +1735,24 @@ def __add__(self, part): return self._targets[0].__add__(part) if len(self._targets) == 0: raise ValueError(f"Cannot add {part} to empty target list") - raise ValueError( - f"Cannot add {part} to group of {len(self)} targets {self!r}") + raise ValueError(f"Cannot add {part} to group of {len(self)} targets {self!r}") def __fspath__(self): if len(self._targets) == 1: return self._targets[0].__fspath__() if len(self._targets) == 0: - raise ValueError( - "Cannot treat an empty sos_targets as single target") - raise ValueError( - f"Cannot treat an sos_targets object {self} with more than one targets as a single target" - ) + raise ValueError("Cannot treat an empty sos_targets as single target") + raise ValueError(f"Cannot treat an sos_targets object {self} with more than one targets as a single target") def __repr__(self): - return (("[" + ", ".join(repr(x) for x in self._targets) + - "]") if self.valid() else + return (("[" + ", ".join(repr(x) for x in self._targets) + "]") if self.valid() else ("Unspecified" if self.unspecified() else self._undetermined)) def __short_repr__(self): - grp_info = "" if self._num_groups( - ) <= 1 else f" in {self._num_groups()} groups" + grp_info = "" if self._num_groups() <= 1 else f" in {self._num_groups()} groups" if self.valid(): if len(self._targets) <= 2: - return " ".join([x.target_name() for x in self._targets - ]) + grp_info + return " ".join([x.target_name() for x in self._targets]) + grp_info return (" ".join([x.target_name() for x in self._targets[:2]]) + f"... 
({len(self._targets)} items{grp_info})") return "Unspecified" if self.unspecified() else self._undetermined @@ -2040,8 +1761,7 @@ def __stable_repr__(self): return repr(self) def __str__(self): - return (self.__format__("") if self.valid() else - ("Unspecified" if self.unspecified() else self._undetermined)) + return (self.__format__("") if self.valid() else ("Unspecified" if self.unspecified() else self._undetermined)) def __format__(self, format_spec): if not self.valid(): @@ -2082,11 +1802,9 @@ def __init__( if not sdict: sdict = env.sos_dict if not input_files.valid(): - raise RuntimeError( - "Input files of step signature cannot be undetermined.") + raise RuntimeError("Input files of step signature cannot be undetermined.") if not dependent_files.valid(): - raise RuntimeError( - "Dependent files of step signature cannot be undetermined.") + raise RuntimeError("Dependent files of step signature cannot be undetermined.") self.input_files = input_files.remove_targets(type=sos_step) self.dependent_files = dependent_files.remove_targets(type=sos_step) @@ -2096,8 +1814,9 @@ def __init__( # signatures that exist before execution and might change during execution self.init_signature = { - x: deepcopy(sdict[x]) for x in sorted(signature_vars) if - x in sdict and not callable(sdict[x]) and pickleable(sdict[x], x) + x: deepcopy(sdict[x]) + for x in sorted(signature_vars) + if x in sdict and not callable(sdict[x]) and pickleable(sdict[x], x) } def identify_local_args(self): @@ -2129,35 +1848,27 @@ def write(self): return self.content if self.output_files.undetermined(): self.output_files = env.sos_dict["_output"] - env.log_to_file( - "TARGET", - f'Set undetermined output files to {env.sos_dict["_output"]}') + env.log_to_file("TARGET", f'Set undetermined output files to {env.sos_dict["_output"]}') input_sig = {} for f in self.input_files: try: input_sig[str(f)] = f.target_signature() except Exception: - env.logger.debug( - f"Failed to create signature: input target {f} does not exist" - ) + env.logger.debug(f"Failed to create signature: input target {f} does not exist") return False output_sig = {} for f in self.output_files: try: output_sig[str(f)] = f.target_signature() except Exception: - env.logger.debug( - f"Failed to create signature: output target {f} does not exist" - ) + env.logger.debug(f"Failed to create signature: output target {f} does not exist") return False dependent_sig = {} for f in self.dependent_files: try: dependent_sig[str(f)] = f.target_signature() except Exception: - env.logger.debug( - f"Failed to create signature: dependent target {f} does not exist" - ) + env.logger.debug(f"Failed to create signature: dependent target {f} does not exist") return False init_context_sig = { var: objectMD5(self.init_signature[var]) @@ -2190,9 +1901,7 @@ def validate(self, signature): """Check if ofiles and ifiles match signatures recorded in md5file""" if not signature: return "Empty signature" - sig_files = ( - self.input_files._targets + self.output_files._targets + - self.dependent_files._targets) + sig_files = (self.input_files._targets + self.output_files._targets + self.dependent_files._targets) for x in sig_files: if not x.target_exists("any"): return f"Missing target {x}" @@ -2210,9 +1919,7 @@ def validate(self, signature): if env.sos_dict[key] != value: return f"Context variable {key} value mismatch: {short_repr(value)} saved, {short_repr(env.sos_dict[key])} current" except Exception as e: - env.logger.debug( - f"Variable {key} of type {type(value).__name__} cannot be compared: 
{e}" - ) + env.logger.debug(f"Variable {key} of type {type(value).__name__} cannot be compared: {e}") elif "init_context_sig" in signature: for key, value in signature["init_context_sig"].items(): if key not in env.sos_dict: @@ -2240,23 +1947,19 @@ def validate(self, signature): target_class = eval(target_type) else: # check registry - for entrypoint in pkg_resources.iter_entry_points( - group="sos_targets"): + for entrypoint in pkg_resources.iter_entry_points(group="sos_targets"): if entrypoint.name.strip() == target_type: target_class = entrypoint.load() break if target_class is None: - raise ValueError( - f"Failed to identify target class {target_type}" - ) + raise ValueError(f"Failed to identify target class {target_type}") # parameter of class? freal = eval(f, {target_type: target_class}) else: freal = file_target(f) if not freal.validate(m): return f"Target {f} does not exist or does not match saved signature {m}" - res[cur_type].append(freal.target_name( - ) if isinstance(freal, file_target) else freal) + res[cur_type].append(freal.target_name() if isinstance(freal, file_target) else freal) files_checked[freal.target_name()] = True except Exception as e: env.logger.debug(f"Wrong md5 in signature: {e}") @@ -2337,8 +2040,7 @@ def lock(self): return # we will need to lock on a file that we do not really write to # otherwise the lock will be broken when we write to it. - self._lock = fasteners.InterProcessLock( - os.path.join(env.temp_dir, self.sig_id + ".lock")) + self._lock = fasteners.InterProcessLock(os.path.join(env.temp_dir, self.sig_id + ".lock")) if not self._lock.acquire(blocking=False): self._lock = None raise UnavailableLock(( @@ -2355,9 +2057,7 @@ def release(self, quiet=False): if not self.sig_id: return if not hasattr(self, "_lock") or self._lock is None: - env.logger.warning( - f"Releasing an non-existent or released lock for {self.sig_id}." 
- ) + env.logger.warning(f"Releasing an non-existent or released lock for {self.sig_id}.") return if self._lock: try: @@ -2368,9 +2068,7 @@ def release(self, quiet=False): ) except Exception as e: if not quiet: - env.logger.warning( - f"Unable to release lock for output files {self.output_files}: {e}" - ) + env.logger.warning(f"Unable to release lock for output files {self.output_files}: {e}") finally: self._lock = None @@ -2378,8 +2076,7 @@ def set_output(self, files: sos_targets): if not self.sig_id: return # add signature file if input and output files are dynamic - if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config[ - "SOS_DEBUG"]: + if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config["SOS_DEBUG"]: env.log_to_file("TARGET", f"Set output of signature to {files}") self.output_files = files @@ -2391,11 +2088,8 @@ def write(self): if not self.sig_id: return False if not self.output_files.valid(): - raise ValueError( - f"Cannot write signature with undetermined output {self.output_files}" - ) - if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config[ - "SOS_DEBUG"]: + raise ValueError(f"Cannot write signature with undetermined output {self.output_files}") + if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config["SOS_DEBUG"]: env.log_to_file( "TARGET", f"write signature {self.sig_id} with output {self.output_files}", @@ -2410,21 +2104,9 @@ def write(self): "tracked_files", self.sig_id, repr({ - "input_files": [ - str(f.resolve()) - for f in self.input_files - if isinstance(f, file_target) - ], - "dependent_files": [ - str(f.resolve()) - for f in self.dependent_files - if isinstance(f, file_target) - ], - "output_files": [ - str(f.resolve()) - for f in self.output_files - if isinstance(f, file_target) - ], + "input_files": [str(f.resolve()) for f in self.input_files if isinstance(f, file_target)], + "dependent_files": [str(f.resolve()) for f in self.dependent_files if isinstance(f, file_target)], + "output_files": [str(f.resolve()) for f in self.output_files if isinstance(f, file_target)], }), ]) return True @@ -2433,14 +2115,11 @@ def validate(self): """Check if ofiles and ifiles match signatures recorded in md5file""" if not self.sig_id: return "no signature for steps with nested workflow" - if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config[ - "SOS_DEBUG"]: + if "TARGET" in env.config["SOS_DEBUG"] or "ALL" in env.config["SOS_DEBUG"]: env.log_to_file("TARGET", f"Validating {self.sig_id}") # # file not exist? - sig_files = ( - self.input_files._targets + self.output_files._targets + - self.dependent_files._targets) + sig_files = (self.input_files._targets + self.output_files._targets + self.dependent_files._targets) for x in sig_files: if not x.target_exists("any"): return f"Missing target {x}" diff --git a/src/sos/task_executor.py b/src/sos/task_executor.py index 12714ea57..54fb1531a 100644 --- a/src/sos/task_executor.py +++ b/src/sos/task_executor.py @@ -150,8 +150,7 @@ def execute_single_task(self, task_id, params, runtime, sig_content, quiet=False "_depends", ]: if key in sos_dict and isinstance(sos_dict[key], sos_targets): - # resolve remote() target - env.sos_dict.set(key, sos_dict[key].remove_targets(type=sos_step).resolve_remote()) + env.sos_dict.set(key, sos_dict[key].remove_targets(type=sos_step)) # when no output is specified, we just treat the task as having no output (determined) env.sos_dict["_output"]._undetermined = False
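
Note: with the `remote` target and `sos_targets.resolve_remote()` removed, the task executor now only strips `sos_step` placeholders from `_input`, `_output`, and `_depends`. Below is a minimal sketch of that remaining behavior, assuming the `sos` package is installed; the file name "data/input.txt" and step name "align" are hypothetical.

    # Sketch of the target filtering kept in task_executor.py after this patch:
    # sos_step placeholders are dropped, concrete file targets pass through,
    # and no resolve_remote() call happens anymore (remote() no longer exists).
    from sos.targets import file_target, sos_step, sos_targets

    targets = sos_targets(file_target("data/input.txt"), sos_step("align"))

    # Remove sos_step dependencies, keeping only the file target.
    filtered = targets.remove_targets(type=sos_step)
    print(filtered)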