-
Notifications
You must be signed in to change notification settings - Fork 1
/
cluster_configuration.py
165 lines (137 loc) · 5.54 KB
/
cluster_configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# Code tested on Ubuntu 18.04.
import json
import os
import subprocess
import time

import paramiko
# Configuration variables
# Path to the private key used to SSH into every cluster node.
link_key = "gdeltKeyPair.pem"
# List of network interfaces: dump the AWS CLI's view of the account's
# network interfaces to interfaces.json for the parsing step below.
print("------------------------------------------------------")
print("Importing Interfaces")
print("------------------------------------------------------")
# Run the AWS CLI without a shell and fail fast. Previously the exit status
# was ignored, so a failing `aws` call left an empty/stale interfaces.json
# and the script only crashed later with a confusing JSON decode error.
with open("interfaces.json", "w") as interfaces_out:
    subprocess.run(
        ["aws", "ec2", "describe-network-interfaces"],
        stdout=interfaces_out,
        check=True,
    )
print("Interfaces Imported")
print("------------------------------------------------------")
# Read interfaces.json (written by the AWS CLI step above) and classify every
# reachable interface as master / slave / seed.
with open("interfaces.json") as json_file:
    interfaces_json = json.load(json_file)
all_interfaces = interfaces_json['NetworkInterfaces']
# Interfaces without a public address (no 'Association' key, or an empty
# PublicDnsName) cannot be reached over SSH. Indexing them directly raised
# KeyError, so they are skipped here.
interfaces = [
    interface for interface in all_interfaces
    if interface.get('Association', {}).get('PublicDnsName')
]
skipped_interfaces = len(all_interfaces) - len(interfaces)
if skipped_interfaces:
    print("Skipped {} interface(s) without a public DNS name".format(skipped_interfaces))
# Number of reachable interfaces; only used to size the seed list.
card_interfaces = len(interfaces)
nodes = []          # one dict per reachable interface
slave_nodes = []    # private IPv4s of every non-master node
seeds_nodes = []    # private IPv4s of the slaves chosen as seeds
card_seeds = 0
# Up to 3 interfaces -> 1 seed, otherwise 2 seeds.
max_seeds = 1 if card_interfaces <= 3 else 2
for interface in interfaces:
    dns = interface['Association']['PublicDnsName']
    private_IPv4 = interface['PrivateIpAddress']
    # A node is the master when any of its security group names contains
    # 'master'; every other node is a slave.
    is_slave = not any('master' in group['GroupName'] for group in interface['Groups'])
    # The first `max_seeds` slaves encountered become seeds.
    is_seed = is_slave and card_seeds < max_seeds
    if is_seed:
        card_seeds += 1
        seeds_nodes.append(private_IPv4)
    if is_slave:
        slave_nodes.append(private_IPv4)
    nodes.append({
        'p_IPv4': private_IPv4,
        'is_slave': is_slave,
        'dns': dns,
        'is_seed': is_seed,
    })
print("Nodes distribution has been created")
print("------------------------------------------------------")
print("Starting Configuration")
print("------------------------------------------------------")
def command_status(exit_status, message):
    """Report the outcome of a remote command.

    Prints ``message`` when ``exit_status`` is 0. Otherwise prints an error
    line containing both the exit status and ``message``, so the failing
    step can be identified. A separator line is printed in both cases.

    Args:
        exit_status: Exit code returned by the remote command.
        message: Description of the step that was executed.

    Returns:
        True if the command succeeded (exit status 0), False otherwise.
        Existing callers that ignore the return value are unaffected.
    """
    succeeded = exit_status == 0
    if succeeded:
        print(message)
    else:
        # The bare exit code alone did not say which step failed.
        print("Error", exit_status, "-", message)
    print("------------------------------------------------------")
    return succeeded
# SSH key and client reused for every node in this phase and in the status
# check that follows. AutoAddPolicy accepts unknown host keys without
# verifying them.
k = paramiko.RSAKey.from_private_key_file(link_key)
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())


def _run_remote(command, message):
    """Run ``command`` on the host ``c`` is connected to and report it.

    Blocks until the command exits and prints the outcome via
    ``command_status``. On failure it also prints the command's stderr so
    the cause is visible. Returns the remote exit status.
    """
    stdin, stdout, stderr = c.exec_command(command)
    exit_status = stdout.channel.recv_exit_status()
    command_status(exit_status, message)
    if exit_status != 0:
        error_output = "".join(stderr.readlines()).strip()
        if error_output:
            print(error_output)
            print("------------------------------------------------------")
    return exit_status


# Cassandra edits applied to slave nodes. They are identical for every node,
# so they are built once. The seed list is the comma-joined seed private IPs.
seeds_command = "sudo sed -i 's/seeds:.*/seeds: \"{}\"/g' /etc/cassandra/conf/cassandra.yaml".format(",".join(seeds_nodes))
bootstrap_command = "sudo sed -i 's/auto_bootstrap:.*/auto_bootstrap: true/g' /etc/cassandra/conf/cassandra.yaml"
# Spark setting (master node) listing every slave as a Cassandra host.
# Any previous value is deleted first, so re-running the script does not
# append duplicate lines. `;` (not `&&`) keeps the append working even if
# sed fails, e.g. on a missing file. `echo` needs no sudo; only the `tee`
# write does.
spark_new_line = "spark.cassandra.connection.host {}".format(",".join(slave_nodes))
spark_conf = "/etc/spark/conf/spark-defaults.conf"
command = (
    "sudo sed -i '/^spark.cassandra.connection.host/d' {conf}; "
    "echo '{line}' | sudo tee -a {conf}".format(conf=spark_conf, line=spark_new_line)
)
# Master services restarted after the Spark change (presumably so they pick
# up the new spark-defaults.conf).
restarting_system = [
    "sudo stop hadoop-yarn-resourcemanager",
    "sudo start hadoop-yarn-resourcemanager",
    "sudo stop zeppelin",
    "sudo start zeppelin"
]
for node in nodes:
    is_slave = node['is_slave']
    is_seed = node['is_seed']
    dns = node['dns']
    print("Connecting to a is_slave {} at public DNS '{}' (is_seed: {})".format(is_slave, dns, is_seed))
    print("------------------------------------------------------")
    c.connect(hostname=dns, username="hadoop", pkey=k)
    print("Connected!")
    print("------------------------------------------------------")
    # Always close the session, even if a remote command raises, so the
    # next node's connect() starts from a closed client.
    try:
        if is_slave:
            _run_remote(seeds_command, "Seeds has been added to {}".format(dns))
            # Only non-seed nodes get auto_bootstrap: true.
            if not is_seed:
                _run_remote(bootstrap_command, "Auto Bootstrap true fixed")
            _run_remote("sudo service cassandra start", "Starting Cassandra")
            print("Now a 10s delay to not Rush Cassandra configuration")
            print("------------------------------------------------------")
            time.sleep(10)
        else:
            _run_remote(command, "Cassandra hosts has been added to Spark")
            for cmd in restarting_system:
                _run_remote(cmd, "Restarting action done")
    finally:
        c.close()
print("Making sure all Cassandra nodes are up and running!")
print("------------------------------------------------------")
print("Now a 30 s delay to make sure Cassandra nodes status are accurate")
print("------------------------------------------------------")
# The message above announces 30 s; the original slept only 10 s.
time.sleep(30)
# Some cassandra nodes may not start after configuration.
# Make sure all cassandra nodes are up and running: every slave whose status
# reports a down state gets its Cassandra service restarted.
for node in nodes:
    is_slave = node['is_slave']
    dns = node['dns']
    if not is_slave:
        continue
    print("Connecting to Slave with public DNS '{}'".format(dns))
    print("------------------------------------------------------")
    c.connect(hostname=dns, username="hadoop", pkey=k)
    print("Connected!")
    print("------------------------------------------------------")
    # Close the session even if a remote command raises.
    try:
        stdin, stdout, stderr = c.exec_command("sudo service cassandra status")
        exit_status = stdout.channel.recv_exit_status()
        command_status(exit_status, "Cassandra Status")
        response = "".join(stdout.readlines())
        # Test the "down" states before "running": output such as
        # "cassandra is not running" contains the substring "running" and
        # was previously mistaken for a healthy node.
        if "dead" in response or "not running" in response:
            print("Cassandra is dead in this node, needs to restart the service")
            needs_restart = True
        elif "stopped" in response:
            # A stopped node is not up either; restart it as well.
            print("Cassandra was stopped!")
            needs_restart = True
        elif "running" in response:
            print("Cassandra is running, nothing to do here.")
            needs_restart = False
        else:
            # Unrecognised output: report it rather than guess.
            print("Unrecognised Cassandra status output: {}".format(response.strip()))
            needs_restart = False
        print("------------------------------------------------------")
        if needs_restart:
            stdin, cstdout, stderr = c.exec_command("sudo service cassandra restart")
            exit_status = cstdout.channel.recv_exit_status()
            # Join the output lines instead of printing a Python list repr.
            restart_output = "".join(cstdout.readlines()).strip()
            command_status(exit_status, "Cassandra Restarted with message ===> {}".format(restart_output))
    finally:
        c.close()
print("Configuration Finished!")