From 76843c83b3d30cf640e3477aab009d33177397dd Mon Sep 17 00:00:00 2001 From: tirami-su Date: Fri, 15 Jul 2016 15:39:28 +0200 Subject: [PATCH 1/6] project restructured, add-slaves and remove-slaves actions added to spark_ec2.py --- .../root/spark-ec2/ec2-variables.sh | 2 - setup-slave.sh | 2 +- setup.sh | 12 +- spark_ec2.py | 435 +++++++++++++----- 4 files changed, 328 insertions(+), 123 deletions(-) diff --git a/deploy.generic/root/spark-ec2/ec2-variables.sh b/deploy.generic/root/spark-ec2/ec2-variables.sh index 4f3e8da8..96e1b217 100644 --- a/deploy.generic/root/spark-ec2/ec2-variables.sh +++ b/deploy.generic/root/spark-ec2/ec2-variables.sh @@ -18,8 +18,6 @@ # # These variables are automatically filled in by the spark-ec2 script. -export MASTERS="{{master_list}}" -export SLAVES="{{slave_list}}" export HDFS_DATA_DIRS="{{hdfs_data_dirs}}" export MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}" export SPARK_LOCAL_DIRS="{{spark_local_dirs}}" diff --git a/setup-slave.sh b/setup-slave.sh index 76372d9a..4b4ffc7a 100755 --- a/setup-slave.sh +++ b/setup-slave.sh @@ -131,4 +131,4 @@ popd > /dev/null # this is to set the ulimit for root and other users echo '* soft nofile 1000000' >> /etc/security/limits.conf -echo '* hard nofile 1000000' >> /etc/security/limits.conf \ No newline at end of file +echo '* hard nofile 1000000' >> /etc/security/limits.conf diff --git a/setup.sh b/setup.sh index 5a3beea2..af720303 100755 --- a/setup.sh +++ b/setup.sh @@ -29,14 +29,10 @@ export HOSTNAME=$PRIVATE_DNS # Fix the bash built-in hostname variable too echo "Setting up Spark on `hostname`..." -# Set up the masters, slaves, etc files based on cluster env variables -echo "$MASTERS" > masters -echo "$SLAVES" > slaves - -MASTERS=`cat masters` +export MASTERS=`cat masters` NUM_MASTERS=`cat masters | wc -l` OTHER_MASTERS=`cat masters | sed '1d'` -SLAVES=`cat slaves` +export SLAVES=`cat slaves` SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=5" if [[ "x$JAVA_HOME" == "x" ]] ; then @@ -107,7 +103,9 @@ chmod u+x /root/spark/conf/spark-env.sh for module in $MODULES; do echo "Setting up $module" module_setup_start_time="$(date +'%s')" - source ./$module/setup.sh + if [[ -e $module/setup.sh ]]; then + source ./$module/setup.sh + fi sleep 0.1 module_setup_end_time="$(date +'%s')" echo_time_diff "$module setup" "$module_setup_start_time" "$module_setup_end_time" diff --git a/spark_ec2.py b/spark_ec2.py index 95e1f584..68f6624e 100644 --- a/spark_ec2.py +++ b/spark_ec2.py @@ -474,24 +474,10 @@ def get_spark_ami(opts): return ami -# Launch a cluster of the given name, by setting up its security groups, -# and then starting new instances in them. -# Returns a tuple of EC2 reservation objects for the master and slaves -# Fails if there already instances running in the cluster's groups. -def launch_cluster(conn, opts, cluster_name): - if opts.identity_file is None: - print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr) - sys.exit(1) - - if opts.key_pair is None: - print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr) - sys.exit(1) - - user_data_content = None - if opts.user_data: - with open(opts.user_data) as user_data_file: - user_data_content = user_data_file.read() - +def set_security_groups(conn, opts, cluster_name): + """ + Set security groups(acts as virtual firewall) for masters and slaves instances. 
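+    Returns the (master_group, slave_group) pair, creating the groups if they
+    do not exist yet and authorizing the ports the cluster needs.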
+ """ print("Setting up security groups...") master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) @@ -557,33 +543,10 @@ def launch_cluster(conn, opts, cluster_name): slave_group.authorize('tcp', 50075, 50075, authorized_address) slave_group.authorize('tcp', 60060, 60060, authorized_address) slave_group.authorize('tcp', 60075, 60075, authorized_address) + return (master_group, slave_group) - # Check if instances are already running in our groups - existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, - die_on_error=False) - if existing_slaves or (existing_masters and not opts.use_existing_master): - print("ERROR: There are already instances running in group %s or %s" % - (master_group.name, slave_group.name), file=stderr) - sys.exit(1) - - # Figure out Spark AMI - if opts.ami is None: - opts.ami = get_spark_ami(opts) - - # we use group ids to work around https://github.com/boto/boto/issues/350 - additional_group_ids = [] - if opts.additional_security_group: - additional_group_ids = [sg.id - for sg in conn.get_all_security_groups() - if opts.additional_security_group in (sg.name, sg.id)] - print("Launching instances...") - - try: - image = conn.get_all_images(image_ids=[opts.ami])[0] - except: - print("Could not find AMI " + opts.ami, file=stderr) - sys.exit(1) +def create_block_device(opts): # Create block device mapping so that we can add EBS volumes if asked to. # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz block_map = BlockDeviceMapping() @@ -603,17 +566,78 @@ def launch_cluster(conn, opts, cluster_name): # The first ephemeral drive is /dev/sdb. name = '/dev/sd' + string.ascii_letters[i + 1] block_map[name] = dev + return block_map + + +def get_user_data_content(opts): + user_data_content = None + if opts.user_data: + with open(opts.user_data) as user_data_file: + user_data_content = user_data_file.read() + return user_data_content + + +def get_additional_group_ids(conn, opts): + # we use group ids to work around https://github.com/boto/boto/issues/350 + additional_group_ids = [] + if opts.additional_security_group: + additional_group_ids = [sg.id + for sg in conn.get_all_security_groups() + if opts.additional_security_group in (sg.name, sg.id)] + return additional_group_ids + + +def launch_master(conn, opts, cluster_name, master_group, image, existing_masters): + additional_group_ids = get_additional_group_ids(conn, opts) + block_map = create_block_device(opts) + user_data_content = get_user_data_content(opts) + + # Launch or resume masters + if existing_masters: + print("Starting master...") + for inst in existing_masters: + if inst.state not in ["shutting-down", "terminated"]: + inst.start() + master_nodes = existing_masters + else: + master_type = opts.master_instance_type + if master_type == "": + master_type = opts.instance_type + if opts.zone == 'all': + opts.zone = random.choice(conn.get_all_zones()).name + master_res = image.run( + key_name=opts.key_pair, + security_group_ids=[master_group.id] + additional_group_ids, + instance_type=master_type, + placement=opts.zone, + min_count=1, + max_count=1, + block_device_map=block_map, + subnet_id=opts.subnet_id, + placement_group=opts.placement_group, + user_data=user_data_content, + instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, + instance_profile_name=opts.instance_profile_name) + + master_nodes = master_res.instances + print("Launched master in 
%s, regid = %s" % (opts.zone, master_res.id)) + return master_nodes + + +def launch_slaves(conn, opts, cluster_name, slave_group, image): + additional_group_ids = get_additional_group_ids(conn, opts) + block_map = create_block_device(opts) + user_data_content = get_user_data_content(opts) # Launch slaves - if opts.spot_price is not None: + if opts.spot_price: # Launch spot instances with the requested price print("Requesting %d slaves as spot instances with price $%.3f" % (opts.slaves, opts.spot_price)) zones = get_zones(conn, opts) num_zones = len(zones) - i = 0 my_req_ids = [] - for zone in zones: + for i, zone in enumerate(zones): num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) slave_reqs = conn.request_spot_instances( price=opts.spot_price, @@ -630,7 +654,6 @@ def launch_cluster(conn, opts, cluster_name): user_data=user_data_content, instance_profile_name=opts.instance_profile_name) my_req_ids += [req.id for req in slave_reqs] - i += 1 print("Waiting for spot instances to be granted...") try: @@ -668,9 +691,8 @@ def launch_cluster(conn, opts, cluster_name): # Launch non-spot instances zones = get_zones(conn, opts) num_zones = len(zones) - i = 0 slave_nodes = [] - for zone in zones: + for i, zone in enumerate(zones): num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) if num_slaves_this_zone > 0: slave_res = image.run( @@ -692,42 +714,25 @@ def launch_cluster(conn, opts, cluster_name): plural_s=('' if num_slaves_this_zone == 1 else 's'), z=zone, r=slave_res.id)) - i += 1 + return slave_nodes - # Launch or resume masters - if existing_masters: - print("Starting master...") - for inst in existing_masters: - if inst.state not in ["shutting-down", "terminated"]: - inst.start() - master_nodes = existing_masters - else: - master_type = opts.master_instance_type - if master_type == "": - master_type = opts.instance_type - if opts.zone == 'all': - opts.zone = random.choice(conn.get_all_zones()).name - master_res = image.run( - key_name=opts.key_pair, - security_group_ids=[master_group.id] + additional_group_ids, - instance_type=master_type, - placement=opts.zone, - min_count=1, - max_count=1, - block_device_map=block_map, - subnet_id=opts.subnet_id, - placement_group=opts.placement_group, - user_data=user_data_content, - instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, - instance_profile_name=opts.instance_profile_name) - master_nodes = master_res.instances - print("Launched master in %s, regid = %s" % (zone, master_res.id)) +def get_image(conn, opts): + image = None - # This wait time corresponds to SPARK-4983 - print("Waiting for AWS to propagate instance metadata...") - time.sleep(15) + # Figure out Spark AMI + if opts.ami is None: + opts.ami = get_spark_ami(opts) + try: + image = conn.get_all_images(image_ids=[opts.ami])[0] + except: + print("Could not find AMI " + opts.ami, file=stderr) + sys.exit(1) + return image + + +def add_tags(opts, cluster_name, nodes, role): # Give the instances descriptive names and set additional tags additional_tags = {} if opts.additional_tags.strip(): @@ -735,20 +740,143 @@ def launch_cluster(conn, opts, cluster_name): map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') ) - for master in master_nodes: - master.add_tags( - dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + for node in nodes: + node.add_tags( + dict(additional_tags, Name='{cn}-{role}-{iid}'.format(cn=cluster_name, + role=role, + iid=node.id)) ) - for slave in slave_nodes: - 
slave.add_tags( - dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) - ) + +def check_certificates(opts): + if opts.identity_file is None: + print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr) + sys.exit(1) + + if opts.key_pair is None: + print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr) + sys.exit(1) + + +# Launch a cluster of the given name, by setting up its security groups, +# and then starting new instances in them. +# Returns a tuple of EC2 reservation objects for the master and slaves +# Fails if there already instances running in the cluster's groups. +def launch_cluster(conn, opts, cluster_name): + check_certificates(opts) + + master_group, slave_group = set_security_groups(conn, opts, cluster_name) + + # Check if instances are already running in our groups + existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if existing_slaves or (existing_masters and not opts.use_existing_master): + print("ERROR: There are already instances running in group %s or %s" % + (master_group.name, slave_group.name), file=stderr) + sys.exit(1) + + print("Launching instances...") + image = get_image(conn, opts) + + slave_nodes = launch_slaves(conn, opts, cluster_name, + slave_group, image) + master_nodes = launch_master(conn, opts, cluster_name, + master_group, image, existing_masters) + + # This wait time corresponds to SPARK-4983 + print("Waiting for AWS to propagate instance metadata...") + time.sleep(15) + + add_tags(opts, cluster_name, master_nodes, role="master") + add_tags(opts, cluster_name, slave_nodes, role="slave") + + # Return all the instances + return (master_nodes, slave_nodes) + + +def add_slaves(conn, opts, cluster_name): + check_certificates(opts) + + master_group, slave_group = set_security_groups(conn, opts, cluster_name) + + # Check if instances are already running in our groups + existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if not existing_masters: + print("ERROR: There is no master running in group %s." % + (master_group.name), file=stderr) + sys.exit(1) + + print("Launching instances...") + image = get_image(conn, opts) + + slave_nodes = launch_slaves(conn, opts, cluster_name, + slave_group, image) + master_nodes = existing_masters + + # This wait time corresponds to SPARK-4983 + print("Waiting for AWS to propagate instance metadata...") + time.sleep(15) + + add_tags(opts, cluster_name, slave_nodes, role="slave") # Return all the instances return (master_nodes, slave_nodes) +def remove_slaves(conn, opts, cluster_name): + master_nodes, slave_nodes = get_existing_cluster(conn, opts, + cluster_name, die_on_error=False) + + if not master_nodes: + print("ERROR: There is no running master in this cluster: %s" % + (cluster_name), file=stderr) + sys.exit(1) + + master = get_dns_name(master_nodes[0], opts.private_ips) + + if not slave_nodes: + return ; + + slave_to_remove = [] + if opts.slaves == -1 or opts.slaves >= len(slave_nodes): + slave_to_remove = slave_nodes + print("All slaves will be terminated.") + else: + print("The following slave instance(s) will be terminated:") + for _ in range(opts.slaves): + slave_to_remove.append(random.choice(slave_nodes)) + + for inst in slave_to_remove: + print("> %s" % get_dns_name(inst, opts.private_ips)) + + msg = "Are you sure you want to delete " \ + "these slave(s) on cluster {c} ? 
(y/N) ".format(c=cluster_name) + response = raw_input(msg) + if response == "y": + print("Terminating slave(s)...") + for inst in slave_to_remove: + inst.terminate() + + master_nodes, slave_nodes = get_existing_cluster(conn, opts, + cluster_name, die_on_error=False) + + master = get_dns_name(master_nodes[0], opts.private_ips) + + print("Deploying files to master...") + deploy_files( + conn=conn, + root_dir=SPARK_EC2_DIR + "/" + "entities.generic", + opts=opts, + master_nodes=master_nodes, + slave_nodes=slave_nodes, + modules=[] + ) + + print("Propagate slaves files...") + ssh(master, opts, "chmod u+x /root/spark-ec2/update_entities.sh") + ssh(master, opts, "/root/spark-ec2/update_entities.sh") + def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): """ Get the EC2 instances in an existing cluster if available. @@ -787,9 +915,47 @@ def get_instances(group_names): return (master_instances, slave_instances) +def transfert_SSH_key(opts, master, slave_nodes): + """ + Transfert SSH key to slaves + """ + dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) + print("Transferring cluster's SSH key to slaves...") + for slave in slave_nodes: + slave_address = get_dns_name(slave, opts.private_ips) + print(slave_address) + ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) + + +def deploy_repository(opts, master): + """ + Deploy spark_ec2 repo to master + """ + print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( + r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)) + ssh( + host=master, + opts=opts, + command="rm -rf spark-ec2" + + " && " + + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo, + b=opts.spark_ec2_git_branch) + ) + + +def setup_spark_cluster(master, opts, setup_script): + ssh(master, opts, "chmod u+x {setup_script}".format(setup_script=setup_script)) + ssh(master, opts, "{setup_script}".format(setup_script=setup_script)) + print("Spark standalone cluster started at http://%s:8080" % master) + + if opts.ganglia: + print("Ganglia started at http://%s:5080/ganglia" % master) + + # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. 
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): + setup_script = "spark-ec2/setup.sh" master = get_dns_name(master_nodes[0], opts.private_ips) if deploy_ssh_key: print("Generating cluster's SSH key on master...") @@ -799,18 +965,13 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) """ ssh(master, opts, key_setup) - dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) - print("Transferring cluster's SSH key to slaves...") - for slave in slave_nodes: - slave_address = get_dns_name(slave, opts.private_ips) - print(slave_address) - ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) + transfert_SSH_key(opts, master, slave_nodes) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon', 'rstudio'] if opts.hadoop_major_version == "1": - modules = list(filter(lambda x: x != "mapreduce", modules)) + modules = [module for module in modules if module != "mapreduce"] if opts.ganglia: modules.append('ganglia') @@ -821,16 +982,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # NOTE: We should clone the repository before running deploy_files to # prevent ec2-variables.sh from being overwritten - print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( - r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)) - ssh( - host=master, - opts=opts, - command="rm -rf spark-ec2" - + " && " - + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo, - b=opts.spark_ec2_git_branch) - ) + deploy_repository(opts, master) print("Deploying files to master...") deploy_files( @@ -841,6 +993,15 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): slave_nodes=slave_nodes, modules=modules ) + + deploy_files( + conn=conn, + root_dir=SPARK_EC2_DIR + "/" + "entities.generic", + opts=opts, + master_nodes=master_nodes, + slave_nodes=slave_nodes, + modules=[] + ) if opts.deploy_root_dir is not None: print("Deploying {s} to master...".format(s=opts.deploy_root_dir)) @@ -851,17 +1012,46 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): ) print("Running setup on master...") - setup_spark_cluster(master, opts) + setup_spark_cluster(master, opts, setup_script) print("Done!") -def setup_spark_cluster(master, opts): - ssh(master, opts, "chmod u+x spark-ec2/setup.sh") - ssh(master, opts, "spark-ec2/setup.sh") - print("Spark standalone cluster started at http://%s:8080" % master) +def setup_new_slaves(conn, new_slave_nodes, opts, cluster_name, deploy_ssh_key): + setup_script = "spark-ec2/setup_new_slaves.sh" + master_nodes, all_slave_nodes = get_existing_cluster(conn, opts, cluster_name) + master = get_dns_name(master_nodes[0], opts.private_ips) + if deploy_ssh_key: + transfert_SSH_key(opts, master, new_slave_nodes) - if opts.ganglia: - print("Ganglia started at http://%s:5080/ganglia" % master) + deploy_files( + conn=conn, + root_dir=SPARK_EC2_DIR + "/" + "new_slaves.generic", + opts=opts, + master_nodes=master_nodes, + slave_nodes=new_slave_nodes, + modules=[] + ) + + deploy_files( + conn=conn, + root_dir=SPARK_EC2_DIR + "/" + "entities.generic", + opts=opts, + master_nodes=master_nodes, + slave_nodes=all_slave_nodes, + modules=[] + ) + + if opts.deploy_root_dir is not None: + print("Deploying {s} to master...".format(s=opts.deploy_root_dir)) + deploy_user_files( + root_dir=opts.deploy_root_dir, + opts=opts, + master_nodes=master_nodes + ) + + print("Running setup on 
master...") + setup_spark_cluster(master, opts, setup_script) + print("Done!") def is_ssh_available(host, opts, print_ssh_output=True): @@ -1058,7 +1248,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version) tachyon_v = "" print("Deploying Spark via git hash; Tachyon won't be set up") - modules = filter(lambda x: x != "tachyon", modules) + modules = [module for module in modules if module != "tachyon"] master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes] @@ -1357,6 +1547,25 @@ def real_main(): ) setup_cluster(conn, master_nodes, slave_nodes, opts, True) + elif action == "add-slaves": + if opts.slaves <= 0: + print("ERROR: You must start at least 1 slave.", file=sys.stderr) + sys.exit(1) + master_nodes, slave_nodes = add_slaves(conn, opts, cluster_name) + wait_for_cluster_state( + conn=conn, + opts=opts, + cluster_instances=(master_nodes + slave_nodes), + cluster_state='ssh-ready' + ) + setup_new_slaves(conn, slave_nodes, opts, cluster_name, True) + + elif action == "remove-slaves": + if opts.slaves <= 0 and opts.slaves != -1: + print("ERROR: You must remove at least 1 slave.", file=sys.stderr) + sys.exit(1) + remove_slaves(conn, opts, cluster_name) + elif action == "destroy": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) From 8c696bc8ee10b0793daedf0a541b2f92dfa81002 Mon Sep 17 00:00:00 2001 From: tirami-su Date: Fri, 15 Jul 2016 15:40:13 +0200 Subject: [PATCH 2/6] new generics files added --- entities.generic/root/spark-ec2/masters | 1 + entities.generic/root/spark-ec2/slaves | 1 + new_slaves.generic/root/spark-ec2/new_slaves | 1 + 3 files changed, 3 insertions(+) create mode 100644 entities.generic/root/spark-ec2/masters create mode 100644 entities.generic/root/spark-ec2/slaves create mode 100644 new_slaves.generic/root/spark-ec2/new_slaves diff --git a/entities.generic/root/spark-ec2/masters b/entities.generic/root/spark-ec2/masters new file mode 100644 index 00000000..c531652d --- /dev/null +++ b/entities.generic/root/spark-ec2/masters @@ -0,0 +1 @@ +{{master_list}} diff --git a/entities.generic/root/spark-ec2/slaves b/entities.generic/root/spark-ec2/slaves new file mode 100644 index 00000000..05f969e0 --- /dev/null +++ b/entities.generic/root/spark-ec2/slaves @@ -0,0 +1 @@ +{{slave_list}} diff --git a/new_slaves.generic/root/spark-ec2/new_slaves b/new_slaves.generic/root/spark-ec2/new_slaves new file mode 100644 index 00000000..05f969e0 --- /dev/null +++ b/new_slaves.generic/root/spark-ec2/new_slaves @@ -0,0 +1 @@ +{{slave_list}} From c346112e5ad897333df173697a412f8a395106d8 Mon Sep 17 00:00:00 2001 From: tirami-su Date: Fri, 15 Jul 2016 15:41:08 +0200 Subject: [PATCH 3/6] setup_new_slaves script added along with scripts needed by each module to setup a new slave. 
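The driver exports MASTERS, SLAVES and NEW_SLAVES and then sources each
module's init_new_slaves.sh and setup_new_slaves.sh, but only when those files
exist, so modules with no per-slave work can simply omit them. As a rough
sketch of that contract (the module name "mymodule" and its service script are
hypothetical), a typical hook pushes the module's directory to the cluster and
acts only on the freshly added slaves:

    #!/bin/bash
    # setup_new_slaves.sh for a hypothetical "mymodule" module:
    # distribute the module's files, then restart its daemon on the
    # newly added slaves only.
    /root/spark-ec2/copy-dir /root/mymodule
    for node in $NEW_SLAVES; do
      ssh -t -t $SSH_OPTS root@$node "/etc/init.d/mymodule restart" & sleep 0.3
    done
    wait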
--- ephemeral-hdfs/init_new_slaves.sh | 3 + ephemeral-hdfs/setup_new_slaves.sh | 38 +++++++++ ganglia/init_new_slaves.sh | 9 +++ ganglia/setup_new_slaves.sh | 7 ++ mapreduce/init_new_slaves.sh | 3 + mapreduce/setup_new_slaves.sh | 9 +++ persistent-hdfs/init_new_slaves.sh | 3 + persistent-hdfs/setup_new_slaves.sh | 21 +++++ scala/setup_new_slaves.sh | 3 + setup_new_slaves.sh | 114 +++++++++++++++++++++++++++ spark-standalone/setup_new_slaves.sh | 18 +++++ spark/setup_new_slaves.sh | 3 + tachyon/setup_new_slaves.sh | 9 +++ 13 files changed, 240 insertions(+) create mode 100755 ephemeral-hdfs/init_new_slaves.sh create mode 100755 ephemeral-hdfs/setup_new_slaves.sh create mode 100644 ganglia/init_new_slaves.sh create mode 100644 ganglia/setup_new_slaves.sh create mode 100755 mapreduce/init_new_slaves.sh create mode 100755 mapreduce/setup_new_slaves.sh create mode 100755 persistent-hdfs/init_new_slaves.sh create mode 100755 persistent-hdfs/setup_new_slaves.sh create mode 100755 scala/setup_new_slaves.sh create mode 100755 setup_new_slaves.sh create mode 100755 spark-standalone/setup_new_slaves.sh create mode 100755 spark/setup_new_slaves.sh create mode 100755 tachyon/setup_new_slaves.sh diff --git a/ephemeral-hdfs/init_new_slaves.sh b/ephemeral-hdfs/init_new_slaves.sh new file mode 100755 index 00000000..88532e78 --- /dev/null +++ b/ephemeral-hdfs/init_new_slaves.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/ephemeral-hdfs diff --git a/ephemeral-hdfs/setup_new_slaves.sh b/ephemeral-hdfs/setup_new_slaves.sh new file mode 100755 index 00000000..ff9139e8 --- /dev/null +++ b/ephemeral-hdfs/setup_new_slaves.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +EPHEMERAL_HDFS=/root/ephemeral-hdfs + +# Set hdfs url to make it easier +export HDFS_URL="hdfs://$PUBLIC_DNS:9000" + +pushd /root/spark-ec2/ephemeral-hdfs > /dev/null + +for node in $NEW_SLAVES; do + echo $node + ssh -t -t $SSH_OPTS root@$node "/root/spark-ec2/ephemeral-hdfs/setup-slave.sh" & sleep 0.3 +done +wait + +/root/spark-ec2/copy-dir $EPHEMERAL_HDFS/conf + +echo "Starting ephemeral HDFS..." + +# This is different depending on version. +case "$HADOOP_MAJOR_VERSION" in + 1) + $EPHEMERAL_HDFS/bin/start-dfs.sh + ;; + 2) + $EPHEMERAL_HDFS/sbin/start-dfs.sh + ;; + yarn) + $EPHEMERAL_HDFS/sbin/start-dfs.sh + echo "Starting YARN" + $EPHEMERAL_HDFS/sbin/start-yarn.sh + ;; + *) + echo "ERROR: Unknown Hadoop version" + return -1 +esac + +popd > /dev/null diff --git a/ganglia/init_new_slaves.sh b/ganglia/init_new_slaves.sh new file mode 100644 index 00000000..f5a17b8f --- /dev/null +++ b/ganglia/init_new_slaves.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# Install ganglia on new slaves +# TODO: Remove this once the AMI has ganglia by default + +for node in $NEW_SLAVES; do + ssh -t -t $SSH_OPTS root@$node "if ! 
rpm --quiet -q $GANGLIA_PACKAGES; then yum install -q -y $GANGLIA_PACKAGES; fi" & sleep 0.3 +done +wait diff --git a/ganglia/setup_new_slaves.sh b/ganglia/setup_new_slaves.sh new file mode 100644 index 00000000..1a979fed --- /dev/null +++ b/ganglia/setup_new_slaves.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /etc/ganglia/ + +for node in $NEW_SLAVES; do + ssh -t -t $SSH_OPTS root@$node "/etc/init.d/gmond restart" +done diff --git a/mapreduce/init_new_slaves.sh b/mapreduce/init_new_slaves.sh new file mode 100755 index 00000000..75a4a013 --- /dev/null +++ b/mapreduce/init_new_slaves.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/mapreduce diff --git a/mapreduce/setup_new_slaves.sh b/mapreduce/setup_new_slaves.sh new file mode 100755 index 00000000..711976f5 --- /dev/null +++ b/mapreduce/setup_new_slaves.sh @@ -0,0 +1,9 @@ +#!/bin/bash +MAPREDUCE=/root/mapreduce + +for node in $NEW_SLAVES; do + ssh -t $SSH_OPTS root@$node "mkdir -p /mnt/mapreduce/logs && chown hadoop:hadoop /mnt/mapreduce/logs && chown hadoop:hadoop /mnt/mapreduce" & sleep 0.3 +done +wait + +/root/spark-ec2/copy-dir $MAPREDUCE/conf diff --git a/persistent-hdfs/init_new_slaves.sh b/persistent-hdfs/init_new_slaves.sh new file mode 100755 index 00000000..3da690af --- /dev/null +++ b/persistent-hdfs/init_new_slaves.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/persistent-hdfs diff --git a/persistent-hdfs/setup_new_slaves.sh b/persistent-hdfs/setup_new_slaves.sh new file mode 100755 index 00000000..85bcda45 --- /dev/null +++ b/persistent-hdfs/setup_new_slaves.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +PERSISTENT_HDFS=/root/persistent-hdfs + +pushd /root/spark-ec2/persistent-hdfs > /dev/null + +for node in $NEW_SLAVES; do + ssh -t $SSH_OPTS root@$node "/root/spark-ec2/persistent-hdfs/setup-slave.sh" & sleep 0.3 +done +wait + +/root/spark-ec2/copy-dir $PERSISTENT_HDFS/conf + +if [[ ! -e /vol/persistent-hdfs/dfs/name ]] ; then + echo "Formatting persistent HDFS namenode..." + $PERSISTENT_HDFS/bin/hadoop namenode -format +fi + +echo "Persistent HDFS installed, won't start by default..." + +popd > /dev/null diff --git a/scala/setup_new_slaves.sh b/scala/setup_new_slaves.sh new file mode 100755 index 00000000..6aa5d27b --- /dev/null +++ b/scala/setup_new_slaves.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/scala diff --git a/setup_new_slaves.sh b/setup_new_slaves.sh new file mode 100755 index 00000000..b74b0b04 --- /dev/null +++ b/setup_new_slaves.sh @@ -0,0 +1,114 @@ +#!/bin/bash + +# usage: echo_time_diff name start_time end_time +echo_time_diff () { + local format='%Hh %Mm %Ss' + + local diff_secs="$(($3-$2))" + echo "[timing] $1: " "$(date -u -d@"$diff_secs" +"$format")" +} + +# Make sure we are in the spark-ec2 directory +pushd /root/spark-ec2 > /dev/null + +# Load the environment variables specific to this AMI +source /root/.bash_profile + +# Load the cluster variables set by the deploy script +source ec2-variables.sh + +# Set hostname based on EC2 private DNS name, so that it is set correctly +# even if the instance is restarted with a different private DNS name +PRIVATE_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname` +PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/hostname` +hostname $PRIVATE_DNS +echo $PRIVATE_DNS > /etc/hostname +export HOSTNAME=$PRIVATE_DNS # Fix the bash built-in hostname variable too + +echo "Setting up Spark on `hostname`..." 
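+# The masters, slaves and new_slaves files read below are rendered into
+# /root/spark-ec2 by deploy_files (from entities.generic and new_slaves.generic)
+# before this script runs on the master.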
+ +export MASTERS=`cat masters` +NUM_MASTERS=`cat masters | wc -l` +OTHER_MASTERS=`cat masters | sed '1d'` +export SLAVES=`cat slaves` +export NEW_SLAVES=`cat new_slaves` +SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=5" + +if [[ "x$JAVA_HOME" == "x" ]] ; then + echo "Expected JAVA_HOME to be set in .bash_profile!" + exit 1 +fi + +if [[ `tty` == "not a tty" ]] ; then + echo "Expecting a tty or pty! (use the ssh -t option)." + exit 1 +fi + +echo "Setting executable permissions on scripts..." +find . -regex "^.+.\(sh\|py\)" | xargs chmod a+x + +echo "RSYNC'ing /root/spark-ec2 to other cluster nodes..." +rsync_start_time="$(date +'%s')" +for node in $SLAVES $OTHER_MASTERS; do + echo $node + rsync -e "ssh $SSH_OPTS" -az /root/spark-ec2 $node:/root & + scp $SSH_OPTS ~/.ssh/id_rsa $node:.ssh & + sleep 0.1 +done +wait +rsync_end_time="$(date +'%s')" +echo_time_diff "rsync /root/spark-ec2" "$rsync_start_time" "$rsync_end_time" + +echo "Running setup-slave on new slave nodes to mount filesystems, etc..." +setup_slave_start_time="$(date +'%s')" +pssh --inline \ + --host "$NEW_SLAVES" \ + --user root \ + --extra-args "-t -t $SSH_OPTS" \ + --timeout 0 \ + "spark-ec2/setup-slave.sh" +setup_slave_end_time="$(date +'%s')" +echo_time_diff "setup-slave" "$setup_slave_start_time" "$setup_slave_end_time" + +# Always include 'scala' module if it's not defined as a work around +# for older versions of the scripts. +if [[ ! $MODULES =~ *scala* ]]; then + MODULES=$(printf "%s\n%s\n" "scala" $MODULES) +fi + +# Install / Init module +for module in $MODULES; do + echo "Initializing $module" + module_init_start_time="$(date +'%s')" + if [[ -e $module/init_new_slaves.sh ]]; then + source $module/init_new_slaves.sh + fi + module_init_end_time="$(date +'%s')" + echo_time_diff "$module init" "$module_init_start_time" "$module_init_end_time" + cd /root/spark-ec2 # guard against init.sh changing the cwd +done + +# Deploy templates +# TODO: Move configuring templates to a per-module ? +echo "Creating local config files..." +./deploy_templates.py + +# Copy spark conf by default +echo "Deploying Spark config files..." 
+chmod u+x /root/spark/conf/spark-env.sh +/root/spark-ec2/copy-dir /root/spark/conf + +# Setup each module +for module in $MODULES; do + echo "Setting up $module" + module_setup_start_time="$(date +'%s')" + if [[ -e $module/setup_new_slaves.sh ]]; then + source ./$module/setup_new_slaves.sh + fi + sleep 0.1 + module_setup_end_time="$(date +'%s')" + echo_time_diff "$module setup" "$module_setup_start_time" "$module_setup_end_time" + cd /root/spark-ec2 # guard against setup.sh changing the cwd +done + +popd > /dev/null diff --git a/spark-standalone/setup_new_slaves.sh b/spark-standalone/setup_new_slaves.sh new file mode 100755 index 00000000..2f1415da --- /dev/null +++ b/spark-standalone/setup_new_slaves.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +BIN_FOLDER="/root/spark/sbin" + +if [[ "0.7.3 0.8.0 0.8.1" =~ $SPARK_VERSION ]]; then + BIN_FOLDER="/root/spark/bin" +fi + +# Copy the slaves to spark conf +cp /root/spark-ec2/slaves /root/spark/conf/ +/root/spark-ec2/copy-dir /root/spark/conf + +# Set cluster-url to standalone master +echo "spark://""`cat /root/spark-ec2/masters`"":7077" > /root/spark-ec2/cluster-url +/root/spark-ec2/copy-dir /root/spark-ec2 + +# Start Workers +$BIN_FOLDER/start-slaves.sh diff --git a/spark/setup_new_slaves.sh b/spark/setup_new_slaves.sh new file mode 100755 index 00000000..9035d438 --- /dev/null +++ b/spark/setup_new_slaves.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/spark diff --git a/tachyon/setup_new_slaves.sh b/tachyon/setup_new_slaves.sh new file mode 100755 index 00000000..8d946abc --- /dev/null +++ b/tachyon/setup_new_slaves.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +/root/spark-ec2/copy-dir /root/tachyon + +/root/tachyon/bin/tachyon format + +sleep 1 + +/root/tachyon/bin/tachyon-start.sh all Mount From 2ce9eaafaf7a2734a18e382b21c690daa2c1291e Mon Sep 17 00:00:00 2001 From: tirami-su Date: Fri, 15 Jul 2016 15:41:30 +0200 Subject: [PATCH 4/6] update_entities will be used to update slaves and masters files on each remaining instance when slaves are removed. --- update_entities.sh | 73 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100755 update_entities.sh diff --git a/update_entities.sh b/update_entities.sh new file mode 100755 index 00000000..fbba0c90 --- /dev/null +++ b/update_entities.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +# usage: echo_time_diff name start_time end_time +echo_time_diff () { + local format='%Hh %Mm %Ss' + + local diff_secs="$(($3-$2))" + echo "[timing] $1: " "$(date -u -d@"$diff_secs" +"$format")" +} + +# Make sure we are in the spark-ec2 directory +pushd /root/spark-ec2 > /dev/null + +# Load the environment variables specific to this AMI +source /root/.bash_profile + +# Load the cluster variables set by the deploy script +source ec2-variables.sh + +# Set hostname based on EC2 private DNS name, so that it is set correctly +# even if the instance is restarted with a different private DNS name +PRIVATE_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname` +PUBLIC_DNS=`wget -q -O - http://169.254.169.254/latest/meta-data/hostname` +hostname $PRIVATE_DNS +export HOSTNAME=$PRIVATE_DNS # Fix the bash built-in hostname variable too + +echo "Setting up Spark on `hostname`..." + +export MASTERS=`cat masters` +NUM_MASTERS=`cat masters | wc -l` +OTHER_MASTERS=`cat masters | sed '1d'` +export SLAVES=`cat slaves` +SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=5" + +if [[ "x$JAVA_HOME" == "x" ]] ; then + echo "Expected JAVA_HOME to be set in .bash_profile!" 
+ exit 1 +fi + +if [[ `tty` == "not a tty" ]] ; then + echo "Expecting a tty or pty! (use the ssh -t option)." + exit 1 +fi + +echo "Setting executable permissions on scripts..." +find . -regex "^.+.\(sh\|py\)" | xargs chmod a+x + +# Always include 'scala' module if it's not defined as a work around +# for older versions of the scripts. +if [[ ! $MODULES =~ *scala* ]]; then + MODULES=$(printf "%s\n%s\n" "scala" $MODULES) +fi + +cp /root/spark-ec2/slaves /root/spark/conf/ + +# Deploy templates +# TODO: Move configuring templates to a per-module ? +echo "Creating local config files..." +./deploy_templates.py + +# Updating config folder for each module +for module in $MODULES; do + echo "Updating config files..." + module_update_start_time="$(date +'%s')" + if [[ -d $module/conf ]]; then + /root/spark-ec2/copy-dir ./$module/conf + fi + sleep 0.1 + module_update_end_time="$(date +'%s')" + echo_time_diff "$module update" "$module_update_start_time" "$module_update_end_time" +done + +popd > /dev/null From ff11a07113d0e2a5cc3a687ba761101e3f8ed82d Mon Sep 17 00:00:00 2001 From: Abdul-Raheman MouhamadSultane Date: Sun, 17 Jul 2016 19:51:56 +0200 Subject: [PATCH 5/6] Bug corrected: selection slaves to remove. remove_slaves func modified to keep at least one slave in the cluster. --- spark_ec2.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/spark_ec2.py b/spark_ec2.py index 68f6624e..3b4a82f5 100644 --- a/spark_ec2.py +++ b/spark_ec2.py @@ -835,23 +835,26 @@ def remove_slaves(conn, opts, cluster_name): master = get_dns_name(master_nodes[0], opts.private_ips) - if not slave_nodes: + if not slave_nodes or len(slave_nodes) == 1: + print("Cannot remove mores slaves, " \ + "at least one slave is required by the cluster.") return ; slave_to_remove = [] - if opts.slaves == -1 or opts.slaves >= len(slave_nodes): - slave_to_remove = slave_nodes - print("All slaves will be terminated.") - else: - print("The following slave instance(s) will be terminated:") - for _ in range(opts.slaves): - slave_to_remove.append(random.choice(slave_nodes)) + nb_slave_to_remove = opts.slaves + if nb_slave_to_remove == -1 or nb_slave_to_remove >= len(slave_nodes): + nb_slave_to_remove = len(slave_nodes) - 1 + + print("The following slave instance(s) will be terminated:") + for _ in range(nb_slave_to_remove): + remain_nodes = list(set(slave_nodes) - set(slave_to_remove)) + slave_inst = random.choice(remain_nodes) + slave_to_remove.append(slave_inst) + print("> %s" % get_dns_name(slave_inst, opts.private_ips)) - for inst in slave_to_remove: - print("> %s" % get_dns_name(inst, opts.private_ips)) msg = "Are you sure you want to delete " \ - "these slave(s) on cluster {c} ? (y/N) ".format(c=cluster_name) + "the preceding slave(s) on cluster {c} ? 
(y/N) ".format(c=cluster_name) response = raw_input(msg) if response == "y": print("Terminating slave(s)...") @@ -861,8 +864,6 @@ def remove_slaves(conn, opts, cluster_name): master_nodes, slave_nodes = get_existing_cluster(conn, opts, cluster_name, die_on_error=False) - master = get_dns_name(master_nodes[0], opts.private_ips) - print("Deploying files to master...") deploy_files( conn=conn, From d73aa85d9faf7917decb31e69f047e211a88a49f Mon Sep 17 00:00:00 2001 From: tirami-su Date: Wed, 27 Jul 2016 13:56:01 +0200 Subject: [PATCH 6/6] transfert -> transfer renaming --- spark_ec2.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark_ec2.py b/spark_ec2.py index 3b4a82f5..debcef6f 100644 --- a/spark_ec2.py +++ b/spark_ec2.py @@ -916,9 +916,9 @@ def get_instances(group_names): return (master_instances, slave_instances) -def transfert_SSH_key(opts, master, slave_nodes): +def transfer_SSH_key(opts, master, slave_nodes): """ - Transfert SSH key to slaves + Transfer SSH key to slaves """ dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) print("Transferring cluster's SSH key to slaves...") @@ -966,7 +966,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) """ ssh(master, opts, key_setup) - transfert_SSH_key(opts, master, slave_nodes) + transfer_SSH_key(opts, master, slave_nodes) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon', 'rstudio'] @@ -1022,7 +1022,7 @@ def setup_new_slaves(conn, new_slave_nodes, opts, cluster_name, deploy_ssh_key): master_nodes, all_slave_nodes = get_existing_cluster(conn, opts, cluster_name) master = get_dns_name(master_nodes[0], opts.private_ips) if deploy_ssh_key: - transfert_SSH_key(opts, master, new_slave_nodes) + transfer_SSH_key(opts, master, new_slave_nodes) deploy_files( conn=conn,
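Usage sketch for the new actions (assuming the usual ./spark-ec2 wrapper around
spark_ec2.py; the key pair, identity file and cluster name below are
placeholders, and -s/--slaves is the existing option that the new actions reuse
as the number of slaves to add or remove):

    # Grow a running cluster by two slaves and run the incremental setup on them
    ./spark-ec2 -k my-keypair -i ~/.ssh/my-keypair.pem -s 2 add-slaves my-cluster

    # Terminate two randomly chosen slaves and refresh the masters/slaves files
    # on the remaining nodes via update_entities.sh (at least one slave is kept)
    ./spark-ec2 -k my-keypair -i ~/.ssh/my-keypair.pem -s 2 remove-slaves my-cluster

    # Remove as many slaves as allowed, i.e. all but one
    ./spark-ec2 -k my-keypair -i ~/.ssh/my-keypair.pem --slaves=-1 remove-slaves my-cluster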