forked from singer-io/tap-fixerio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tap_exchangeratesapi.py
108 lines (83 loc) · 3.1 KB
/
tap_exchangeratesapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3
import json
import sys
import argparse
import time
import requests
import singer
import backoff
from datetime import date, datetime, timedelta
# Root of the exchange-rate API; do_sync appends the date to fetch to it.
base_url = 'https://api.exchangeratesapi.io/'

# strptime/strftime pattern for the plain calendar dates used in request
# URLs, in API payloads and in the persisted state.
DATE_FORMAT = '%Y-%m-%d'

# Module-wide logger obtained from the singer library.
logger = singer.get_logger()

# Module-wide HTTP session object.
session = requests.Session()
def parse_response(r):
    """Flatten one API payload into a single ``exchange_rate`` record.

    Args:
        r: Decoded JSON payload. Must provide ``rates`` (currency code ->
            rate), ``base`` (the base currency code) and ``date`` (a string
            in ``DATE_FORMAT``).

    Returns:
        A new dict holding every entry of ``r['rates']``, the base currency
        mapped to ``1.0``, and ``date`` rewritten as a
        ``%Y-%m-%dT%H:%M:%SZ`` timestamp string.

    Raises:
        KeyError: If ``rates``, ``base`` or ``date`` is missing.
        ValueError: If ``date`` does not match ``DATE_FORMAT``.
    """
    # Work on a copy so the caller's payload is not modified as a side effect.
    flattened = dict(r['rates'])
    # The base currency is not listed among its own rates; add it explicitly.
    flattened[r['base']] = 1.0
    flattened['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(r['date'], DATE_FORMAT))
    return flattened
# JSON schema announced for the 'exchange_rate' stream. Only 'date' is
# declared explicitly; the per-currency fields are admitted through
# 'additionalProperties'.
schema = {
    'type': 'object',
    'properties': {
        'date': {
            'type': 'string',
            'format': 'date-time',
        },
    },
    'additionalProperties': True,
}
def giveup(error):
    """Tell the backoff decorator whether to stop retrying after ``error``.

    Args:
        error: The ``requests`` exception raised by the failed call.

    Returns:
        ``True`` to give up, ``False`` to retry. HTTP 429 and 5xx responses
        are retried. When the exception carries no response at all, only
        connection errors and timeouts are retried.
    """
    response = getattr(error, 'response', None)
    if response is None:
        # Connection failures and timeouts have no HTTP response attached;
        # dereferencing it would raise AttributeError inside the retry logic.
        logger.error(str(error))
        transient = (requests.exceptions.ConnectionError,
                     requests.exceptions.Timeout)
        return not isinstance(error, transient)

    logger.error(response.text)
    return not (response.status_code == 429 or
                response.status_code >= 500)
@backoff.on_exception(backoff.constant,
                      (requests.exceptions.RequestException),
                      jitter=backoff.random_jitter,
                      max_tries=5,
                      giveup=giveup,
                      interval=30)
def request(url, params, timeout=60):
    """Perform a GET request, retrying through the backoff decorator.

    Args:
        url: Full URL to fetch.
        params: Query-string parameters passed to the request.
        timeout: Seconds to wait for the server before the attempt fails
            with a ``requests`` timeout exception. Without a timeout a
            stalled connection would block the tap indefinitely.

    Returns:
        The ``requests`` response object of a successful (non-4xx/5xx) call.

    Raises:
        requests.exceptions.RequestException: When the call still fails after
            the retries allowed by the decorator, or when ``giveup`` declines
            to retry.
    """
    # Use the module-level session so consecutive calls can reuse the same
    # connection instead of opening a new one for every request.
    response = session.get(url=url, params=params, timeout=timeout)
    response.raise_for_status()
    return response
def do_sync(base, start_date):
    """Emit one ``exchange_rate`` record per day from ``start_date`` to today.

    Writes the stream schema, then requests each day's rates in turn and
    writes them as records. The state written at the end holds the last date
    that was replicated successfully (or ``start_date`` if none was).

    Args:
        base: Base currency code sent to the API as the ``base`` parameter.
        start_date: First date to replicate, formatted per ``DATE_FORMAT``.

    Raises:
        SystemExit: With status -1 after a request error; the state reached
            so far is written before exiting.
    """
    logger.info('Replicating exchange rate data from exchangeratesapi.io starting from {}'.format(start_date))
    singer.write_schema('exchange_rate', schema, 'date')

    state = {'start_date': start_date}
    next_date = start_date

    try:
        # Check the date BEFORE calling the API: asking for a future date is
        # a wasted request, and an error response to it would abort the run.
        while datetime.strptime(next_date, DATE_FORMAT) <= datetime.utcnow():
            response = request(base_url + next_date, {'base': base})
            payload = response.json()

            singer.write_records('exchange_rate', [parse_response(payload)])
            state = {'start_date': next_date}
            next_date = (datetime.strptime(next_date, DATE_FORMAT) + timedelta(days=1)).strftime(DATE_FORMAT)

    except requests.exceptions.RequestException as e:
        # Connection-level failures carry no request/response object, so
        # neither attribute may be dereferenced unconditionally.
        failed_request = getattr(e, 'request', None)
        failed_response = getattr(e, 'response', None)
        url = failed_request.url if failed_request is not None else base_url + next_date
        if failed_response is not None:
            logger.fatal('Error on ' + url +
                         '; received status ' + str(failed_response.status_code) +
                         ': ' + failed_response.text)
        else:
            logger.fatal('Error on ' + url + '; no response received: ' + str(e))
        # Persist progress so the next run resumes from the last good date.
        singer.write_state(state)
        sys.exit(-1)

    singer.write_state(state)
    logger.info('Tap exiting normally')
def main():
    """Command-line entry point: read config/state files and run the sync.

    Both ``--config`` and ``--state`` are optional paths to JSON files. The
    start date is taken from the state, then from the config, and otherwise
    defaults to the current UTC date. The base currency comes from the
    config's ``base`` key and defaults to ``USD``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-c', '--config', help='Config file', required=False)
    arg_parser.add_argument('-s', '--state', help='State file', required=False)
    options = arg_parser.parse_args()

    # Start from empty settings and overwrite them only when a file is given.
    config = {}
    if options.config:
        with open(options.config) as config_file:
            config = json.load(config_file)

    state = {}
    if options.state:
        with open(options.state) as state_file:
            state = json.load(state_file)

    # Precedence: saved state, then configuration, then today's UTC date.
    start_date = (state.get('start_date')
                  or config.get('start_date')
                  or datetime.utcnow().strftime(DATE_FORMAT))
    # Reduce whatever timestamp form was supplied to a plain calendar date.
    parsed_start = singer.utils.strptime_with_tz(start_date)
    start_date = parsed_start.date().strftime(DATE_FORMAT)

    do_sync(config.get('base', 'USD'), start_date)
# Run the tap only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()