diff --git a/src/sos/__main__.py b/src/sos/__main__.py index e634f0584..a8e6b89c3 100755 --- a/src/sos/__main__.py +++ b/src/sos/__main__.py @@ -1057,22 +1057,6 @@ def cmd_remote(args, workflow_args): from .remote import run_command_on_hosts run_command_on_hosts(cfg, args.hosts, args.cmd, args.verbosity) - elif args.action == "push": - if not args.files: - raise ValueError( - "Please specify files to push to remote host with option --files" - ) - from .remote import push_to_hosts - - push_to_hosts(cfg, args.hosts, args.files, args.verbosity) - elif args.action == "pull": - if not args.files: - raise ValueError( - "Please specify files to pull from remote host with option --files" - ) - from .remote import pull_from_host - - pull_from_host(cfg, args.hosts, args.files, args.verbosity) else: raise ValueError( "Unacceptable remote action. Use command 'sos remote -h' to check allowable actions." diff --git a/src/sos/hosts.py b/src/sos/hosts.py index 85c41984f..0900398ce 100755 --- a/src/sos/hosts.py +++ b/src/sos/hosts.py @@ -153,12 +153,6 @@ def target_exists(self, targets): def target_signature(self, targets): return targets.target_signature() - def send_to_host(self, items): - return {x: x for x in items} - - def receive_from_host(self, items): - return {x: x for x in items} - def prepare_task(self, task_id): task_file = os.path.join( os.path.expanduser("~"), ".sos", "tasks", task_id + ".task") @@ -623,148 +617,6 @@ def _remote_abs(self, path): return path return os.path.join(self._map_var(os.getcwd()), path) - def send_to_host(self, items): - # we only copy files and directories, not other types of targets - if isinstance(items, str): - p = [path(items)] - elif isinstance(items, path): - p = [items] - elif isinstance(items, Sequence): - ignored = [x for x in items if not isinstance(x, (str, path))] - if ignored: - env.logger.info(f"``Ignore`` {ignored}") - p = [path(x) for x in items if isinstance(x, (str, path))] - else: - env.logger.warning( - f"Unrecognized items to be sent to host: {items}") - return {} - - items = sum( - [list(x.parent.glob(x.name)) for x in p], - [], - ) - - from .utils import find_symbolic_links - - new_items = [] - for item in items: - links = find_symbolic_links(item) - for link, realpath in links.items(): - env.logger.info(f"Adding {realpath} for symbolic link {link}") - new_items.extend(links.values()) - items.extend(new_items) - - sending = self._map_path(items) - - sent = {} - for source in sorted(sending.keys()): - dest = self._remote_abs(sending[source]) - if self.is_shared(source): - if ("TASK" in env.config["SOS_DEBUG"] or - "ALL" in env.config["SOS_DEBUG"]): - env.log_to_file( - "TASK", f"Skip sending {source} on shared file system") - else: - if ("TASK" in env.config["SOS_DEBUG"] or - "ALL" in env.config["SOS_DEBUG"]): - env.log_to_file( - "TASK", f"Sending ``{source}`` to {self.alias}:{dest}") - cmd = cfg_interpolate( - self._get_send_cmd( - rename=os.path.basename(source) != os.path.basename( - dest)), - { - "source": sos_targets(str(source).rstrip("/")), - "dest": sos_targets(dest), - "host": self.address, - "port": self.port, - }, - ) - if ("TASK" in env.config["SOS_DEBUG"] or - "ALL" in env.config["SOS_DEBUG"]): - env.log_to_file("TASK", cmd) - ret = subprocess.call( - cmd, - shell=True, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - ) - if ret != 0: - raise RuntimeError( - f'Failed to copy {source} to {self.alias} using command "{cmd}". The remote host might be unavailable.' - ) - sent[source] = dest - return sent - - def receive_from_host(self, items): - if isinstance(items, str): - items = [items] - elif isinstance(items, path): - items = [str(items)] - elif isinstance(items, Sequence): - ignored = [x for x in items if not isinstance(x, (str, path))] - if ignored: - env.logger.info(f"``Ignore`` {ignored}") - items = [x for x in items if isinstance(x, (str, path))] - else: - env.logger.warning( - f"Unrecognized items to be retrieved from host: {items}") - return {} - - # y could be path - receiving = { - self._remote_abs(y): str(x) - for x, y in self._map_path(items).items() - } - # - received = {} - for source in sorted(receiving.keys()): - dest = receiving[source] - dest_dir = os.path.dirname(dest) - if dest_dir and not os.path.isdir(dest_dir): - try: - os.makedirs(dest_dir) - except Exception as e: - env.logger.error( - f"Failed to create destination directory {dest_dir}: {e}" - ) - if self.is_shared(dest) and os.path.basename( - source) == os.path.basename(dest): - env.logger.debug( - f"Skip retrieving ``{dest}`` from shared file system") - received[dest] = source - else: - cmd = cfg_interpolate( - self._get_receive_cmd( - rename=os.path.basename(source) != os.path.basename( - dest)), - { - "source": sos_targets(str(source).rstrip("/")), - "dest": sos_targets(dest), - "host": self.address, - "port": self.port, - }, - ) - if ("TASK" in env.config["SOS_DEBUG"] or - "ALL" in env.config["SOS_DEBUG"]): - env.log_to_file("TASK", cmd) - try: - ret = subprocess.call( - cmd, - shell=True, - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - ) - if ret != 0: - raise RuntimeError(f"command return {ret}") - received[dest] = source - except Exception as e: - raise RuntimeError( - f'Failed to copy {source} from {self.alias} using command "{cmd}": {e}' - ) from e - return received - - # # Interface # def prepare_task(self, task_id): @@ -817,37 +669,6 @@ def _prepare_task(self, task_id): f'Task {task_id} requested more walltime ({task_vars["_runtime"]["walltime"]}) than allowed max_walltime ({self.config["max_walltime"]})' ) - if task_vars["_input"] and not isinstance(task_vars["_input"], - Undetermined): - sent = self.send_to_host(task_vars["_input"]) - if sent: - env.logger.info( - f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}" - ) - if task_vars["_depends"] and not isinstance(task_vars["_depends"], - Undetermined): - sent = self.send_to_host(task_vars["_depends"]) - if sent: - env.logger.info( - f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}" - ) - if "to_host" in task_vars["_runtime"]: - if not isinstance( - task_vars["_runtime"]["to_host"], - (str, Sequence)) or (isinstance( - task_vars["_runtime"]["to_host"], Sequence) and not all( - isinstance(x, str) - for x in task_vars["_runtime"]["to_host"])): - raise ValueError( - f'Parameter to_host accepts a list of paths (strings). {task_vars["_runtime"]["to_host"]} provided' - ) - - sent = self.send_to_host(task_vars["_runtime"]["to_host"]) - if sent: - env.logger.info( - f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}" - ) - # map variables runtime["_runtime"]["workdir"] = ( task_vars["_runtime"]["workdir"] if "workdir" @@ -1428,13 +1249,6 @@ def _get_host_agent(self, start_engine: bool, # public interface # # based on Host definition - # - def send_to_host(self, items): - return self._host_agent.send_to_host(items) - - def receive_from_host(self, items): - return self._host_agent.receive_from_host(items) - def map_var(self, rvars): return self._host_agent._map_var(rvars) diff --git a/src/sos/remote.py b/src/sos/remote.py index 9bca8b0cf..7532a646e 100755 --- a/src/sos/remote.py +++ b/src/sos/remote.py @@ -216,13 +216,6 @@ def test_paths(host): tFile.write(f"{tID}") except Exception: return f"Failed to write to mapped directory {local}" - # test if file can be sent - try: - host.send_to_host(os.path.join(local, f".sos_test_{tID}.txt")) - except Exception as e: - return ( - f"Failed to send files under {local} to remote host under {remote}: {e}" - ) # the file should be available on remote host try: @@ -584,69 +577,3 @@ def run_command_on_hosts(cfg, hosts, cmd, verbosity): if verbosity and verbosity > 2: sys.stderr.write(get_traceback()) env.logger.error(str(e)) - - -def push_to_hosts(cfg, hosts, items, verbosity): - env.verbosity = verbosity - if not hosts: - hosts = cfg.get("hosts", []) - if not hosts: - env.logger.warning( - "No remote host or task queue is defined in ~/.sos/hosts.yml.") - return - for host in hosts: - try: - env.logger.info(f'Pushing ``{" ".join(items)}`` to ``{host}``') - - h = Host(host, start_engine=False) - # - sent = h.send_to_host(items) - # - env.logger.info("{} item{} sent:\n{}".format( - len(sent), - " is" if len(sent) <= 1 else "s are", - "\n".join([ - f"{x} => {sent[x]}" for x in sorted(sent.keys()) - ]), - )) - except Exception as e: - from .utils import get_traceback - - if verbosity and verbosity > 2: - sys.stderr.write(get_traceback()) - env.logger.error(str(e)) - sys.exit(1) - - -def pull_from_host(cfg, hosts, items, verbosity): - env.verbosity = verbosity - if not hosts: - hosts = cfg.get("hosts", []) - if not hosts: - env.logger.warning( - "No remote host or task queue is defined in ~/.sos/hosts.yml.") - return - if len(hosts) > 1: - raise ValueError("Can only pull from a single remote host.") - try: - env.logger.info(f'Pulling ``{" ".join(items)}`` from ``{hosts[0]}``') - - host = Host(hosts[0], start_engine=False) - # - received = host.receive_from_host(items) - # - print("{} item{} received:\n{}".format( - len(received), - " is" if len(received) <= 1 else "s are", - "\n".join([ - f"{x} <= {received[x]}" - for x in sorted(received.keys()) - ]), - )) - except Exception as e: - from .utils import get_traceback - - if verbosity and verbosity > 2: - sys.stderr.write(get_traceback()) - env.logger.error(str(e)) - sys.exit(1) diff --git a/src/sos/workflow_engines.py b/src/sos/workflow_engines.py index e2c094544..93ee0c2d6 100644 --- a/src/sos/workflow_engines.py +++ b/src/sos/workflow_engines.py @@ -88,18 +88,8 @@ def execute_workflow(self, filename, command, **template_args): # copy the files over self.agent.send_job_file(f"~/.sos/config_{self.alias}.yml", dir=".") - self.local_filename = filename - self.job_name = workflow.calc_md5(workflow_args) - - if os.path.isfile(filename): - ret = self.agent.send_to_host([filename]) - elif os.path.isfile(filename + ".sos"): - ret = self.agent.send_to_host([filename + ".sos"]) - elif os.path.isfile(filename + ".ipynb"): - ret = self.agent.send_to_host([filename + ".ipynb"]) - - self.filename = list(ret.values())[0] + self.filename = filename self.command = self.remove_arg(command, "-r") # -c only point to local config file. self.command = self.remove_arg(self.command, "-c")