From 3ea371cc35525878f7e4d9499b497ab19782f87e Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Sun, 2 Jun 2019 16:37:17 +0100 Subject: [PATCH] initial commit --- .gitignore | 29 ++++ LICENSE | 9 ++ README.md | 109 ++++++++++++++ requirements.txt | 1 + setup.py | 21 +++ tests/resources/invalid-json.json | 4 + tests/resources/messages.json | 11 ++ tests/test_e2e.py | 174 ++++++++++++++++++++++ tests/test_transformations.py | 112 ++++++++++++++ transform_field/__init__.py | 238 ++++++++++++++++++++++++++++++ transform_field/timings.py | 35 +++++ transform_field/transform.py | 88 +++++++++++ 12 files changed, 831 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 tests/resources/invalid-json.json create mode 100644 tests/resources/messages.json create mode 100644 tests/test_e2e.py create mode 100644 tests/test_transformations.py create mode 100644 transform_field/__init__.py create mode 100644 transform_field/timings.py create mode 100644 transform_field/transform.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1265da6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# IDE +.vscode +.idea/* + + +# Python +__pycache__/ +*.py[cod] +*$py.class +.virtualenvs +*.egg-info/ + +# Singer JSON files +properties.json +config.json +state.json + +*.db +.DS_Store +venv +env +blog_old.md +node_modules +*.pyc +tmp + +# Docs +docs/_build/ +docs/_templates/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6675fc3 --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2019 TransferWise Ltd. 
(https://transferwise.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..3ef5fcb --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +# pipelinewise-transform-field + +[![PyPI version](https://badge.fury.io/py/pipelinewise-transform-field.svg)](https://badge.fury.io/py/pipelinewise-transform-field) +[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pipelinewise-transform-field.svg)](https://pypi.org/project/pipelinewise-transform-field/) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) + +Transformation component between [Singer](https://www.singer.io/) taps and targets. + +This is a [PipelineWise](https://transferwise.github.io/pipelinewise) compatible component. + +## How to use it + +The recommended method of running this component is to use it from [PipelineWise](https://transferwise.github.io/pipelinewise). 
When running it from PipelineWise you don't need to configure this tap with JSON files and most of the things are automated. Please check the related documentation at [Transformations](https://transferwise.github.io/pipelinewise/connectors/user_guide/transformations.html) + +If you want to run this [Singer](https://singer.io) compatible component independently please read further. + +## Install + +First, make sure Python 3 is installed on your system or follow these +installation instructions for [Mac](http://docs.python-guide.org/en/latest/starting/install3/osx/) or +[Ubuntu](https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04). + +It's recommended to use a virtualenv: + +```bash + python3 -m venv venv + . venv/bin/activate + pip install pipelinewise-transform-field +``` + +or + +```bash + python3 -m venv venv + . venv/bin/activate + pip install --upgrade pip + pip install . +``` + +### To run + +Put it between a tap and a target with simple unix pipes: + +`some-singer-tap | transform-field --transformations [transformations.json] | some-singer-target` + +It reads incoming messages from STDIN and uses `transformations.json` to transform incoming RECORD messages. + +**Note**: To avoid version conflicts run `tap`, `transform` and `targets` in separate virtual environments. + +### Configuration + +You need to define which columns have to be transformed by which method and under which conditions the transformation needs to be applied. 
+ +**Configuring directly from JSON**: + +(Tip: PipelineWise generates this for you from a more readable YAML format) + + + ```json + { + "transformations": [ + { + "field_id": "password_hash", + "tap_stream_name": "stream-id-sent-by-the-tap", + "type": "SET-NULL" + }, + { + "field_id": "salt", + "tap_stream_name": "stream-id-sent-by-the-tap", + "type": "SET-NULL" + }, + { + "field_id": "value", + "tap_stream_name": "stream-id-sent-by-the-tap", + "type": "SET-NULL", + "when": [ + {"column": "string_column_1", "equals": "Property" }, + {"column": "numeric_column", "equals": 200 }, + {"column": "string_column_2", "regex_match": "sensitive.*PII" } + ] + } + + ] + } + ``` + +### Transformation types + +* **SET-NULL**: Transforms any input to NULL +* **HASH**: Transforms string input to a hash +* **HASH-SKIP-FIRST-n**: Transforms string input to hash skipping first n characters, e.g. HASH-SKIP-FIRST-2 +* **MASK-DATE**: Masks the month and day of any date to January 1, keeping the time elements +* **MASK-NUMBER**: Transforms any number to zero + +### To run tests: + +1. Install python dependencies in a virtual env and run nose unit and integration tests +``` + python3 -m venv venv + . venv/bin/activate + pip install --upgrade pip + pip install . + pip install nose +``` + +1. 
To run tests: +``` + nosetests --where=tests +``` + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2a1c896 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +singer-python==5.2.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..608e9ca --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ + +#!/usr/bin/env python + +from setuptools import setup + +setup(name='pipelinewise-transform-field', + version='1.0.0', + description='Singer.io simple field transformator between taps and targets - PipelineWise compatible', + author="TransferWise", + url='https://github.com/transferwise/pipelinewise-transform-field', + classifiers=['Programming Language :: Python :: 3 :: Only'], + py_modules=['transform_field'], + install_requires=[ + 'singer-python==5.2.0', + ], + entry_points=''' + [console_scripts] + transform-field=transform_field:main + ''', + packages=['transform_field'] +) \ No newline at end of file diff --git a/tests/resources/invalid-json.json b/tests/resources/invalid-json.json new file mode 100644 index 0000000..13784c2 --- /dev/null +++ b/tests/resources/invalid-json.json @@ -0,0 +1,4 @@ +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]} +THIS IS A TEST INPUT FROM A TAP WITH A LINE WITH INVALID JSON +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1} diff --git a/tests/resources/messages.json b/tests/resources/messages.json new file mode 100644 index 0000000..cd9be3c --- /dev/null 
+++ b/tests/resources/messages.json @@ -0,0 +1,11 @@ +{"type": "STATE", "value": {"currently_syncing": "dummy_stream"}} +{"type": "SCHEMA", "stream": "dummy_stream", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "column_1": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_2": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_3": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_4": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_5": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}, "column_6": {"inclusion": "available", "type": ["null", "integer"]}, "column_7": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_8": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}, "column_9": {"inclusion": "available", "type": ["null", "integer"]}, "column_10": {"inclusion": "available", "maxLength": 64, "type": ["null", "string"]}, "column_11": {"inclusion": "available", "maxLength": 64, "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]} +{"type": "ACTIVATE_VERSION", "stream": "dummy_stream", "version": 1} +{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 1, "column_1": "Dummy row 1", "column_2": "Dummy row 1", "column_3": "Dummy row 1", "column_4": "Dummy row 1", "column_5": "2019-12-21T12:12:45", "column_6": 1234, "column_7": "Dummy row 1", "column_8": "2019-12-21T12:12:45", "column_9": 100, "column_10": "column_11 is safe to keep", "column_11": "My name is John"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 2, "column_1": "Dummy row 2", "column_2": "Dummy row 2", "column_3": "Dummy row 2", "column_4": "Dummy row 2", "column_5": "2019-12-21T13:12:45", 
"column_6": 1234, "column_7": "Dummy row 2", "column_8": "2019-12-21T13:12:45", "column_9": 200, "column_10": "column_11 has sensitive data. Needs to transform to NULL", "column_11": "SUPER_SECRET_PASSWORD"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 3, "column_1": "Dummy row 3", "column_2": "Dummy row 3", "column_3": "Dummy row 3", "column_4": "Dummy row 3", "column_5": "2019-12-21T14:12:45", "column_6": 1234, "column_7": "Dummy row 3", "column_8": "2019-12-21T14:12:45", "column_9": 300, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 3, "column_1": "Dummy row 4", "column_2": "Dummy row 4", "column_3": "Dummy row 4", "column_4": "Dummy row 4", "column_5": "2019-12-21T15:12:45", "column_6": 1234, "column_7": "Dummy row 4", "column_8": "2019-12-21T15:12:45", "column_9": 400, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 5, "column_1": "Dummy row 5", "column_2": "Dummy row 5", "column_3": "Dummy row 5", "column_4": "Dummy row 5", "column_5": "2019-12-21T16:12:45", "column_6": 1234, "column_7": "Dummy row 5", "column_8": "2019-12-21T16:12:45", "column_9": 500, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"} +{"type": "STATE", "value": {"currently_syncing": "dummy_stream", "bookmarks": {"dummy_stream": {"initial_full_table_complete": true}}}} +{"type": "ACTIVATE_VERSION", "stream": "dummy_stream", "version": 1} +{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"dummy_stream": {"initial_full_table_complete": true}}}} diff --git a/tests/test_e2e.py b/tests/test_e2e.py new file mode 100644 index 0000000..9e84d29 --- /dev/null 
+++ b/tests/test_e2e.py @@ -0,0 +1,174 @@ +import unittest +import os +import sys +import json +import tempfile + +from nose.tools import assert_raises +from transform_field import TransformField, TransformFieldException + + +class Base(unittest.TestCase): + def setUp(self): + self.maxDiff = None + + sys.stdout = self._stdout = tempfile.NamedTemporaryFile('w+', delete=True) + sys.stderr.write(self._stdout.name + ' ') + + + def teardown(self): + self._stdout.close() + sys.stdout = sys.__stdout__ + + + @property + def stdout(self): + self._stdout.seek(0) + return self._stdout.read()[:-1] # Remove trailing \n:w + + + def get_tap_input_messages(self, filename): + lines = [] + with open('{}/resources/{}'.format(os.path.dirname(__file__), filename)) as tap_stdout: + for line in tap_stdout.readlines(): + lines.append(line) + + return lines + + + def singer_output_to_objects(self, output): + messages = [] + for message in output.splitlines(): + messages.append(json.loads(message)) + + return messages + + +class TestEndToEnd(Base): + + def test_invalid_json(self): + """Receiving invalid JSONs should raise an exception""" + tap_lines = self.get_tap_input_messages('invalid-json.json') + trans_config = {'transformations': []} + + transform_field = TransformField(trans_config) + with assert_raises(TransformFieldException): + transform_field.consume(tap_lines) + + + def test_multiple_singer_json_messages(self): + """Test a bunch of singer messages with different field transformation types""" + tap_lines = self.get_tap_input_messages('messages.json') + + # Set transformations on some columns + trans_config = {'transformations': [ + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_1', 'type': 'SET-NULL' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_2', 'type': 'HASH' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_3', 'type': 'HASH-SKIP-FIRST-2' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_4', 'type': 'HASH-SKIP-FIRST-3' }, + 
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_5', 'type': 'MASK-DATE' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_6', 'type': 'MASK-NUMBER' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_7', 'type': 'NOT-EXISTING-TRANSFORMATION-TYPE' }, + { 'tap_stream_name': 'dummy_stream', 'field_id': 'column_11', 'type': 'SET-NULL', + 'when': [ + {'column': 'column_7', 'equals': "Dummy row 2" }, + {'column': 'column_9', 'equals': 200 }, + {'column': 'column_10', 'regex_match': 'sensitive' }, + ] + } + ]} + + transform_field = TransformField(trans_config) + transform_field.consume(tap_lines) + + singer_output_messages = self.singer_output_to_objects(self.stdout) + + # First message is the STATE message + self.assertEquals( + singer_output_messages[0], + { + 'type': 'STATE', + 'value': {'currently_syncing': 'dummy_stream'} + } + ) + + # Second message is the SCHEMA message + self.assertEquals( + singer_output_messages[1], + { + 'type': 'SCHEMA', + 'stream': 'dummy_stream', + 'schema': { + 'properties': { + 'c_pk': {'inclusion': 'automatic', 'minimum': -2147483648, 'maximum': 2147483647, 'type': ['null', 'integer']}, + 'column_1': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']}, + 'column_2': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']}, + 'column_3': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']}, + 'column_4': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']}, + 'column_5': {'inclusion':'available', 'format':'date-time', 'type': ['null', 'string']}, + 'column_6': {'inclusion':'available', 'type': ['null', 'integer']}, + 'column_7': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']}, + 'column_8': {'inclusion':'available', 'format':'date-time', 'type': ['null', 'string']}, + 'column_9': {'inclusion':'available', 'type': ['null', 'integer']}, + 'column_10': {'inclusion':'available', 'maxLength': 64, 'type': ['null', 
'string']}, + 'column_11': {'inclusion':'available', 'maxLength': 64, 'type': ['null', 'string']}, + }, + 'type': 'object' + }, + 'key_properties': ['c_pk'] + } + ) + + # Third message is a RECORD message with transformed values + self.assertEquals( + singer_output_messages[2], + { + 'type': 'RECORD', + 'stream': 'dummy_stream', + 'record': { + 'c_pk': 1, + 'column_1': None, # should be SET-NULL transformed + 'column_2': 'c584d22683f3e523', # Should be HASH transformed + 'column_3': 'Ducd571661edac8d', # Should be HASH-SKIP-2 tranformed + 'column_4': 'Dum1fe9627d907b0', # Should be HASH-SKIP-3 tranformed + 'column_5': '2019-01-01T12:12:45', # Should be MASK-DATE transformed + 'column_6': 0, # Should be MASK-NUMBER transformed + 'column_7': 'Dummy row 1', # Should be the originl value - Unknown transformation type + 'column_8': '2019-12-21T12:12:45', # Should be the original date-time value + 'column_9': 100, # Should be the original number value + + # Conditional transformation + 'column_10': 'column_11 is safe to keep', + 'column_11': 'My name is John', + }, + 'version': 1, + 'time_extracted': '2019-01-31T15:51:50.215998Z' + } + ) + + # Third message is a RECORD message with transformed values + self.assertEquals( + singer_output_messages[3], + { + 'type': 'RECORD', + 'stream': 'dummy_stream', + 'record': { + 'c_pk': 2, + 'column_1': None, # should be SET-NULL transformed + 'column_2': '12c7ca803f4ae404', # Should be HASH tranformed + 'column_3': 'Du7c2717bbc7489d', # Should be HASH-SKIP-3 tranformed + 'column_4': 'Dum5b2be872199a8', # Should be HASH-SKIP-3 tranformed + 'column_5': '2019-01-01T13:12:45', # Should be MASK-DATE transformed + 'column_6': 0, # Should be MASK-NUMBER transformed + 'column_7': 'Dummy row 2', # Should be the origian value - Unknown transformation type + 'column_8': '2019-12-21T13:12:45', # Should be the original date-time value + 'column_9': 200, # Should be the original number value + + # Conditional transformation + 'column_10': 
'column_11 has sensitive data. Needs to transform to NULL', + 'column_11': None, # Should be SET-NULL transformed + }, + 'version': 1, + 'time_extracted': '2019-01-31T15:51:50.215998Z' + } + ) diff --git a/tests/test_transformations.py b/tests/test_transformations.py new file mode 100644 index 0000000..e8d676b --- /dev/null +++ b/tests/test_transformations.py @@ -0,0 +1,112 @@ +import unittest +from nose.tools import assert_raises + +import transform_field + +import hashlib + + +class TestUnit(unittest.TestCase): + """ + Unit Tests + """ + @classmethod + def setUp(self): + self.config = {} + + + def test_set_null(self): + """TEST SET-NULL transformation""" + self.assertEquals( + transform_field.transform.do_transform({"col_1":"John"}, "col_1", "SET-NULL"), + None + ) + + + def test_hash(self): + """Test HASH transformation""" + self.assertEquals( + transform_field.transform.do_transform({"col_1":"John"}, "col_1", "HASH"), + hashlib.sha256("John".encode('utf-8')).hexdigest() + ) + + + def test_mask_date(self): + """Test MASK-DATE transformation""" + self.assertEquals( + transform_field.transform.do_transform({"col_1":"2019-05-21"}, "col_1", "MASK-DATE"), + "2019-01-01T00:00:00" + ) + + # Mask date should keep the time elements + self.assertEquals( + transform_field.transform.do_transform({"col_1":"2019-05-21T13:34:11"}, "col_1", "MASK-DATE"), + "2019-01-01T13:34:11" + ) + + # Mask date should keep the time elements + self.assertEquals( + transform_field.transform.do_transform({"col_1":"2019-05-21T13:34:99"}, "col_1", "MASK-DATE"), + "2019-05-21T13:34:99" + ) + + + def test_mask_number(self): + """Test MASK-NUMBER transformation""" + self.assertEquals( + transform_field.transform.do_transform({"col_1":"1234567890"}, "col_1", "MASK-NUMBER"), + 0 + ) + + + def test_unknown_transformation_type(self): + """Test not existing transformation type""" + # Should return the original value + self.assertEqual( + transform_field.transform.do_transform({"col_1":"John"}, "col_1", 
"NOT-EXISTING-TRANSFORMATION-TYPE"), + "John" + ) + + + def test_conditions(self): + """Test conditional transformations""" + + # Matching condition: Should transform to NULL + self.assertEqual( + transform_field.transform.do_transform( + # Record: + {"col_1":"com.transferwise.fx.user.User", "col_2":"passwordHash", "col_3":"lkj"}, + # Column to transform: + "col_3", + # Transform method: + "SET-NULL", + # Conditions when to transform: + [ + {'column': 'col_1', 'equals': "com.transferwise.fx.user.User" }, + {'column': 'col_2', 'equals': "passwordHash" }, + ] + ), + + # Expected output: + None + ) + + # Not matching condition: Should keep the original value + self.assertEqual( + transform_field.transform.do_transform( + # Record: + {"col_1":"com.transferwise.fx.user.User", "col_2":"id", "col_3":"123456789"}, + # Column to transform: + "col_3", + # Transform method: + "SET-NULL", + # Conditions when to transform: + [ + {'column': 'col_1', 'equals': "com.transferwise.fx.user.User" }, + {'column': 'col_2', 'equals': "passwordHash" }, + ] + ), + + # Expected output: + "123456789" + ) \ No newline at end of file diff --git a/transform_field/__init__.py b/transform_field/__init__.py new file mode 100644 index 0000000..de8cf3c --- /dev/null +++ b/transform_field/__init__.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 + +import argparse +import io +import sys +import json +import time + +from collections import namedtuple +from decimal import Decimal +from jsonschema import ValidationError, Draft4Validator, FormatChecker +import singer +from singer import utils + +import transform_field.transform + +from transform_field.timings import Timings + +LOGGER = singer.get_logger() +TIMINGS = Timings(LOGGER) +DEFAULT_MAX_BATCH_BYTES = 4000000 +DEFAULT_MAX_BATCH_RECORDS = 20000 +DEFAULT_BATCH_DELAY_SECONDS = 300.0 + +StreamMeta = namedtuple('StreamMeta', ['schema', 'key_properties', 'bookmark_properties']) +TransMeta = namedtuple('TransMeta', ['field_id', 'type', 'when']) + 
+REQUIRED_CONFIG_KEYS = [ + "transformations" +] + +def float_to_decimal(value): + '''Walk the given data structure and turn all instances of float into + double.''' + if isinstance(value, float): + return Decimal(str(value)) + if isinstance(value, list): + return [float_to_decimal(child) for child in value] + if isinstance(value, dict): + return {k: float_to_decimal(v) for k, v in value.items()} + return value + +class TransformFieldException(Exception): + '''A known exception for which we don't need to pring a stack trace''' + pass + +class TransformField(object): + def __init__(self, trans_config): + self.trans_config = trans_config + self.messages = [] + self.buffer_size_bytes = 0 + self.state = None + + # TODO: Make it configurable + self.max_batch_bytes = DEFAULT_MAX_BATCH_BYTES + self.max_batch_records = DEFAULT_MAX_BATCH_RECORDS + + # Minimum frequency to send a batch, used with self.time_last_batch_sent + self.batch_delay_seconds = DEFAULT_BATCH_DELAY_SECONDS + + # Time that the last batch was sent + self.time_last_batch_sent = time.time() + + # Mapping from stream name to {'schema': ..., 'key_names': ..., 'bookmark_names': ... } + self.stream_meta = {} + + # Writer that we write state records to + self.state_writer = sys.stdout + + # Mapping from transformation stream to {'stream': [ 'field_id': ..., 'type': ... ] ... } + self.trans_meta = {} + for trans in trans_config["transformations"]: + # Naming differences in stream ids: + # 1. properties.json and transformation_json using 'tap_stream_id' + # 2. 
taps send in the 'stream' key in singer messages + stream = trans["tap_stream_name"] + if stream not in self.trans_meta: + self.trans_meta[stream] = [] + + self.trans_meta[stream].append(TransMeta( + trans["field_id"], + trans["type"], + trans.get('when') + )) + + def flush(self): + '''Give batch to handlers to process''' + + if self.messages: + stream = self.messages[0].stream + stream_meta = self.stream_meta[stream] + + # Transform columns + messages = self.messages + schema = float_to_decimal(stream_meta.schema) + key_properties = stream_meta.key_properties + validator = Draft4Validator(schema, format_checker=FormatChecker()) + trans_meta = [] + if stream in self.trans_meta: + trans_meta = self.trans_meta[stream] + + for i, message in enumerate(messages): + if isinstance(message, singer.RecordMessage): + + # Do transformation on every column where it is required + for trans in trans_meta: + + if trans.field_id in message.record: + transformed = transform.do_transform(message.record, trans.field_id, trans.type, trans.when) + + # Truncate to transformed value to the max allowed length if required + if transformed is not None: + max_length = False + if trans.field_id in schema['properties']: + if 'maxLength' in schema['properties'][trans.field_id]: + max_length = schema['properties'][trans.field_id]['maxLength'] + + if max_length: + message.record[trans.field_id] = transformed[:max_length] + else: + message.record[trans.field_id] = transformed + else: + message.record[trans.field_id] = transformed + + # Validate the transformed columns + data = float_to_decimal(message.record) + try: + validator.validate(data) + if key_properties: + for k in key_properties: + if k not in data: + raise TransformFieldException( + 'Message {} is missing key property {}'.format( + i, k)) + + # Write the transformed message + singer.write_message(message) + + except Exception as e: + if type(e).__name__ == "InvalidOperation": + raise TransformFieldException( + "Record does not pass 
schema validation. RECORD: {}\n'multipleOf' validations that allows long precisions are not supported (i.e. with 15 digits or more). Try removing 'multipleOf' methods from JSON schema.\n{}" + .format(message.record, e) + ) + else: + raise TransformFieldException( + "Record does not pass schema validation. RECORD: {}\n{}".format(message.record, e)) + + LOGGER.debug("Batch is valid with {} messages".format(len(messages))) + + # Update stats + self.time_last_batch_sent = time.time() + self.messages = [] + self.buffer_size_bytes = 0 + + if self.state: + singer.write_message(singer.StateMessage(self.state)) + self.state = None + + TIMINGS.log_timings() + + def handle_line(self, line): + '''Takes a raw line from stdin and transforms it''' + try : + message = singer.parse_message(line) + + if not message: + raise TransformFieldException('Unknown message type') + except Exception as exc: + raise TransformFieldException('Failed to process incoming message: {}\n{}'.format(line, exc)) + + LOGGER.debug(message) + + # If we got a Schema, set the schema and key properties for this + # stream. 
Flush the batch, if there is one, in case the schema is + # different + if isinstance(message, singer.SchemaMessage): + self.flush() + + self.stream_meta[message.stream] = StreamMeta( + message.schema, + message.key_properties, + message.bookmark_properties) + + # Write the transformed message + singer.write_message(message) + + elif isinstance(message, (singer.RecordMessage, singer.ActivateVersionMessage)): + if self.messages and ( + message.stream != self.messages[0].stream or + message.version != self.messages[0].version): + self.flush() + self.messages.append(message) + self.buffer_size_bytes += len(line) + + num_bytes = self.buffer_size_bytes + num_messages = len(self.messages) + num_seconds = time.time() - self.time_last_batch_sent + + enough_bytes = num_bytes >= self.max_batch_bytes + enough_messages = num_messages >= self.max_batch_records + enough_time = num_seconds >= self.batch_delay_seconds + if enough_bytes or enough_messages or enough_time: + LOGGER.debug('Flushing %d bytes, %d messages, after %.2f seconds', num_bytes, num_messages, num_seconds) + self.flush() + + elif isinstance(message, singer.StateMessage): + self.state = message.value + + def consume(self, reader): + '''Consume all the lines from the queue, flushing when done.''' + for line in reader: + self.handle_line(line) + self.flush() + +def main_impl(): + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + trans_config = {'transformations': args.config['transformations']} + + reader = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') + TransformField(trans_config).consume(reader) + LOGGER.info("Exiting normally") + +def main(): + '''Main entry point''' + try: + main_impl() + + except TransformFieldException as exc: + for line in str(exc).splitlines(): + LOGGER.critical(line) + sys.exit(1) + except Exception as exc: + LOGGER.critical(exc) + raise exc + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/transform_field/timings.py b/transform_field/timings.py new file 
mode 100644 index 0000000..96c5a49 --- /dev/null +++ b/transform_field/timings.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +import time + +from contextlib import contextmanager + +class Timings(object): + '''Gathers timing information for the three main steps of the Transformer.''' + def __init__(self, logger): + self.LOGGER = logger + self.last_time = time.time() + self.timings = { + 'validating': 0.0, + 'transforming': 0.0, + None: 0.0 + } + + @contextmanager + def mode(self, mode): + '''We wrap the big steps of the Tap in this context manager to accumulate + timing info.''' + + start = time.time() + yield + end = time.time() + self.timings[None] += start - self.last_time + self.timings[mode] += end - start + self.last_time = end + + def log_timings(self): + '''We call this with every flush to print out the accumulated timings''' + self.LOGGER.debug('Timings: unspecified: %.3f; validating: %.3f; transforming: %.3f;', + self.timings[None], + self.timings['validating'], + self.timings['transforming']) diff --git a/transform_field/transform.py b/transform_field/transform.py new file mode 100644 index 0000000..740e3b6 --- /dev/null +++ b/transform_field/transform.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +import sys +import hashlib +import re + +from datetime import datetime +from dateutil import parser + + +def is_transform_required(record, when): + """Detects if the transformation is required or not based on + the defined conditions and the actual values in a record""" + transform_required = False + + # Check if conditional transformation matches criterias + if when: + + # Evaluate every condition + for condition in when: + column_to_match = condition.get('column') + column_value = record.get(column_to_match, "") + cond_equals = condition.get('equals') + cond_pattern = condition.get('regex_match') + + # Exact condition + if cond_equals: + if column_value == cond_equals: + transform_required = True + else: + transform_required = False + break + + # Regex based 
condition + if cond_pattern: + matcher = re.compile(cond_pattern) + if matcher.search(column_value): + transform_required = True + + # Condition does not meet, exit the loop + else: + transform_required = False + break + + # Transformation is always required if 'when' condition not defined + else: + transform_required = True + + return transform_required + + +def do_transform(record, field, trans_type, when=None): + """Transform a value by a certain transformation type. + Optionally can set conditional criterias based on other + values of the record""" + try: + value = record.get(field) + + # Do transformation only if required + if is_transform_required(record, when): + + # Transforms any input to NULL + if trans_type == "SET-NULL": + return None + # Transfroms string input to hash + elif trans_type == "HASH": + return hashlib.sha256(value.encode('utf-8')).hexdigest() + # Transforms string input to hash skipping first n characters, e.g. HASH-SKIP-FIRST-2 + elif 'HASH-SKIP-FIRST' in trans_type: + return value[:int(trans_type[-1])] + hashlib.sha256(value.encode('utf-8')[int(trans_type[-1]):]).hexdigest() + # Transforms any date to stg + elif trans_type == "MASK-DATE": + return parser.parse(value).replace(month=1, day=1).isoformat() + # Transforms any number to zero + elif trans_type == "MASK-NUMBER": + return 0 + # Return the original value if cannot find transformation type + else: + return value + + # Return the original value if cannot find transformation type + else: + return value + + # Return the original value if cannot transform + except Exception: + return value +