-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv_to_psqltable.py
executable file
·132 lines (116 loc) · 4.54 KB
/
csv_to_psqltable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/python3
import csv
import time
from optparse import OptionParser
class PGTableCreator():
    """Read a CSV file and try to guess the right DB structure to save it.

    The file is read as UTF-8, ``;``-delimited and ``"``-quoted, with the
    column names on its first row.  Every column starts out as a candidate
    for each supported type; a candidate is dropped as soon as one non-empty
    cell fails the matching ``is_type_*`` check.  The resulting
    ``CREATE TABLE`` statement is printed to standard output.
    """

    # Type flags paired with the SQL type they map to, in order of
    # preference: the first flag still set for a column decides its type.
    _SQL_TYPES = (
        ('boolean', 'BOOLEAN'),
        ('integer', 'INT'),
        ('float', 'FLOAT'),
        ('date', 'DATE'),
        ('datetime', 'TIMESTAMP'),
    )

    # strptime() layouts accepted for date-only and date+time cells.
    _DATE_FORMATS = ('%Y-%m-%d', '%Y-%b-%d', '%d-%b-%Y')
    _DATETIME_FORMATS = (
        '%Y-%m-%d %H:%M:%S',
        '%Y-%b-%d %H:%M:%S',
        '%d-%b-%Y %H:%M:%S',
    )

    def _duplicates(self, mylist):
        """Return the elements of *mylist* that occur more than once.

        Each duplicated element is listed once, in first-seen order.
        """
        counts = {}
        for elem in mylist:
            counts[elem] = counts.get(elem, 0) + 1
        return [elem for elem, count in counts.items() if count > 1]

    def _matches_any_format(self, s, formats):
        """Return True if *s* parses with at least one strptime() layout."""
        # Month abbreviations may come upper-cased: 2012-DEC-01 -> 2012-Dec-01
        s = s.title()
        for datetime_format in formats:
            try:
                time.strptime(s, datetime_format)
            except (ValueError, OverflowError):
                continue
            return True
        return False

    def is_type_integer(self, s):
        """Return True if *s* parses as an int within the signed 32-bit range."""
        try:
            value = int(s)
        except (ValueError, OverflowError):
            return False
        return -2147483648 <= value <= 2147483647

    def is_type_float(self, s):
        """Return True if ``float(s)`` succeeds."""
        try:
            float(s)
        except (ValueError, OverflowError):
            return False
        return True

    def is_type_boolean(self, s):
        """Return True if *s* is one of the accepted true/false spellings."""
        return s.lower() in ('t', 'true', 'y', 'yes', '1', 'f', 'false', 'n', 'no', '0')

    def is_type_date(self, s):
        """Return True if *s* matches one of the supported date layouts."""
        return self._matches_any_format(s, self._DATE_FORMATS)

    def is_type_datetime(self, s):
        """Return True if *s* matches one of the supported date+time layouts."""
        return self._matches_any_format(s, self._DATETIME_FORMATS)

    def _scan_rows(self, rows, headers):
        """Return per-header type flags and max cell length seen in *rows*.

        Rows whose cell count differs from the header count are reported on
        standard output and skipped.  Headers sharing the same name share a
        single properties entry.
        """
        properties = {
            header: {
                'integer': True,
                'float': True,
                'date': True,
                'boolean': True,
                'datetime': True,
                'maxlen': 0,
            }
            for header in headers
        }
        # Resolve the checkers through getattr so subclass overrides apply.
        checks = [
            (flag, getattr(self, "is_type_%s" % flag))
            for flag in ('integer', 'float', 'date', 'datetime', 'boolean')
        ]
        # Data rows are numbered from 2: the header row counts as line 1.
        for line_number, row in enumerate(rows, start=2):
            # Skip invalid lines
            if len(row) != len(headers):
                print("Warning - skipped line %d" % (line_number, ))
                continue
            for header, cell in zip(headers, row):
                if not cell:
                    # Empty cells carry no type evidence and have length 0.
                    continue
                column = properties[header]
                for flag, check in checks:
                    if column[flag] and not check(cell):
                        column[flag] = False
                column['maxlen'] = max(column['maxlen'], len(cell))
        return properties

    def _sql_type(self, column):
        """Return the SQL type for one column's properties dict.

        Falls back to ``VARCHAR(maxlen)`` once every type flag was cleared.
        NOTE: a column that never had a non-empty cell keeps all its flags
        and therefore comes out as BOOLEAN.
        """
        for flag, sql_type in self._SQL_TYPES:
            if column[flag]:
                return sql_type
        return "VARCHAR(%s)" % (column['maxlen'], )

    def run(self, filename, tablename):
        """Print a ``CREATE TABLE`` statement guessed from a CSV file.

        Args:
            filename: path of the UTF-8, ``;``-delimited CSV file to read.
            tablename: table name inserted verbatim into the statement.

        Raises:
            ValueError: if the file has no header row (it is empty).
        """
        # newline='' lets the csv module do its own newline handling, so a
        # CRLF embedded in a quoted field is kept and counted in maxlen.
        with open(filename, 'r', encoding='utf-8', newline='') as csvfile:
            csvreader = csv.reader(csvfile, delimiter=';', quotechar='"')
            try:
                headers = next(csvreader)
            except StopIteration:
                raise ValueError(
                    "%s is empty: no header row found" % (filename, )
                ) from None
            dups = self._duplicates(headers)
            if dups:
                print("Warning - duplicate columns, %s" % str(dups))
            header_properties = self._scan_rows(csvreader, headers)

        print("CREATE TABLE %s (" % (tablename, ))
        # Columns with an empty header name are left out of the statement.
        field_declaration = [
            "%s %s" % (h, self._sql_type(header_properties[h]))
            for h in headers
            if h
        ]
        print(", ".join(field_declaration))
        print(");")
if __name__ == "__main__":
    # Command-line entry point: read the CSV named by -f/--file and print the
    # CREATE TABLE statement guessed for the table named by -t/--table.
    parser = OptionParser()
    parser.add_option("-f", "--file", dest="filename",
                      help="Read data from FILE", metavar="FILE")
    parser.add_option("-t", "--table", dest="tablename",
                      help="Name of the table to create", metavar="TABLENAME",
                      default="xx")
    (options, args) = parser.parse_args()
    # Without -f, options.filename is None and open() would fail with an
    # opaque TypeError; report a usage error instead.
    if not options.filename:
        parser.error("no input file given (use -f FILE)")
    pgtc = PGTableCreator()
    pgtc.run(options.filename, options.tablename)