-
Notifications
You must be signed in to change notification settings - Fork 2
/
es.py
executable file
·66 lines (54 loc) · 2.07 KB
/
es.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/python3
import sys
import os
import datetime
import json
from elasticsearch import Elasticsearch, RequestsHttpConnection
import logging
# Root logger: INFO and above are written to stdout as
# "[LEVEL] timestamp - message".
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Only warnings and above are let through from the 'elasticsearch' logger.
logging.getLogger('elasticsearch').setLevel(logging.WARN)

stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(
    logging.Formatter('[%(levelname)s] %(asctime)s - %(message)s')
)
logger.addHandler(stdout_handler)
# Exactly three arguments are required after the script name; anything else
# prints the usage line and exits with status 1.
if len(sys.argv) != 4:
    logger.error("Usage: ./es.py <mode> <index> <filename>")
    sys.exit(1)

# mode  - which cluster to talk to
# index - name of the index to operate on
# fname - path of the input file to load
mode, index, fname = sys.argv[1:4]
# Build the Elasticsearch client `es` for the requested mode.
#   --aws   : HTTPS on port 443 against the host named by AWS_ES_HOST, with
#             requests signed via AWS4Auth using credentials and region taken
#             from the AWS_* environment variables (KeyError if one is unset).
#   --local : plain HTTP against localhost:9200.
# Any other mode is rejected up front; previously it fell through both
# branches and crashed below with a NameError because `es` was never bound.
if mode == '--aws':
    # Imported here so that --local runs do not need requests_aws4auth.
    from requests_aws4auth import AWS4Auth

    es_host = os.environ['AWS_ES_HOST']
    auth = AWS4Auth(os.environ['AWS_ACCESS_KEY_ID'],
                    os.environ['AWS_SECRET_ACCESS_KEY'],
                    os.environ['AWS_DEFAULT_REGION'],
                    'es')
    # use the requests connection_class and pass in our custom auth class
    es = Elasticsearch(host=es_host, scheme='https', port=443, http_auth=auth,
                       connection_class=RequestsHttpConnection)
elif mode == '--local':
    es = Elasticsearch(host='localhost', port=9200, scheme='http')
else:
    logger.error("Unknown mode {!r}; expected --aws or --local".format(mode))
    sys.exit(1)

# Print the cluster info returned by the server before doing any work.
print(es.info())
# Read the index definition from mappings.json (resolved against the current
# working directory) before touching the cluster.
with open('mappings.json') as mapping_file:
    index_definition = mapping_file.read()

# Drop the index first if it already exists, so that it is always created
# from the definition just read.
if es.indices.exists(index=index):
    delete_response = es.indices.delete(index=index)
    if not delete_response['acknowledged']:
        logger.error("Could not delete index {}".format(index))
        raise Exception("Failed to delete index")

create_response = es.indices.create(index=index, body=index_definition)
if not create_response['acknowledged']:
    logger.error("Could not create index {}".format(index))
    raise Exception("Failed to create index")
# Load the input file into the index, one JSON document per line. Each
# document must carry an 'id' field, which is used as the document id.
# Progress is logged every 1,000 records and once more at the end.
with open(fname, 'r') as f:
    record_count = 0
    for line in f:
        # Skip blank lines (e.g. a trailing newline at end of file); passing
        # them to json.loads would abort the whole load with JSONDecodeError.
        if not line.strip():
            continue
        if record_count % 1000 == 0:
            logger.info("Inserted {:,} records".format(record_count))
        # Named doc_id rather than id to avoid shadowing the builtin.
        doc_id = json.loads(line)['id']
        # The raw line is sent as the document body.
        response = es.create(index=index, id=doc_id, body=line)
        if response['result'] != 'created':
            # Report and carry on; the record still counts as processed.
            logger.error(
                "Failed to create record for {}: {}".format(doc_id, response))
        record_count += 1
logger.info("Inserted {:,} records".format(record_count))