-
Notifications
You must be signed in to change notification settings - Fork 0
/
1_cfpb_complaints_etl.py
66 lines (56 loc) · 2.15 KB
/
1_cfpb_complaints_etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import json
import sqlite3
import requests
import datetime
from contextlib import closing
from collections import namedtuple
from prefect import task, Flow
from prefect.tasks.database.sqlite import SQLiteScript
from prefect.schedules import IntervalSchedule
## alerting
def alert_failed(obj, old_state, new_state):
    """Prefect state handler that prints a message whenever a task or flow fails.

    Args:
        obj: the task or flow whose state changed (not used here).
        old_state: the state being left (not used here).
        new_state: the state being entered.

    Returns:
        ``new_state`` unchanged. Prefect passes each handler's return value on as
        the ``new_state`` of the next handler, so the state is handed back
        explicitly instead of implicitly returning ``None``.
    """
    if new_state.is_failed():
        print('Failed!')
    return new_state
## setup
# Prefect SQLite task that executes the DDL below against cfpbcomplaints.db.
# IF NOT EXISTS makes it safe to run on every flow run: an existing table is
# left as-is.
create_table = SQLiteScript(
    db="cfpbcomplaints.db",
    script=(
        "CREATE TABLE IF NOT EXISTS complaint ("
        "timestamp TEXT, "
        "state TEXT, "
        "product TEXT, "
        "company TEXT, "
        "complaint_what_happened TEXT"
        ")"
    ),
)
## extract
@task(cache_for=datetime.timedelta(days=1), state_handlers=[alert_failed])
def get_complaint_data():
    """Fetch one page of complaints from the CFPB consumer-complaint search API.

    Requests 10 results (``size=10``). The task is declared with a one-day
    ``cache_for``, asking Prefect to reuse the cached result for that long
    rather than calling the API on every run.

    Returns:
        The list found at ``['hits']['hits']`` in the JSON response. Each
        element is a search hit whose ``'_source'`` dict holds the complaint.

    Raises:
        requests.HTTPError: if the API responds with a 4xx/5xx status.
        requests.Timeout: if the API does not respond within 30 seconds.
    """
    response = requests.get(
        "https://www.consumerfinance.gov/data-research/consumer-complaints/search/api/v1/",
        params={'size': 10},
        # Without a timeout, requests can wait forever on a stalled connection,
        # hanging the scheduled run.
        timeout=30,
    )
    # Fail with a clear HTTP error instead of a KeyError/JSON error further down.
    response.raise_for_status()
    return response.json()['hits']['hits']
## transform
@task(state_handlers=[alert_failed])
def parse_complaint_data(raw):
    """Flatten raw API search hits into rows for the ``complaint`` table.

    Args:
        raw: list of search hits (as returned by ``get_complaint_data``); each
            hit is a dict whose ``'_source'`` dict carries the complaint fields.

    Returns:
        A list of ``Complaint`` namedtuples, one per hit, whose field order
        matches the table columns (timestamp, state, product, company,
        complaint_what_happened). Fields absent from ``'_source'`` are ``None``.
    """
    # Field order mirrors the complaint table's columns because the rows are
    # inserted positionally ("INSERT INTO complaint VALUES (?, ?, ?, ?, ?)").
    # NOTE: 'data_received' is a misspelling of 'date_received'; the attribute
    # name is kept unchanged so existing consumers of these tuples still work.
    Complaint = namedtuple('Complaint', ['data_received', 'state', 'product', 'company', 'complaint_what_happened'])
    complaints = []
    for row in raw:
        source = row.get('_source')
        complaints.append(Complaint(
            # The API field is 'date_received'. The previous key 'date_recieved'
            # was misspelled, never matched, and left every timestamp NULL.
            data_received=source.get('date_received'),
            state=source.get('state'),
            product=source.get('product'),
            company=source.get('company'),
            complaint_what_happened=source.get('complaint_what_happened'),
        ))
    return complaints
## load
@task(state_handlers=[alert_failed])
def store_complaints(parsed):
    """Append complaint rows to the ``complaint`` table in cfpbcomplaints.db.

    Args:
        parsed: iterable of 5-item rows in table column order, e.g. the
            namedtuples produced by ``parse_complaint_data``.

    The rows are committed in a single transaction. The cursor and connection
    are closed afterwards, even if the insert raises.
    """
    sql = "INSERT INTO complaint VALUES (?, ?, ?, ?, ?)"
    with closing(sqlite3.connect("cfpbcomplaints.db")) as db, closing(db.cursor()) as cur:
        cur.executemany(sql, parsed)
        db.commit()
# Run the ETL flow once every minute.
# NOTE(review): get_complaint_data caches its result for a day, so runs within
# that window likely re-parse and re-insert the same rows into the complaint
# table (which has no unique key) — confirm whether deduplication is needed.
schedule = IntervalSchedule(interval=datetime.timedelta(minutes=1))

with Flow("my etl flow", schedule, state_handlers=[alert_failed]) as f:
    db_table = create_table()
    raw = get_complaint_data()
    parsed = parse_complaint_data(raw)
    populated_table = store_complaints(parsed)
    # store_complaints takes no data from create_table, so the requirement that
    # the table exists before inserting is declared as an explicit dependency.
    populated_table.set_upstream(db_table)

# Only start the flow when this file is executed as a script. With a schedule
# attached, Flow.run() keeps running on that schedule instead of returning, so
# running it at import time would block any tool that loads this module just to
# get the flow object.
if __name__ == "__main__":
    f.run()