es-reindex
#!/usr/bin/env python
# A script which creates a new index using a new mapping and creates an alias
# for the old index name.
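#
# A minimal usage sketch (the server URL, credentials and index names below are
# only examples; ES_AUTH is expected to contain "user:password" for basic auth):
#
#   export ES_AUTH=admin:secret
#   ./es-reindex --server https://localhost:9200 old-index new-index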
from argparse import ArgumentParser
from commands import getstatusoutput
from datetime import datetime
import re
import json
import urllib2
import base64
from os import getenv
# Avoid checking the server certificate since we execute this only inside CERN.
# Drop the following stanza if that is not the case.
import ssl
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
# Small helper which expands %(name)s placeholders in s with keyword arguments.
def format(s, **kwds):
    return s % kwds
# Send a newline-delimited bulk payload to the elasticsearch _bulk endpoint,
# authenticating via basic auth with the credentials stored in ES_AUTH.
def writeToES(server, data):
    url = format("%(server)s/_bulk", server=server)
    handler = urllib2.HTTPSHandler(debuglevel=0)
    opener = urllib2.build_opener(handler)
    urllib2.install_opener(opener)
    request = urllib2.Request(url, data)
    request.get_method = lambda: "PUT"
    base64string = base64.encodestring(getenv("ES_AUTH")).replace('\n', '')
    request.add_header("Authorization", "Basic %s" % base64string)
    try:
        response = opener.open(request)
    except Exception, e:
        print e
        exit(1)
    print response.read()
# - Get the index via scan and scroll.
# - Push items to the new index.
# - Delete old index.
# - Create alias from the old index to the new one.
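#
# Each chunk handed to writeToES is newline-delimited JSON in the format the
# bulk API expects: an action line followed by the document source, e.g. (the
# values below are purely illustrative):
#
#   { "create": { "_index": "new-index", "_type": "doc", "_id": "42" }}
#   { "field": "value" }
#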
if __name__ == "__main__":
    # Parse the command line options: the elasticsearch server plus the source
    # and destination index names.
    parser = ArgumentParser()
    parser.add_argument("--server", "-s", dest="server",
                        default="localhost:9200", help="the elasticsearch server")
    parser.add_argument("--dry-run", "-n", dest="dryrun", action="store_true",
                        default=False, help="do not change the DB.")
    parser.add_argument("source")
    parser.add_argument("dest")
    args = parser.parse_args()
    proxy_string = ""
    if not getenv("ES_AUTH"):
        print "ES_AUTH not specified"
        exit(1)
    user_string = "--user %s" % getenv("ES_AUTH")
    # First get all the documents in the source index via a scan and scroll
    # search which matches everything, 1000 documents at a time.
    query = {
        "query": { "match_all": {}},
        "size": 1000
    }
    cmd = format("curl -s %(user_string)s '%(server)s/%(source)s/_search?search_type=scan&scroll=1m' -d'%(query)s'",
                 source=args.source,
                 query=json.dumps(query),
                 server=args.server,
                 user_string=user_string)
    print cmd
    err, out = getstatusoutput(cmd)
    if err:
        print "Error while starting the scan search"
        print out
        exit(1)
    result = json.loads(out)
    while True:
        # Fetch the next batch of documents using the scroll id returned by the
        # previous request.
        cmd = format("curl -s %(user_string)s '%(server)s/_search/scroll?scroll=1m' -d'%(scroll_id)s'",
                     source=args.source,
                     query=json.dumps(query),
                     server=args.server,
                     user_string=user_string,
                     scroll_id=result["_scroll_id"])
        err, out = getstatusoutput(cmd)
        if err:
            print "Error while getting entries"
            print out
            exit(1)
        result = json.loads(out)
        if result.get("status", 200) != 200:
            print out
            exit(1)
        # Exit when there are no more results.
        if not len(result["hits"]["hits"]):
            break
        line = ""
        for item in result["hits"]["hits"]:
            # Build one bulk "create" action per document, keeping its type and
            # id but targeting the destination index.
            cmd = format('{ "create": { "_index": "%(index)s", "_type": "%(obj_type)s", "_id": "%(obj_id)s" }}',
                         index=args.dest,
                         obj_type=item["_type"],
                         obj_id=item["_id"])
            payload = json.dumps(item["_source"])
            line += cmd + "\n" + payload + "\n"
            # Keep accumulating until the bulk payload reaches about 500 kB.
            if len(line) < 500000:
                continue
            # print json.dumps(json.loads(cmd))
            # print json.dumps(json.loads(payload))
            writeToES(server=args.server, data=line)
            line = ""
        # Flush whatever is left over for this batch.
        if line:
            writeToES(server=args.server, data=line)
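
# Note: the "delete old index" and "create alias" steps listed in the plan above
# are not implemented by this script. A rough sketch of how they could be done
# with curl against the same server (index names are only placeholders):
#
#   curl -s --user "$ES_AUTH" -XDELETE 'https://localhost:9200/old-index'
#   curl -s --user "$ES_AUTH" -XPOST 'https://localhost:9200/_aliases' \
#        -d '{ "actions": [ { "add": { "index": "new-index", "alias": "old-index" } } ] }'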