-
Notifications
You must be signed in to change notification settings - Fork 27
/
main.py
executable file
·174 lines (143 loc) · 5.21 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
import gzip
import pathlib
import shutil
import tempfile
try:
import boto3
except ImportError:
boto3 = None
import click
import pandas as pd
from pypums import ACS
from sklearn.model_selection import train_test_split
# Working directory captured once at import time; main() passes it to the
# download, compress and upload steps, and prepare() writes beneath it.
cwd = pathlib.Path.cwd()

# Random state handed to the train/validation split in prepare().
SEED = 11

# Income columns, grouped by the PUMS file they come from.
_PERSON_INCOME_VARS = [
    "INTP",   # interest income
    "OIP",    # all other income
    "PAP",    # public assistance income
    "RETP",   # retirement income
    "SEMP",   # self-employment income
    "SSIP",   # supplementary social security income
    "SSP",    # social security income
    "WAGP",   # wages or salary income
    "PERNP",  # total person's earnings
    "PINCP",  # total person's income
]
_HOUSEHOLD_INCOME_VARS = [
    "FINCP",  # family income
    "HINCP",  # household income
]

# Columns used as prediction targets (and removed from the features).
INCOME_VARS = _PERSON_INCOME_VARS + _HOUSEHOLD_INCOME_VARS

# Income-derived columns, again grouped by source file.
_PERSON_OTHER_INCOME_VARS = [
    "FINTP",   # interest income allocation flag
    "FOIP",    # other income allocation flag
    "FPAP",    # public assistance income allocation flag
    "FPINCP",  # total person's income allocation flag
    "FRETP",   # retirement income allocation flag
    "FSEMP",   # self-employment income allocation flag
    "FSSIP",   # supplementary social security income allocation flag
    "FSSP",    # social security income allocation flag
    "FWAGP",   # wages and salary income allocation flag
]
_HOUSEHOLD_OTHER_INCOME_VARS = [
    "GRPIP",   # gross rent as a percentage of household income
    "OCPIP",   # selected monthly owner costs as a % of household income
    "FFINCP",  # family income allocation flag
    "FHINCP",  # household income allocation flag
]

# Columns removed from the features but not used as targets.
OTHER_INCOME_VARS = _PERSON_OTHER_INCOME_VARS + _HOUSEHOLD_OTHER_INCOME_VARS
def download_data(dst: pathlib.Path):
    """Download the 2018 1-Year ACS files for Massachusetts into ``dst``.

    The person and household data sets are fetched (and extracted) through
    ``pypums.ACS`` into a temporary directory. Every entry found in the
    extraction folder is then moved into ``dst``; entries whose name already
    exists in ``dst`` are left alone and reported on stdout.

    Args:
        dst: Destination directory. Created, including parents, if missing.
    """
    dst.mkdir(exist_ok=True, parents=True)

    # Build the person request first, then the household one.
    requests = [
        ACS(
            year=2018,
            state="Massachusetts",
            survey="1-Year",
            person_or_household=level,
        )
        for level in ("person", "household")
    ]

    with tempfile.TemporaryDirectory() as tmp:
        workdir = pathlib.Path(tmp)
        for request in requests:
            request.download_data(data_directory=workdir, extract=True)

        # NOTE(review): relies on pypums extracting into interim/ACS_18/MA
        # below the data directory -- confirm against the pypums version used.
        extracted = workdir / "interim" / "ACS_18" / "MA"
        for item in extracted.iterdir():
            if (dst / item.name).exists():
                print(f'skipping {item.name} (already exists in dst)')
                continue
            shutil.move(str(item), str(dst))
def merge(person: pd.DataFrame, household: pd.DataFrame) -> pd.DataFrame:
    """Attach household columns to every person record.

    This is a left join on ``SERIALNO``: each row of ``person`` is kept, and
    rows without a matching household get missing values in the household
    columns.

    Args:
        person: Person-level records; must contain ``SERIALNO``.
        household: Household-level records; must contain ``SERIALNO``.

    Returns:
        The joined frame, with the person columns first.
    """
    return person.merge(household, how='left', on='SERIALNO')
def prepare(df, save=True):
    """Filter the merged records and split them into train/validation sets.

    Rows are kept when ``AGEP > 16``, ``PINCP > 100`` and ``WKHP > 0``. The
    ``INCOME_VARS`` columns become the targets; the features are all other
    columns except ``OTHER_INCOME_VARS``. The split is shuffled with
    ``SEED`` as the random state.

    Args:
        df: Merged person/household frame containing ``AGEP``, ``PINCP``,
            ``WKHP`` and every column named in ``INCOME_VARS`` and
            ``OTHER_INCOME_VARS``.
        save: When true, write ``entities.csv`` and ``targets.csv`` into the
            ``train`` and ``val`` directories under the working directory.

    Returns:
        The tuple ``(X_train, X_val, y_train, y_val)`` of data frames.
    """
    # From Kaggle:
    # > A set of reasonably clean records was extracted using the following
    # > conditions: ((AAGE>16) && (AGI>100) && (AFNLWGT>1) && (HRSWK>0)).
    # In PUMS, variables are named AGEP for age, PINCP for personal income
    # from all source, and WKHP for hours worked in a typical week over the
    # past 12 months. The weight variable (AFNLWGT) is not relevant to this
    # micro data.
    old_enough = df['AGEP'] > 16
    has_income = df['PINCP'] > 100
    has_hours = df['WKHP'] > 0
    keep = old_enough & has_income & has_hours
    print(f'Filtering {keep.sum()} relevant rows out of {len(df)} total')
    kept = df.loc[keep]

    targets = kept[INCOME_VARS]
    entities = kept.drop(columns=INCOME_VARS + OTHER_INCOME_VARS)

    # TODO may need to split households, not people to avoid leakage
    X_train, X_val, y_train, y_val = train_test_split(
        entities, targets, random_state=SEED, shuffle=True
    )

    if save:
        for split in ("train", "val"):
            cwd.joinpath(split).mkdir(exist_ok=True)
        # Same write order as before: both entity files, then both targets.
        outputs = (
            ("entities.csv", X_train, X_val),
            ("targets.csv", y_train, y_val),
        )
        for csv_name, train_part, val_part in outputs:
            train_part.to_csv(cwd / "train" / csv_name, index=False)
            val_part.to_csv(cwd / "val" / csv_name, index=False)

    return X_train, X_val, y_train, y_val
def compress(src):
    """Write a gzip copy next to each train/val CSV file under ``src``.

    For every split (``train``, ``val``) and table (``entities``,
    ``targets``), ``<src>/<split>/<table>.csv`` is copied into
    ``<src>/<split>/<table>.csv.gz``. The original CSV files are kept.

    Args:
        src: Directory (a ``pathlib.Path``) holding the ``train`` and
            ``val`` sub-directories.
    """
    csv_paths = [
        src / split / f'{table}.csv'
        for split in ('train', 'val')
        for table in ('entities', 'targets')
    ]
    for csv_path in csv_paths:
        gz_path = csv_path.with_suffix('.csv.gz')
        with csv_path.open('rb') as raw, gzip.open(gz_path, 'wb') as packed:
            shutil.copyfileobj(raw, packed)
def upload(src):
    """Upload the train/val CSV files (plain and gzipped) under ``src`` to S3.

    For each split (``train``, ``val``) and table (``entities``,
    ``targets``), both ``<table>.csv`` and ``<table>.csv.gz`` are uploaded to
    the ``mit-dai-ballet`` bucket under the key
    ``census/<split>/<table><suffix>``.

    Args:
        src: Directory (a ``pathlib.Path``) holding the ``train`` and
            ``val`` sub-directories.

    Raises:
        NotImplementedError: If ``boto3`` could not be imported.
    """
    if boto3 is None:
        # Same exception type as before, but now it says what is missing.
        raise NotImplementedError(
            'uploading to s3 requires boto3, which is not installed'
        )

    s3 = boto3.client('s3')
    bucket = 'mit-dai-ballet'

    # TODO - upload docs

    # upload files
    for split in ['train', 'val']:
        for table in ['entities', 'targets']:
            for suffix in ['.csv', '.csv.gz']:
                filename = src / split / (table + suffix)
                objectname = f'census/{split}/{table}{suffix}'
                s3.upload_file(str(filename), bucket, objectname)

    # TODO - new files do not retain public read permissions, have to use CLI
    # or UI to set.
# Command-line entry point: download the census data, build the train/val
# splits, gzip them and, with -u/--upload, push the files to S3.
# (Documented with comments rather than a docstring so that the text shown by
# --help stays unchanged.)
@click.command()
@click.option(
    '-u', '--upload', '_upload',
    is_flag=True,
    default=False,
    help='upload files to s3',
)
def main(_upload):
    download_data(cwd)

    # Person table first, then household table; presumably these are the
    # files download_data placed in the working directory.
    person, household = (
        pd.read_csv(name) for name in ("psam_p25.csv", "psam_h25.csv")
    )

    prepare(merge(person, household))
    compress(cwd)

    if _upload:
        upload(cwd)

    print("done")


if __name__ == '__main__':
    main()