forked from pydata/parallel-tutorial
-
Notifications
You must be signed in to change notification settings - Fork 0
/
prep.py
64 lines (52 loc) · 1.83 KB
/
prep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
from glob import glob
import pandas as pd
try:
import ujson as json
except ImportError:
import json
import dask
import dask.dataframe as dd
import dask.multiprocessing
import os
import sys
WINDOWS = sys.platform.startswith('win')
if WINDOWS:
dask.set_options(get=dask.threaded.get)
else:
dask.set_options(get=dask.multiprocessing.get)
here = os.path.dirname(__file__)
os.makedirs(os.path.join(here, 'data', 'minute'), exist_ok=True)
stocks = ['aet', 'afl', 'aig', 'al', 'amgn', 'avy', 'b', 'bwa', 'ge',
'hal', 'hp', 'hpq', 'ibm', 'jbl', 'jpm', 'luv', 'met', 'pcg',
'tgt', 'usb', 'xom']
def write_stock(symbol):
dirname = os.path.join(here, 'data', 'minute', symbol)
if not os.path.exists(dirname):
os.mkdir(dirname)
df = dd.demo.daily_stock(symbol, '2010', '2015', freq='120s',
data_source='google')
names = [str(ts.date()) for ts in df.divisions]
df.to_csv(os.path.join(here, 'data', 'minute', symbol, '*.csv'),
name_function=names.__getitem__)
print("Finished CSV: %s" % symbol)
for symbol in stocks:
write_stock(symbol)
def convert_to_json(d):
filenames = sorted(glob(os.path.join(d, '*')))[-365:]
outfn = d.replace('minute', 'json') + '.json'
if os.path.exists(outfn):
return
with open(outfn, 'w') as f:
for fn in filenames:
df = pd.read_csv(fn)
for rec in df.to_dict(orient='records'):
json.dump(rec, f)
f.write('\n')
print("Finished JSON: %s" % d.split(os.path.sep)[-1])
js = os.path.join(here, 'data', 'json')
if not os.path.exists(js):
os.mkdir(js)
directories = sorted(glob(os.path.join(here, 'data', 'minute', '*')))
values = [dask.delayed(convert_to_json)(d) for d in directories]
dask.compute(values)