Skip to content

Commit

Permalink
Merge pull request #483 from BiBiServ/feature-increase-ssh-timeout
Browse files Browse the repository at this point in the history
Added keyword for ssh_timeout and improved argument passing for ssh.
  • Loading branch information
XaverStiensmeier authored Apr 10, 2024
2 parents d4d3fc4 + 6ef3acb commit 55a06f3
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 96 deletions.
4 changes: 1 addition & 3 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ updates:
schedule:
interval: "daily"
open-pull-requests-limit: 10
versioning-strategy: "widen"
target:
versions: [">=3.0.0"]
versioning-strategy: "auto"
2 changes: 2 additions & 0 deletions bibigrid.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
cloud: openstack # name of clouds.yaml cloud-specification key (which is value to top level key clouds)

# -- BEGIN: GENERAL CLUSTER INFORMATION --
# sshTimeout: 5 # Number of ssh connection attempts with 2^attempt seconds in between (2^sshTimeout-1 is the max time before returning with an error)

## sshPublicKeyFiles listed here will be added to access the cluster. A temporary key is created by bibigrid itself.
#sshPublicKeyFiles:
# - [public key one]
Expand Down
24 changes: 13 additions & 11 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(self, providers, configurations, config_path, log, debug=False, clu
self.ssh_user = configurations[0].get("sshUser") or "ubuntu"
self.ssh_add_public_key_commands = ssh_handler.get_add_ssh_public_key_commands(
configurations[0].get("sshPublicKeyFiles"))
self.ssh_timeout = configurations[0].get("sshTimeout", 4)
self.config_path = config_path
self.master_ip = None
self.log.debug("Cluster-ID: %s", self.cluster_id)
Expand Down Expand Up @@ -256,17 +257,17 @@ def initialize_instances(self):
Setup all servers
"""
for configuration in self.configurations:
ssh_data = {"floating_ip": configuration["floating_ip"], "private_key": KEY_FOLDER + self.key_name,
"username": self.ssh_user, "commands": None, "filepaths": None,
"gateway": configuration.get("gateway", {}), "timeout": self.ssh_timeout}
if configuration.get("masterInstance"):
self.master_ip = configuration["floating_ip"]
ssh_handler.ansible_preparation(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=self.ssh_add_public_key_commands, log=self.log,
gateway=configuration.get("gateway", {}))
ssh_data["commands"] = self.ssh_add_public_key_commands + ssh_handler.ANSIBLE_SETUP
ssh_data["filepaths"] = [(ssh_data["private_key"], ssh_handler.PRIVATE_KEY_FILE)]
ssh_handler.execute_ssh(ssh_data, self.log)
elif configuration.get("vpnInstance"):
ssh_handler.execute_ssh(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=ssh_handler.VPN_SETUP, log=self.log,
gateway=configuration.get("gateway", {}))
ssh_data["commands"] = ssh_handler.VPN_SETUP
ssh_handler.execute_ssh(ssh_data, self.log)

def prepare_volumes(self, provider, mounts):
"""
Expand Down Expand Up @@ -343,9 +344,10 @@ def upload_data(self):
self.log.debug(f"Starting playbook with {ansible_start}.")
commands = [ssh_handler.get_ac_command(self.providers, AC_NAME.format(
cluster_id=self.cluster_id))] + ssh_handler.ANSIBLE_START
ssh_handler.execute_ssh(floating_ip=self.master_ip, private_key=KEY_FOLDER + self.key_name,
username=self.ssh_user, filepaths=FILEPATHS, commands=commands, log=self.log,
gateway=self.configurations[0].get("gateway", {}))
ssh_data = {"floating_ip": self.master_ip, "private_key": KEY_FOLDER + self.key_name,
"username": self.ssh_user, "commands": commands, "filepaths": FILEPATHS,
"gateway": self.configurations[0].get("gateway", {}), "timeout": self.ssh_timeout}
ssh_handler.execute_ssh(ssh_data=ssh_data, log=self.log)

def start_start_server_threads(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion bibigrid/core/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def run_action(args, configurations, config_path):
creator = create.Create(providers=providers, configurations=configurations, log=LOG, debug=args.debug,
config_path=config_path)
LOG.log(42, "Creating a new cluster takes about 10 or more minutes depending on your cloud provider "
"and your configuration. Be patient.")
"and your configuration. Please be patient.")
exit_state = creator.create()
else:
if not args.cluster_id:
Expand Down
11 changes: 7 additions & 4 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import mergedeep
import yaml
from bibigrid.core.actions.version import __version__

from bibigrid.core.actions import create
from bibigrid.core.actions import ide
from bibigrid.core.actions.version import __version__
from bibigrid.core.utility import id_generation
from bibigrid.core.utility import yaml_dumper
from bibigrid.core.utility.handler import configuration_handler
Expand All @@ -30,6 +30,7 @@
SLURM_CONF = {"db": "slurm", "db_user": "slurm", "db_password": "changeme",
"munge_key": id_generation.generate_munge_key(),
"elastic_scheduling": {"SuspendTime": 3600, "ResumeTimeout": 900, "TreeWidth": 128}}
CLOUD_SCHEDULING = {"sshTimeout": 4}


def delete_old_vars(log):
Expand Down Expand Up @@ -184,7 +185,10 @@ def generate_common_configuration_yaml(cidrs, configurations, cluster_id, ssh_us
"slurm": master_configuration.get("slurm", True), "ssh_user": ssh_user,
"slurm_conf": mergedeep.merge({}, SLURM_CONF,
master_configuration.get("slurmConf", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)}
strategy=mergedeep.Strategy.TYPESAFE_REPLACE),
"cloud_scheduling": mergedeep.merge({}, CLOUD_SCHEDULING,
master_configuration.get("cloudScheduling", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)}
if master_configuration.get("nfs"):
nfs_shares = master_configuration.get("nfsShares", [])
nfs_shares = nfs_shares + DEFAULT_NFS_SHARES
Expand All @@ -201,8 +205,7 @@ def generate_common_configuration_yaml(cidrs, configurations, cluster_id, ssh_us
master_configuration.get("zabbixConf", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)

for from_key, to_key in [("ansibleRoles", "ansible_roles"),
("ansibleGalaxyRoles", "ansible_galaxy_roles")]:
for from_key, to_key in [("ansibleRoles", "ansible_roles"), ("ansibleGalaxyRoles", "ansible_galaxy_roles")]:
pass_through(master_configuration, common_configuration_yaml, from_key, to_key)

if len(configurations) > 1:
Expand Down
111 changes: 43 additions & 68 deletions bibigrid/core/utility/handler/ssh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,51 +87,52 @@ def copy_to_server(sftp, local_path, remote_path, log):
copy_to_server(sftp, os.path.join(local_path, filename), os.path.join(remote_path, filename), log)


def is_active(client, floating_ip_address, private_key, username, log, gateway, timeout=5):
def is_active(client, paramiko_key, ssh_data, log):
"""
Checks if connection is possible and therefore if server is active.
Raises paramiko.ssh_exception.NoValidConnectionsError if timeout is reached
@param client: created client
@param floating_ip_address: ip to connect to
@param private_key: SSH-private_key
@param username: SSH-username
@param paramiko_key: SSH-private_key
@param log:
@param timeout: how long to wait between ping
@param gateway: if node should be reached over a gateway port is set to 30000 + subnet * 256 + host
@param ssh_data: dict containing among other things gateway, floating_ip, username
(waiting grows quadratically till 2**timeout before accepting failure)
"""
attempts = 0
establishing_connection = True
log.info(f"Attempting to connect to {ssh_data['floating_ip']}... This might take a while")
port = 22
if ssh_data.get('gateway'):
log.info(f"Using SSH Gateway {ssh_data['gateway'].get('ip')}")
octets = {f'oct{enum + 1}': int(elem) for enum, elem in enumerate(ssh_data['floating_ip'].split("."))}
port = int(sympy.sympify(ssh_data['gateway']["portFunction"]).subs(dict(octets)))
log.info(f"Port {port} will be used (see {ssh_data['gateway']['portFunction']} and octets {octets}).")
while establishing_connection:
try:
port = 22
if gateway:
log.info(f"Using SSH Gateway {gateway.get('ip')}")
octets = {f'oct{enum + 1}': int(elem) for enum, elem in enumerate(floating_ip_address.split("."))}
port = int(sympy.sympify(gateway["portFunction"]).subs(dict(octets)))
log.info(f"Port {port} will be used (see {gateway['portFunction']} and octets {octets}).")
client.connect(hostname=gateway.get("ip") or floating_ip_address, username=username,
pkey=private_key, timeout=7, auth_timeout=5, port=port)
log.info(f"Attempt {attempts}/{ssh_data['timeout']}. Connecting to {ssh_data['floating_ip']}")
client.connect(hostname=ssh_data['gateway'].get("ip") or ssh_data['floating_ip'],
username=ssh_data['username'], pkey=paramiko_key, timeout=7,
auth_timeout=ssh_data['timeout'], port=port)
establishing_connection = False
log.info(f"Successfully connected to {floating_ip_address}")
log.info(f"Successfully connected to {ssh_data['floating_ip']}.")
except paramiko.ssh_exception.NoValidConnectionsError as exc:
log.info(f"Attempting to connect to {floating_ip_address}... This might take a while", )
if attempts < timeout:
time.sleep(2 ** attempts)
if attempts < ssh_data['timeout']:
sleep_time = 2 ** (attempts+2)
time.sleep(sleep_time)
log.info(f"Waiting {sleep_time} before attempting to reconnect.")
attempts += 1
else:
log.error(f"Attempt to connect to {floating_ip_address} failed.")
log.error(f"Attempt to connect to {ssh_data['floating_ip']} failed.")
raise ConnectionException(exc) from exc
except socket.timeout as exc:
log.warning("Socket timeout exception occurred. Try again ...")
if attempts < timeout:
if attempts < ssh_data['timeout']:
attempts += 1
else:
log.error(f"Attempt to connect to {floating_ip_address} failed, due to a socket timeout.")
log.error(f"Attempt to connect to {ssh_data['floating_ip']} failed, due to a socket timeout.")
raise ConnectionException(exc) from exc
except TimeoutError as exc: # pylint: disable=duplicate-except
log.error("The attempt to connect to %s failed. Possible known reasons:"
"\n\t-Your network's security group doesn't allow SSH.", floating_ip_address)
"\n\t-Your network's security group doesn't allow SSH.", ssh_data['floating_ip'])
raise ConnectionException(exc) from exc


Expand Down Expand Up @@ -183,61 +184,35 @@ def execute_ssh_cml_commands(client, commands, log):
raise ExecutionException(msg)


def ansible_preparation(floating_ip, private_key, username, log, gateway, commands=None, filepaths=None):
"""
Installs python and pip. Then installs ansible via pip.
Copies private key to instance so cluster-nodes are reachable and sets permission as necessary.
Copies additional files and executes additional commands if given.
The playbook is copied later, because it needs all servers setup and is not time intensive.
See: create.update_playbooks
@param floating_ip: public ip of server to ansible-prepare
@param private_key: generated private key of all cluster-server
@param username: username of all server
@param log:
@param commands: additional commands to execute
@param filepaths: additional files to copy: (localpath, remotepath)
@param gateway
"""
if filepaths is None:
filepaths = []
if commands is None:
commands = []
log.info("Ansible preparation...")
commands = ANSIBLE_SETUP + commands
filepaths.append((private_key, PRIVATE_KEY_FILE))
execute_ssh(floating_ip, private_key, username, log, gateway, commands, filepaths)


def execute_ssh(floating_ip, private_key, username, log, gateway, commands=None, filepaths=None):
def execute_ssh(ssh_data, log):
"""
Executes commands on remote and copies files given in filepaths
@param floating_ip: public ip of remote
@param private_key: key of remote
@param username: username of remote
@param commands: commands
@param ssh_data: Dict containing floating_ip, private_key, username, commands, filepaths, gateway, timeout
@param log:
@param filepaths: filepaths (localpath, remotepath)
@param gateway: gateway if used
"""
if commands is None:
commands = []
paramiko_key = paramiko.ECDSAKey.from_private_key_file(private_key)
log.debug(f"Running execute_sshc with ssh_data: {ssh_data}.")
if ssh_data.get("filepaths") is None:
ssh_data["filepaths"] = []
if ssh_data.get("commands") is None:
ssh_data["commands"] = []
paramiko_key = paramiko.ECDSAKey.from_private_key_file(ssh_data["private_key"])
with paramiko.SSHClient() as client:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
is_active(client=client, floating_ip_address=floating_ip, username=username, private_key=paramiko_key,
log=log, gateway=gateway)
is_active(client=client, paramiko_key=paramiko_key, ssh_data=ssh_data, log=log)
except ConnectionException as exc:
log.error(f"Couldn't connect to ip {gateway or floating_ip} using private key {private_key}.")
log.error(f"Couldn't connect to ip {ssh_data['gateway'] or ssh_data['floating_ip']} using private key "
f"{ssh_data['private_key']}.")
raise exc
else:
log.debug(f"Setting up {floating_ip}")
if filepaths:
log.debug(f"Setting up filepaths for {floating_ip}")
log.debug(f"Setting up {ssh_data['floating_ip']}")
if ssh_data['filepaths']:
log.debug(f"Setting up filepaths for {ssh_data['floating_ip']}")
sftp = client.open_sftp()
for local_path, remote_path in filepaths:
for local_path, remote_path in ssh_data['filepaths']:
copy_to_server(sftp=sftp, local_path=local_path, remote_path=remote_path, log=log)
log.debug("SFTP: Files %s copied.", filepaths)
if commands:
log.debug(f"Setting up commands for {floating_ip}")
execute_ssh_cml_commands(client=client, commands=commands, log=log)
log.debug("SFTP: Files %s copied.", ssh_data['filepaths'])
if ssh_data["floating_ip"]:
log.debug(f"Setting up commands for {ssh_data['floating_ip']}")
execute_ssh_cml_commands(client=client, commands=ssh_data["commands"], log=log)
13 changes: 12 additions & 1 deletion documentation/markdown/features/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ sshPublicKeyFiles:
- /home/user/.ssh/id_ecdsa_colleague.pub
```
#### sshTimeout (optional)
Defines the number of attempts that BiBiGrid will try to connect to the master instance via ssh.
Attempts have a pause of `2^(attempts+2)` seconds in between. Default value is 4.

#### cloudScheduling (optional)
This key allows you to influence cloud scheduling. Currently, only a single key `sshTimeout` can be set here.

##### sshTimeout (optional)
Defines the number of attempts that the master will try to connect to on demand created worker instances via ssh.
Attempts have a pause of `2^(attempts+2)` seconds in between. Default value is 4.

#### autoMount (optional)
> **Warning:** If a volume has an obscure filesystem, this might overwrite your data!

Expand Down Expand Up @@ -149,7 +160,7 @@ This is required if your provider has any post-launch services interfering with
seemingly random errors can occur when the service interrupts ansible's execution. Services are
listed on [de.NBI Wiki](https://cloud.denbi.de/wiki/) at `Computer Center Specific` (not yet).

####
#### gateway (optional)
In order to save valuable floating ips, BiBiGrid can also make use of a gateway to create the cluster.
For more information on how to set up a gateway, how gateways work and why they save floating ips please continue reading [here](https://cloud.denbi.de/wiki/Tutorials/SaveFloatingIPs/).

Expand Down
16 changes: 8 additions & 8 deletions resources/playbook/roles/bibigrid/files/slurm/create_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def start_server(worker, start_worker_group, start_data):
server_start_data["other_openstack_exception"].append(worker)


def check_ssh_active(private_ip, private_key="/opt/slurm/.ssh/id_ecdsa", username="ubuntu", timeout=7):
def check_ssh_active(private_ip, private_key="/opt/slurm/.ssh/id_ecdsa", username="ubuntu"):
"""
Waits until SSH connects successful. This guarantees that the node can be reached via Ansible.
@param private_ip: ip of node
Expand All @@ -138,8 +138,8 @@ def check_ssh_active(private_ip, private_key="/opt/slurm/.ssh/id_ecdsa", usernam
establishing_connection = False
except paramiko.ssh_exception.NoValidConnectionsError as exc:
logging.info("Attempting to connect to %s... This might take a while", private_ip)
if attempts < timeout:
time.sleep(2 ** attempts)
if attempts < common_config["cloud_scheduling"]["sshTimeout"]:
time.sleep(2 ** (2+attempts))
attempts += 1
else:
logging.warning("Attempt to connect to %s failed.", private_ip)
Expand Down Expand Up @@ -213,16 +213,16 @@ def _run_playbook(cmdline_args):
worker_groups = []
for filename in os.listdir(GROUP_VARS_PATH):
if filename != "master.yml":
f = os.path.join(GROUP_VARS_PATH, filename)
worker_group_yaml_file = os.path.join(GROUP_VARS_PATH, filename)
# checking if it is a file
if os.path.isfile(f):
with open(f, mode="r", encoding="utf-8") as worker_group:
worker_groups.append(yaml.safe_load(worker_group))
if os.path.isfile(worker_group_yaml_file):
with open(worker_group_yaml_file, mode="r", encoding="utf-8") as worker_group_yaml:
worker_groups.append(yaml.safe_load(worker_group_yaml))

# read common configuration
with open("/opt/playbook/vars/common_configuration.yml", mode="r", encoding="utf-8") as common_configuration_file:
common_config = yaml.safe_load(common_configuration_file)

logging.info(f"Maximum 'is active' attempts: {common_config['cloud_scheduling']['sshTimeout']}")
# read clouds.yaml
with open("/etc/openstack/clouds.yaml", mode="r", encoding="utf-8") as clouds_file:
clouds = yaml.safe_load(clouds_file)["clouds"]
Expand Down

0 comments on commit 55a06f3

Please sign in to comment.