-
Notifications
You must be signed in to change notification settings - Fork 45
/
csv_to_elastic.py
241 lines (207 loc) · 9.08 KB
/
csv_to_elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/python3
"""
DESCRIPTION
Simple python script to import a csv into ElasticSearch. It can also update existing Elastic data if
only parameter --id-column is provided
HOW IT WORKS
The script creates an ElasticSearch API PUT request for
each row in your CSV. It is similar to running a bulk insert by:
$ curl -XPUT localhost:9200/_bulk -d '{index: "", type: ""}
{ data }'
In both `json-struct` and `elastic-index` path, the script will
insert your csv data by replacing the column name wrapped in '%'
with the data for the given row. For example, `%id%` will be
replaced with data from the `id` column of your CSV.
NOTES
- CSV must have headers
- insert elastic address (with port) as argument, it defaults to localhost:9200
EXAMPLES
1. CREATE example:
$ python csv_to_elastic.py \
--elastic-address 'localhost:9200' \
--csv-file input.csv \
--elastic-index 'index' \
--datetime-field=dateField \
--json-struct '{
"name" : "%name%",
"major" : "%major%"
}'
CSV:
| name | major |
|--------|------------------|
| Mike | Engineering |
| Erin | Computer Science |
2. CREATE/UPDATE example:
$ python csv_to_elastic.py \
--elastic-address 'localhost:9200' \
--csv-file input.csv \
--elastic-index 'index' \
--datetime-field=dateField \
--json-struct '{
"name" : "%name%",
"major" : "%major%"
}' \
--id-column id
CSV:
| id | name | major |
|------|--------|------------------|
| 1 | Mike | Engineering |
| 2 | Erin | Computer Science |
"""
import argparse
import http.client
import os
import csv
import json
import dateutil.parser
from base64 import b64encode
def main(file_path, delimiter, max_rows, elastic_index, json_struct, datetime_field, elastic_type, elastic_address, ssl, username, password, id_column):
    """Read a CSV file and bulk-import its rows into ElasticSearch.

    For each data row, every '%column%' placeholder in ``json_struct`` is
    replaced with that row's value, and the resulting documents are POSTed
    to the bulk API in batches of 10,000 rows.

    Parameters:
        file_path: path to the CSV file (must have a header row).
        delimiter: CSV field delimiter.
        max_rows: stop after importing this many rows (None = all).
        elastic_index: target Elastic index name.
        json_struct: JSON template with '%column%' placeholders.
        datetime_field: column whose values are parsed and re-emitted as
            ISO-8601 datetimes (minute precision).
        elastic_type: Elastic document type for each row.
        elastic_address: host[:port] of the Elastic endpoint.
        ssl, username, password: connection/auth options, passed through
            to send_to_elastic().
        id_column: optional CSV column used as the document ``_id``
            (enables create-or-update semantics).
    """
    endpoint = '/_bulk'
    max_rows_disp = "all" if max_rows is None else max_rows
    print("")
    print(" ----- CSV to ElasticSearch ----- ")
    print("Importing %s rows into `%s` from '%s'" % (max_rows_disp, elastic_index, file_path))
    print("")
    count = 0
    headers = []
    headers_position = {}
    to_elastic_string = ""
    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter, quotechar='"')
        for row in reader:
            if count == 0:
                # First row holds the column headers.
                for position, col in enumerate(row):
                    headers.append(col)
                    headers_position[col] = position
            elif max_rows is not None and count >= max_rows:
                print('Max rows imported - exit')
                break
            elif not row or len(row[0]) == 0:
                # Blank lines parse as [] (not ['']), so guard against the
                # empty list before indexing row[0] to avoid an IndexError.
                print("Found empty rows at the end of document")
                break
            else:
                if os.name == 'nt':
                    # On Windows shells double quotes are hard to pass on the
                    # command line, so '^' stands in for '"' in the template.
                    _data = json_struct.replace("^", '"')
                else:
                    _data = json_struct.replace("'", '"')
                _data = _data.replace('\n', '').replace('\r', '')
                for pos, header in enumerate(headers):
                    if header == datetime_field:
                        # Normalize any parseable datetime to ISO-8601.
                        datetime_type = dateutil.parser.parse(row[pos])
                        _data = _data.replace('"%' + header + '%"', '"{:%Y-%m-%dT%H:%M}"'.format(datetime_type))
                    else:
                        try:
                            int(row[pos])
                            # Integer value: drop the surrounding quotes so
                            # Elastic indexes it as a number, not a string.
                            _data = _data.replace('"%' + header + '%"', row[pos])
                        except ValueError:
                            _data = _data.replace('%' + header + '%', row[pos])
                # Bulk action line; include _id to create-or-update by key.
                if id_column is not None:
                    index_row = {"index": {"_index": elastic_index,
                                           "_type": elastic_type,
                                           '_id': row[headers_position[id_column]]}}
                else:
                    index_row = {"index": {"_index": elastic_index, "_type": elastic_type}}
                to_elastic_string += json.dumps(index_row) + "\n" + _data + "\n"
            count += 1
            if count % 10000 == 0:
                # Flush periodically to keep the request payload bounded.
                send_to_elastic(elastic_address, endpoint, ssl, username, password, to_elastic_string, count)
                to_elastic_string = ""
    print('Reached end of CSV - sending to Elastic')
    send_to_elastic(elastic_address, endpoint, ssl, username, password, to_elastic_string, count)
    print("Done.")
def send_to_elastic(elastic_address, endpoint, ssl, username, password, to_elastic_string, block=0):
    """POST a bulk payload to ElasticSearch and report per-item failures.

    Parameters:
        elastic_address: host[:port] of the Elastic endpoint.
        endpoint: request path (the bulk API, '/_bulk').
        ssl: use HTTPS when truthy, plain HTTP otherwise.
        username, password: optional basic-auth credentials; only sent
            over HTTPS or to localhost.
        to_elastic_string: newline-delimited bulk request body.
        block: row offset of this batch, used to report absolute CSV
            line numbers for failed items.
    """
    if ssl:
        print("Using HTTPS")
        connection = http.client.HTTPSConnection(elastic_address)
    else:
        print("Using unencrypted http")
        connection = http.client.HTTPConnection(elastic_address)
    headers = {"Content-type": "application/json", "Accept": "text/plain"}
    if username is not None:
        auth = b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("ascii")
        # Only attach credentials when the channel is encrypted or local.
        if ssl or "localhost" in elastic_address:
            headers['Authorization'] = "Basic {}".format(auth)
        else:
            print("*** Warning: Refusing to remotely transmit user/pass unencrypted.")
    try:
        connection.request('POST', url=endpoint, headers=headers, body=to_elastic_string)
        response = connection.getresponse()
        body = response.read().decode('utf-8')
        if response.status != 200:
            # Report HTTP-level failures before touching the body: an error
            # response (e.g. from a proxy) may not be valid JSON at all.
            print("\n*** Error occured before import. ***")
            print("HTTP error code {} / {}".format(response.status, response.reason))
            return
        response_details = json.loads(body)
        if response_details['errors']:
            line = 1  # skip header
            for i in response_details['items']:
                line = line + 1
                if i['index']['status'] != 201:
                    print("\n*** Problem on line {}: {}".format(block + line, i['index']['error']))
            print("\n*** Error occured during import. See response body above. ***\n")
        else:
            print("Import of {} items was successful.".format(len(response_details['items'])))
    finally:
        # Always release the socket, even when the request or parsing fails.
        connection.close()
if __name__ == '__main__':
    # Command-line entry point: parse options, then hand off to main().
    parser = argparse.ArgumentParser(description='CSV to ElasticSearch.')
    parser.add_argument('--elastic-address', required=False, type=str,
                        default='localhost:9200',
                        help='Your elasticsearch endpoint address')
    parser.add_argument('--ssl', dest='ssl', action='store_true',
                        required=False, help='Use SSL connection')
    parser.add_argument('--username', required=False, type=str,
                        help='Username for basic auth (for example with elastic cloud)')
    parser.add_argument('--password', required=False, type=str,
                        help='Password')
    parser.add_argument('--csv-file', required=True, type=str,
                        help='path to csv to import')
    parser.add_argument('--json-struct', required=True, type=str,
                        help='json to be inserted')
    parser.add_argument('--elastic-index', required=True, type=str,
                        help='elastic index you want to put data in')
    parser.add_argument('--elastic-type', required=False, type=str,
                        default='test_type',
                        help='Your entry type for elastic')
    parser.add_argument('--max-rows', type=int, default=None,
                        help='max rows to import')
    parser.add_argument('--datetime-field', type=str,
                        help='datetime field for elastic')
    parser.add_argument('--id-column', type=str, default=None,
                        help='If you want to have index and you have it in csv, this the argument to point to it')
    parser.add_argument('--delimiter', type=str, default=";",
                        help='If you want to have a different delimiter than ;')
    args = parser.parse_args()
    main(file_path=args.csv_file,
         delimiter=args.delimiter,
         json_struct=args.json_struct,
         elastic_index=args.elastic_index,
         elastic_type=args.elastic_type,
         datetime_field=args.datetime_field,
         max_rows=args.max_rows,
         elastic_address=args.elastic_address,
         ssl=args.ssl,
         username=args.username,
         password=args.password,
         id_column=args.id_column)