Skip to content

Commit

Permalink
Include metadata about replication keys (#15)
Browse files Browse the repository at this point in the history
* Include metadata about replication keys

* Make updates to use fields of data-type timestamp/timestamptz as valid replication keys

* Split date and date-time types

* Refactor unnecessary loop

* Add test
  • Loading branch information
Gbolahan Okerayi authored and rflprr committed Apr 2, 2018
1 parent 795429a commit d091943
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 10 deletions.
24 changes: 21 additions & 3 deletions tap_redshift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from tap_redshift import resolve

__version__ = '1.0.0b3'
__version__ = '1.0.0b4'

LOGGER = singer.get_logger()

Expand All @@ -60,7 +60,9 @@

FLOAT_TYPES = {'float', 'float4', 'float8'}

DATETIME_TYPES = {'timestamp', 'timestamptz', 'date',
DATE_TYPES = {'date'}

DATETIME_TYPES = {'timestamp', 'timestamptz',
'timestamp without time zone', 'timestamp with time zone'}


Expand Down Expand Up @@ -131,7 +133,6 @@ def discover_catalog(conn, db_schema):
schema=schema,
table=qualified_table_name,
metadata=metadata)

key_properties = [
column for column in table_pks.get(table_name, [])
if schema.properties[column].inclusion != 'unsupported']
Expand Down Expand Up @@ -178,6 +179,10 @@ def schema_for_column(c):
result.type = 'string'
result.format = 'date-time'

elif column_type in DATE_TYPES:
result.type = 'string'
result.format = 'date'

else:
result = Schema(None,
inclusion='unsupported',
Expand All @@ -193,8 +198,14 @@ def schema_for_column(c):
def create_column_metadata(cols):
mdata = metadata.new()
mdata = metadata.write(mdata, (), 'selected-by-default', False)
valid_rep_keys = []

for c in cols:
if c['type'] in DATETIME_TYPES:
valid_rep_keys.append(c['name'])

schema = schema_for_column(c)

mdata = metadata.write(mdata,
('properties', c['name']),
'selected-by-default',
Expand All @@ -203,6 +214,13 @@ def create_column_metadata(cols):
('properties', c['name']),
'sql-datatype',
c['type'].lower())
if valid_rep_keys:
mdata = metadata.write(mdata, (), 'valid-replication-keys',
valid_rep_keys)
else:
mdata = metadata.write(mdata, (), 'forced-replication-method', {
'replication-method': 'FULL_TABLE',
'reason': 'No replication keys found from table'})

return metadata.to_list(mdata)

Expand Down
34 changes: 30 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def column_specs_cursor():
result = [('table1', 1, 'col1', 'int2', 'NO'),
('table1', 2, 'col2', 'float8', 'YES'),
('table1', 3, 'col3', 'timestamptz', 'NO'),
('table1', 4, 'col4', 'timestamp', 'NO'),
('table1', 5, 'col5', 'timestamp with time zone', 'NO'),
('table2', 1, 'col1', 'int4', 'NO'),
('table2', 2, 'col2', 'bool', 'YES'),
('view1', 1, 'col1', 'varchar', 'NO'),
Expand Down Expand Up @@ -95,6 +97,14 @@ def expected_catalog_from_db():
'inclusion': 'available',
'type': ['null', 'number']},
'col3': {
'inclusion': 'available',
'format': 'date-time',
'type': 'string'},
'col4': {
'inclusion': 'available',
'format': 'date-time',
'type': 'string'},
'col5': {
'inclusion': 'available',
'format': 'date-time',
'type': 'string'}},
Expand All @@ -103,7 +113,9 @@ def expected_catalog_from_db():
'stream': 'table1',
'metadata': [
{'breadcrumb': (),
'metadata': {'selected-by-default': False}},
'metadata': {'selected-by-default': False,
'valid-replication-keys': [
'col3', 'col4', 'col5']}},
{'breadcrumb': ('properties', 'col1'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'int2'}},
Expand All @@ -112,7 +124,13 @@ def expected_catalog_from_db():
'sql-datatype': 'float8'}},
{'breadcrumb': ('properties', 'col3'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'timestamptz'}}
'sql-datatype': 'timestamptz'}},
{'breadcrumb': ('properties', 'col4'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'timestamp'}},
{'breadcrumb': ('properties', 'col5'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'timestamp with time zone'}}
]},
{'tap_stream_id': 'test-db.public.table2',
'database_name': 'test-db',
Expand All @@ -134,7 +152,11 @@ def expected_catalog_from_db():
'stream': 'table2',
'metadata': [
{'breadcrumb': (),
'metadata': {'selected-by-default': False}},
'metadata': {'selected-by-default': False,
'forced-replication-method': {
'replication-method': 'FULL_TABLE',
'reason': 'No replication keys found from table'
}}},
{'breadcrumb': ('properties', 'col1'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'int4'}},
Expand All @@ -157,7 +179,11 @@ def expected_catalog_from_db():
'stream': 'view1',
'metadata': [
{'breadcrumb': (),
'metadata': {'selected-by-default': False}},
'metadata': {'selected-by-default': False,
'forced-replication-method': {
'replication-method': 'FULL_TABLE',
'reason': 'No replication keys found from table'
}}},
{'breadcrumb': ('properties', 'col1'),
'metadata': {'selected-by-default': True,
'sql-datatype': 'varchar'}},
Expand Down
56 changes: 53 additions & 3 deletions tests/tap_redshift/test_tap_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
{
'type': 'date',
'pos': 6,
'name': 'date_expired',
'nullable': 'YES'
},
{
'type': 'timestamp',
'pos': 7,
'name': 'date_created',
'nullable': 'YES'
}
Expand Down Expand Up @@ -107,6 +113,14 @@
'inclusion': 'available'
},
'expires_at': {
'type': [
'null',
'string'
],
'format': 'date',
'inclusion': 'available'
},
'created_at': {
'type': [
'null',
'string'
Expand All @@ -117,6 +131,13 @@
},
},
'metadata': [
{
'metadata': {
'selected-by-default': False,
'valid-replication-keys': ['created_at']
},
'breadcrumb': ()
},
{
'metadata': {
'sql-datatype': 'int4',
Expand Down Expand Up @@ -154,16 +175,22 @@
},
{
'metadata': {
'sql-datatype': 'timestamptz',
'sql-datatype': 'date',
'selected-by-default': True
},
'breadcrumb': ['properties', 'expires_at']
},
{
'metadata': {
'sql-datatype': 'timestamptz',
'selected-by-default': True
},
'breadcrumb': ['properties', 'created_at']
},
],
'key_properties': [
'id'
],
'is_view': False,
'table_name': 'fake name',
'stream': 'fake stream',
'tap_stream_id': 'FakeDB-fake name'
Expand Down Expand Up @@ -191,7 +218,6 @@ def test_discover_catalog(self, discovery_conn, expected_catalog_from_db):

actual_metadata = metadata.to_map(actual_entry.metadata)
expected_metadata = metadata.to_map(expected_entry.metadata)

for bcrumb, actual_mdata in actual_metadata.items():
for mdata_key, actual_value in actual_mdata.items():
assert_that(
Expand All @@ -210,6 +236,9 @@ def test_create_column_metadata(self):
expected_mdata = metadata.new()
metadata.write(expected_mdata, (), 'selected-by-default', False)
for col in cols:
metadata.write(expected_mdata, (),
'valid-replication-keys',
['col3'])
metadata.write(expected_mdata, (
'properties', col['name']), 'selected-by-default', True)
metadata.write(expected_mdata, (
Expand Down Expand Up @@ -260,4 +289,25 @@ def test_type_date(self):
expected_schema = stream_schema['schema']['properties']['expires_at']
assert_that(column_schema, equal_to(expected_schema))

def test_type_date_time(self):
col = sample_db_data['columns'][6]
column_schema = tap_redshift.schema_for_column(col).to_dict()
stream_schema = expected_result['streams'][0]
expected_schema = stream_schema['schema']['properties']['created_at']
assert_that(column_schema, equal_to(expected_schema))

def test_valid_rep_keys(self, discovery_conn, expected_catalog_from_db):
actual_catalog = tap_redshift.discover_catalog(discovery_conn,
'public')
for i, actual_entry in enumerate(actual_catalog.streams):
expected_entry = expected_catalog_from_db.streams[i]
actual_metadata = metadata.to_map(actual_entry.metadata)
expected_metadata = metadata.to_map(expected_entry.metadata)
actual_valid_rep_keys = metadata.get(
actual_metadata, (), 'valid-replication-keys')
expected_valid_rep_keys = metadata.get(
expected_metadata, (), 'valid-replication-keys')
assert_that(actual_valid_rep_keys,
equal_to(expected_valid_rep_keys))

# TODO write tests for full and incremental sync

0 comments on commit d091943

Please sign in to comment.