# test_table_sync.py
import ConfigParser
from cartodb import TableSync, InvalidJsonResponseException
import httpretty
import unittest
class TableSyncTest(unittest.TestCase):
    """Tests for the ``TableSync`` created/updated/deleted notifications.

    ``setUp`` looks for a ``.tests.conf`` file with an ``endpoint`` section
    (``host``, ``port``, ``secure``). When that configuration can be read,
    HTTPretty is left disabled and requests go to the configured endpoint.
    Otherwise the tests fall back to ``localhost:8000`` over plain HTTP and
    HTTPretty is enabled so the registered canned responses are served.
    """

    TABLE_ID = 'table_id_1'
    TABLE_NAME = 'table_name_created'
    JSON_CONTENT_TYPE = "application/json"
    STATUS_OK = 200
    RESPONSE_OK = '{"status": "ok"}'

    def setUp(self):
        """Load the endpoint settings and build the ``TableSync`` under test."""
        tests_config = ConfigParser.RawConfigParser()
        try:
            # read() silently skips a missing file; the lookups below then
            # raise NoSectionError, which triggers the mock fallback.
            tests_config.read(".tests.conf")
            self.host = tests_config.get('endpoint', 'host')
            self.port = tests_config.getint('endpoint', 'port')
            self.secure = tests_config.getboolean('endpoint', 'secure')
            self.mock_server = False
        except (ConfigParser.Error, ValueError):
            # ConfigParser.Error: missing/unparseable file, section or option.
            # ValueError: 'port' or 'secure' holds a value that cannot be
            # converted. Anything else is unexpected and should fail loudly.
            self.host = 'localhost'
            self.port = 8000
            self.secure = False
            self.mock_server = True

        if self.mock_server:
            httpretty.enable()

        self.table_sync = TableSync(self.host, self.port, self.secure)

    def tearDown(self):
        """Disable HTTPretty if it was enabled and drop all registered URIs."""
        if self.mock_server:
            httpretty.disable()
        # Test methods register URIs regardless of mock_server, so always
        # clear them to keep one test's canned response out of the next.
        httpretty.reset()

    def test_created_happy_case(self):
        """``created`` reports success when the service answers with OK."""
        self._register_response(httpretty.POST, self.RESPONSE_OK)

        response = self.table_sync.created(self.TABLE_ID, self.TABLE_NAME)

        self.assertTrue(response.ok())

    def test_updated_happy_case(self):
        """``updated`` reports success when the service answers with OK."""
        self._register_response(httpretty.PUT, self.RESPONSE_OK)

        response = self.table_sync.updated(self.TABLE_ID, self.TABLE_NAME)

        self.assertTrue(response.ok())

    def test_deleted_happy_case(self):
        """``deleted`` reports success when the service answers with OK."""
        self._register_response(httpretty.DELETE, self.RESPONSE_OK)

        response = self.table_sync.deleted(self.TABLE_ID, self.TABLE_NAME)

        self.assertTrue(response.ok())

    def test_invalid_json_raises_exception(self):
        """A body that is not JSON raises ``InvalidJsonResponseException``."""
        self._register_response(httpretty.DELETE, 'INVALIDJSON')

        with self.assertRaises(InvalidJsonResponseException):
            self.table_sync.deleted(self.TABLE_ID, self.TABLE_NAME)

    def _register_response(self, method, body):
        """Register a canned 200 JSON-typed response for ``TABLE_ID``.

        :param method: HTTPretty HTTP method constant (e.g. ``httpretty.POST``).
        :param body: raw response body to return.
        """
        httpretty.register_uri(method, self._get_service_url(self.TABLE_ID),
                               body=body,
                               content_type=self.JSON_CONTENT_TYPE,
                               status=self.STATUS_OK)

    def _get_service_url(self, table_id):
        """Return the full service URL for ``table_id`` on the test endpoint.

        The scheme follows ``self.secure`` and the path is built from
        ``TableSync.API_ENDPOINT_PATH``.
        """
        return "%s://%s:%s/%s" % (
            'https' if self.secure else 'http',
            self.host,
            self.port,
            TableSync.API_ENDPOINT_PATH % {'table_id': table_id}
        )