Skip to content

Commit

Permalink
Remove sos remote push and sos remote pull and send_to_hot function
Browse files Browse the repository at this point in the history
  • Loading branch information
BoPeng committed Feb 9, 2024
1 parent c3ea3f1 commit 5ab24eb
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 286 deletions.
16 changes: 0 additions & 16 deletions src/sos/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,22 +1057,6 @@ def cmd_remote(args, workflow_args):
from .remote import run_command_on_hosts

run_command_on_hosts(cfg, args.hosts, args.cmd, args.verbosity)
elif args.action == "push":
if not args.files:
raise ValueError(
"Please specify files to push to remote host with option --files"
)
from .remote import push_to_hosts

push_to_hosts(cfg, args.hosts, args.files, args.verbosity)
elif args.action == "pull":
if not args.files:
raise ValueError(
"Please specify files to pull from remote host with option --files"
)
from .remote import pull_from_host

pull_from_host(cfg, args.hosts, args.files, args.verbosity)
else:
raise ValueError(
"Unacceptable remote action. Use command 'sos remote -h' to check allowable actions."
Expand Down
186 changes: 0 additions & 186 deletions src/sos/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,6 @@ def target_exists(self, targets):
def target_signature(self, targets):
return targets.target_signature()

def send_to_host(self, items):
return {x: x for x in items}

def receive_from_host(self, items):
return {x: x for x in items}

def prepare_task(self, task_id):
task_file = os.path.join(
os.path.expanduser("~"), ".sos", "tasks", task_id + ".task")
Expand Down Expand Up @@ -623,148 +617,6 @@ def _remote_abs(self, path):
return path
return os.path.join(self._map_var(os.getcwd()), path)

def send_to_host(self, items):
# we only copy files and directories, not other types of targets
if isinstance(items, str):
p = [path(items)]
elif isinstance(items, path):
p = [items]
elif isinstance(items, Sequence):
ignored = [x for x in items if not isinstance(x, (str, path))]
if ignored:
env.logger.info(f"``Ignore`` {ignored}")
p = [path(x) for x in items if isinstance(x, (str, path))]
else:
env.logger.warning(
f"Unrecognized items to be sent to host: {items}")
return {}

items = sum(
[list(x.parent.glob(x.name)) for x in p],
[],
)

from .utils import find_symbolic_links

new_items = []
for item in items:
links = find_symbolic_links(item)
for link, realpath in links.items():
env.logger.info(f"Adding {realpath} for symbolic link {link}")
new_items.extend(links.values())
items.extend(new_items)

sending = self._map_path(items)

sent = {}
for source in sorted(sending.keys()):
dest = self._remote_abs(sending[source])
if self.is_shared(source):
if ("TASK" in env.config["SOS_DEBUG"] or
"ALL" in env.config["SOS_DEBUG"]):
env.log_to_file(
"TASK", f"Skip sending {source} on shared file system")
else:
if ("TASK" in env.config["SOS_DEBUG"] or
"ALL" in env.config["SOS_DEBUG"]):
env.log_to_file(
"TASK", f"Sending ``{source}`` to {self.alias}:{dest}")
cmd = cfg_interpolate(
self._get_send_cmd(
rename=os.path.basename(source) != os.path.basename(
dest)),
{
"source": sos_targets(str(source).rstrip("/")),
"dest": sos_targets(dest),
"host": self.address,
"port": self.port,
},
)
if ("TASK" in env.config["SOS_DEBUG"] or
"ALL" in env.config["SOS_DEBUG"]):
env.log_to_file("TASK", cmd)
ret = subprocess.call(
cmd,
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
)
if ret != 0:
raise RuntimeError(
f'Failed to copy {source} to {self.alias} using command "{cmd}". The remote host might be unavailable.'
)
sent[source] = dest
return sent

def receive_from_host(self, items):
if isinstance(items, str):
items = [items]
elif isinstance(items, path):
items = [str(items)]
elif isinstance(items, Sequence):
ignored = [x for x in items if not isinstance(x, (str, path))]
if ignored:
env.logger.info(f"``Ignore`` {ignored}")
items = [x for x in items if isinstance(x, (str, path))]
else:
env.logger.warning(
f"Unrecognized items to be retrieved from host: {items}")
return {}

# y could be path
receiving = {
self._remote_abs(y): str(x)
for x, y in self._map_path(items).items()
}
#
received = {}
for source in sorted(receiving.keys()):
dest = receiving[source]
dest_dir = os.path.dirname(dest)
if dest_dir and not os.path.isdir(dest_dir):
try:
os.makedirs(dest_dir)
except Exception as e:
env.logger.error(
f"Failed to create destination directory {dest_dir}: {e}"
)
if self.is_shared(dest) and os.path.basename(
source) == os.path.basename(dest):
env.logger.debug(
f"Skip retrieving ``{dest}`` from shared file system")
received[dest] = source
else:
cmd = cfg_interpolate(
self._get_receive_cmd(
rename=os.path.basename(source) != os.path.basename(
dest)),
{
"source": sos_targets(str(source).rstrip("/")),
"dest": sos_targets(dest),
"host": self.address,
"port": self.port,
},
)
if ("TASK" in env.config["SOS_DEBUG"] or
"ALL" in env.config["SOS_DEBUG"]):
env.log_to_file("TASK", cmd)
try:
ret = subprocess.call(
cmd,
shell=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
)
if ret != 0:
raise RuntimeError(f"command return {ret}")
received[dest] = source
except Exception as e:
raise RuntimeError(
f'Failed to copy {source} from {self.alias} using command "{cmd}": {e}'
) from e
return received

#
# Interface
#
def prepare_task(self, task_id):
Expand Down Expand Up @@ -817,37 +669,6 @@ def _prepare_task(self, task_id):
f'Task {task_id} requested more walltime ({task_vars["_runtime"]["walltime"]}) than allowed max_walltime ({self.config["max_walltime"]})'
)

if task_vars["_input"] and not isinstance(task_vars["_input"],
Undetermined):
sent = self.send_to_host(task_vars["_input"])
if sent:
env.logger.info(
f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}"
)
if task_vars["_depends"] and not isinstance(task_vars["_depends"],
Undetermined):
sent = self.send_to_host(task_vars["_depends"])
if sent:
env.logger.info(
f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}"
)
if "to_host" in task_vars["_runtime"]:
if not isinstance(
task_vars["_runtime"]["to_host"],
(str, Sequence)) or (isinstance(
task_vars["_runtime"]["to_host"], Sequence) and not all(
isinstance(x, str)
for x in task_vars["_runtime"]["to_host"])):
raise ValueError(
f'Parameter to_host accepts a list of paths (strings). {task_vars["_runtime"]["to_host"]} provided'
)

sent = self.send_to_host(task_vars["_runtime"]["to_host"])
if sent:
env.logger.info(
f"{task_id} ``sent`` {short_repr(sent.keys())} to {self.alias}"
)

# map variables
runtime["_runtime"]["workdir"] = (
task_vars["_runtime"]["workdir"] if "workdir"
Expand Down Expand Up @@ -1428,13 +1249,6 @@ def _get_host_agent(self, start_engine: bool,
# public interface
#
# based on Host definition
#
def send_to_host(self, items):
return self._host_agent.send_to_host(items)

def receive_from_host(self, items):
return self._host_agent.receive_from_host(items)

def map_var(self, rvars):
return self._host_agent._map_var(rvars)

Expand Down
73 changes: 0 additions & 73 deletions src/sos/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,6 @@ def test_paths(host):
tFile.write(f"{tID}")
except Exception:
return f"Failed to write to mapped directory {local}"
# test if file can be sent
try:
host.send_to_host(os.path.join(local, f".sos_test_{tID}.txt"))
except Exception as e:
return (
f"Failed to send files under {local} to remote host under {remote}: {e}"
)

# the file should be available on remote host
try:
Expand Down Expand Up @@ -584,69 +577,3 @@ def run_command_on_hosts(cfg, hosts, cmd, verbosity):
if verbosity and verbosity > 2:
sys.stderr.write(get_traceback())
env.logger.error(str(e))


def push_to_hosts(cfg, hosts, items, verbosity):
env.verbosity = verbosity
if not hosts:
hosts = cfg.get("hosts", [])
if not hosts:
env.logger.warning(
"No remote host or task queue is defined in ~/.sos/hosts.yml.")
return
for host in hosts:
try:
env.logger.info(f'Pushing ``{" ".join(items)}`` to ``{host}``')

h = Host(host, start_engine=False)
#
sent = h.send_to_host(items)
#
env.logger.info("{} item{} sent:\n{}".format(
len(sent),
" is" if len(sent) <= 1 else "s are",
"\n".join([
f"{x} => {sent[x]}" for x in sorted(sent.keys())
]),
))
except Exception as e:
from .utils import get_traceback

if verbosity and verbosity > 2:
sys.stderr.write(get_traceback())
env.logger.error(str(e))
sys.exit(1)


def pull_from_host(cfg, hosts, items, verbosity):
env.verbosity = verbosity
if not hosts:
hosts = cfg.get("hosts", [])
if not hosts:
env.logger.warning(
"No remote host or task queue is defined in ~/.sos/hosts.yml.")
return
if len(hosts) > 1:
raise ValueError("Can only pull from a single remote host.")
try:
env.logger.info(f'Pulling ``{" ".join(items)}`` from ``{hosts[0]}``')

host = Host(hosts[0], start_engine=False)
#
received = host.receive_from_host(items)
#
print("{} item{} received:\n{}".format(
len(received),
" is" if len(received) <= 1 else "s are",
"\n".join([
f"{x} <= {received[x]}"
for x in sorted(received.keys())
]),
))
except Exception as e:
from .utils import get_traceback

if verbosity and verbosity > 2:
sys.stderr.write(get_traceback())
env.logger.error(str(e))
sys.exit(1)
12 changes: 1 addition & 11 deletions src/sos/workflow_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,8 @@ def execute_workflow(self, filename, command, **template_args):
# copy the files over
self.agent.send_job_file(f"~/.sos/config_{self.alias}.yml", dir=".")

self.local_filename = filename

self.job_name = workflow.calc_md5(workflow_args)

if os.path.isfile(filename):
ret = self.agent.send_to_host([filename])
elif os.path.isfile(filename + ".sos"):
ret = self.agent.send_to_host([filename + ".sos"])
elif os.path.isfile(filename + ".ipynb"):
ret = self.agent.send_to_host([filename + ".ipynb"])

self.filename = list(ret.values())[0]
self.filename = filename
self.command = self.remove_arg(command, "-r")
# -c only point to local config file.
self.command = self.remove_arg(self.command, "-c")
Expand Down

0 comments on commit 5ab24eb

Please sign in to comment.