pgdeploy
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# kate: space-indent on; indent-width 4; replace-tabs on;
#
# Copyright © 2016 PROEMION GmbH
#
# Authors:
# Michael Ziegler <[email protected]>
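
"""Deploy the current repmgr primary's connection info to the pgbouncer hosts.

pgdeploy queries the repmgr metadata of the cluster it connects to for the
active primary node, the cluster's databases and the contents of pg_shadow.
It then connects to every enabled application server over SSH, copies the
local ~/.pgpass unless the host uses trust authentication, rewrites the host's
pgbouncer database map and userlist.txt, and reloads pgbouncer. Each remote
host is updated in its own thread.
"""
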
import sys
import spur
import shutil
import psycopg2
import psycopg2.extras
import threading
import os.path
from optparse import OptionParser


def go_nodaemon(func, *args, **kwargs):
    """Run ``func`` in a new non-daemon thread and return the started thread."""
    thr = threading.Thread(target=func, args=args, kwargs=kwargs)
    thr.daemon = False
    thr.start()
    return thr


class DeployError(Exception):
    pass


class PgBouncerUpdater(object):
    def __init__(self, cluster_name, pgconnstring, timeout=1, public_net=False):
        self.cluster_name = cluster_name
        self.pgconnstring = pgconnstring
        self.timeout = timeout
        self.public_net = public_net

        self.conn = psycopg2.connect(pgconnstring)
        crs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Query current master
        crs.execute(
            """SELECT node_id, node_name, type, conninfo """
            """FROM repmgr.nodes """
            """WHERE active """
            """AND type = 'primary' """
            """ORDER BY node_id"""
        )
        self.master = crs.fetchone()

        # Query databases
        crs.execute(
            """SELECT datname """
            """FROM pg_database """
            """WHERE NOT datistemplate """
        )
        self.databases = crs.fetchall()

        # Query user accounts
        crs.execute("""SELECT usename, passwd FROM pg_shadow""")
        self.shadow = crs.fetchall()

        if self.public_net:
            # Query public master address
            crs.execute(
                """SELECT conninfo """
                """FROM repmgr.node_public_info """
                """WHERE node_id = %s""", (self.master["node_id"],))
            self.master["conninfo"] = crs.fetchone()["conninfo"]

    def deploy_all(self):
        if not self.databases:
            raise ValueError("no databases")

        crs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Query application servers
        crs.execute(
            """SELECT * """
            """FROM repmgr.app_servers """
            """WHERE enabled """
            """ORDER BY node_id"""
        )
        self.remote_hosts = crs.fetchall()

        if not self.remote_hosts:
            raise ValueError("no remote hosts")

        # Run deployment in parallel threads, one per remote host. They are not
        # daemonized, so pgdeploy will not exit until all the threads are finished.
        for remote in self.remote_hosts:
            thr = go_nodaemon(self.safe_deploy, remote)
            thr.name = remote["ip_addr"]

    def safe_deploy(self, remote_host):
        # Due to a bug in pip, we sometimes get a RequirementParseError. In that
        # case, simply retrying the deployment seems to help.
        # See <http://stackoverflow.com/questions/35625488/>
        import pkg_resources
        from time import sleep

        no_tries = 0
        while no_tries < 3:
            try:
                return self.deploy(remote_host)
            except pkg_resources.RequirementParseError:
                sleep(1)
                no_tries += 1

        raise DeployError("exceeded number of retries for RequirementParseError workaround")

    def deploy(self, remote_host):
        cluster_config = "/etc/pgbouncer/databases/%s.ini" % self.cluster_name
        pgpass = os.path.expanduser("~/.pgpass")

        try:
            with spur.SshShell(hostname=remote_host["ip_addr"],
                               username=remote_host["username"],
                               missing_host_key=spur.ssh.MissingHostKey.accept,
                               connect_timeout=self.timeout) as shell:

                # Update ~/.pgpass
                if os.path.isfile(pgpass) and not remote_host["auth_trust"]:
                    with open(pgpass, "rb") as local_file:
                        with shell.open(pgpass, "wb") as remote_file:
                            shutil.copyfileobj(local_file, remote_file)

                # Update database connstrings
                with shell.open(cluster_config, "wb") as pgbouncer:
                    for database in self.databases:
                        if remote_host["application_name"] and "application_name" not in self.master["conninfo"]:
                            print("%s = %s application_name=%s" % (database["datname"], self.master["conninfo"], remote_host["application_name"]), file=pgbouncer)
                        else:
                            print("%s = %s" % (database["datname"], self.master["conninfo"]), file=pgbouncer)

                # Update userlist.txt
                with shell.open("/etc/pgbouncer/userlist.txt", "wb") as userlist:
                    for account in self.shadow:
                        print('"%s" "%s"' % (account["usename"], account["passwd"]), file=userlist)

                # Reload pgbouncer
                if remote_host["username"] == "root":
                    shell.run(["service", "pgbouncer", "reload"])
                else:
                    shell.run(["sudo", "service", "pgbouncer", "reload"])

        except spur.ssh.ConnectionError as err:
            if remote_host["application_name"]:
                print("Could not update pgbouncer on %s [%s]: %s" % (remote_host["ip_addr"], remote_host["application_name"], err.original_error), file=sys.stderr)
            else:
                print("Could not update pgbouncer on %s: %s" % (remote_host["ip_addr"], err.original_error), file=sys.stderr)


def main():
    parser = OptionParser("Usage: %prog [options]")
    parser.add_option("-c", "--cluster", help="name of the cluster")
    parser.add_option("-t", "--timeout", type=int, default=1,
                      help="timeout for the SSH connection in seconds [1]")
    parser.add_option("-p", "--pgconnstring",
                      default="host=/var/run/postgresql",
                      help="postgres connection string [host=/var/run/postgresql]")
    parser.add_option("-P", "--public-net", action="store_true", default=False,
                      help="get the master node's connstring from the repmgr.node_public_info "
                           "table instead of repmgr.nodes")

    options, posargs = parser.parse_args()

    if not options.cluster or posargs:
        print("Need cluster option and must not have any positional args, see --help", file=sys.stderr)
        return 1

    updater = PgBouncerUpdater(options.cluster, options.pgconnstring, options.timeout, options.public_net)
    updater.deploy_all()


if __name__ == '__main__':
    sys.exit(main())
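
# Example invocations (illustrative; "main" stands in for the cluster name):
#
#   pgdeploy -c main
#   pgdeploy -c main --timeout 5 --public-net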