Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti committed Jun 2, 2019
0 parents commit 3ea371c
Show file tree
Hide file tree
Showing 12 changed files with 831 additions and 0 deletions.
29 changes: 29 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# IDE
.vscode
.idea/*


# Python
__pycache__/
*.py[cod]
*$py.class
.virtualenvs
*.egg-info/

# Singer JSON files
properties.json
config.json
state.json

*.db
.DS_Store
venv
env
blog_old.md
node_modules
*.pyc
tmp

# Docs
docs/_build/
docs/_templates/
9 changes: 9 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
MIT License

Copyright (c) 2019 TransferWise Ltd. (https://transferwise.com)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
109 changes: 109 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# pipelinewise-transform-field

[![PyPI version](https://badge.fury.io/py/pipelinewise-transform-field.svg)](https://badge.fury.io/py/pipelinewise-transform-field)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pipelinewise-transform-field.svg)](https://pypi.org/project/pipelinewise-transform-field/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Transformation component between [Singer](https://www.singer.io/) taps and targets.

This is a [PipelineWise](https://transferwise.github.io/pipelinewise) compatible component.

## How to use it

The recommended method of running this component is to use it from [PipelineWise](https://transferwise.github.io/pipelinewise). When running it from PipelineWise you don't need to configure this tap with JSON files and most of things are automated. Please check the related documentation at [Transformations](https://transferwise.github.io/pipelinewise/connectors/user_guide/transformations.html)

If you want to run this [Singer](https://singer.io) compatible component independently please read further.

## Install

First, make sure Python 3 is installed on your system or follow these
installation instructions for [Mac](http://docs.python-guide.org/en/latest/starting/install3/osx/) or
[Ubuntu](https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04).

It's recommended to use a virtualenv:

```bash
python3 -m venv venv
pip install pipelinewise-transform-field
```

or

```bash
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
```

### To run

Put it between a tap and a target with simple unix pipes:

`some-singer-tap | transform-field --transformations [transformations.json] | some-singer-target`

It's reading incoming messages from STDIN and using `transformations.json` to transform incoming RECORD messages.

**Note**: To avoid version conflicts run `tap`, `transform` and `targets` in separate virtual environments.

### Configuration

You need to defines which columns have to be transformed by which method and in which condition the transformation needs to be applied.

**Configuring directly from JSON**:

(Tip: PipelineWise generating this for you from a more readable YAML format)


```json
{
"transformations": [
{
"field_id": "password_hash",
"tap_stream_name": "stream-id-sent-by-the-tap",
"type": "SET-NULL"
},
{
"field_id": "salt",
"tap_stream_name": "stream-id-sent-by-the-tap",
"type": "SET-NULL"
},
{
"field_id": "value",
"tap_stream_name": "stream-id-sent-by-the-tap",
"type": "SET-NULL",
"when": [
{"column": "string_column_1", "equals": "Property" },
{"column": "numeric_column", "equals": 200 },
{"column": "string_column_2", "regex_match": "sensitive.*PII" }
]
}

]
}
```

### Transformation types

* **SET-NULL**: Transforms any input to NULL
* **HASH**: Transfroms string input to hash
* **HASH-SKIP-FIRST-n**: Transforms string input to hash skipping first n characters, e.g. HASH-SKIP-FIRST-2
* **MASK-DATA**: Transforms any date to stg
* **MASK-NUMBER**: Transforms any number to zero

### To run tests:

1. Install python dependencies in a virtual env and run nose unit and integration tests
```
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
pip install nose
```
1. To run tests:
```
nosetests --where=tests
```
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
singer-python==5.2.0
21 changes: 21 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

#!/usr/bin/env python

from setuptools import setup

setup(name='pipelinewise-transform-field',
version='1.0.0',
description='Singer.io simple field transformator between taps and targets - PipelineWise compatible',
author="TransferWise",
url='https://github.com/transferwise/pipelinewise-transform-field',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['transform_field'],
install_requires=[
'singer-python==5.2.0',
],
entry_points='''
[console_scripts]
transform-field=transform_field:main
''',
packages=['transform_field']
)
4 changes: 4 additions & 0 deletions tests/resources/invalid-json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_one"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
THIS IS A TEST INPUT FROM A TAP WITH A LINE WITH INVALID JSON
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_one", "version": 1}
11 changes: 11 additions & 0 deletions tests/resources/messages.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{"type": "STATE", "value": {"currently_syncing": "dummy_stream"}}
{"type": "SCHEMA", "stream": "dummy_stream", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "column_1": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_2": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_3": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_4": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_5": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}, "column_6": {"inclusion": "available", "type": ["null", "integer"]}, "column_7": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "column_8": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}, "column_9": {"inclusion": "available", "type": ["null", "integer"]}, "column_10": {"inclusion": "available", "maxLength": 64, "type": ["null", "string"]}, "column_11": {"inclusion": "available", "maxLength": 64, "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "dummy_stream", "version": 1}
{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 1, "column_1": "Dummy row 1", "column_2": "Dummy row 1", "column_3": "Dummy row 1", "column_4": "Dummy row 1", "column_5": "2019-12-21T12:12:45", "column_6": 1234, "column_7": "Dummy row 1", "column_8": "2019-12-21T12:12:45", "column_9": 100, "column_10": "column_11 is safe to keep", "column_11": "My name is John"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 2, "column_1": "Dummy row 2", "column_2": "Dummy row 2", "column_3": "Dummy row 2", "column_4": "Dummy row 2", "column_5": "2019-12-21T13:12:45", "column_6": 1234, "column_7": "Dummy row 2", "column_8": "2019-12-21T13:12:45", "column_9": 200, "column_10": "column_11 has sensitive data. Needs to transform to NULL", "column_11": "SUPER_SECRET_PASSWORD"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 3, "column_1": "Dummy row 3", "column_2": "Dummy row 3", "column_3": "Dummy row 3", "column_4": "Dummy row 3", "column_5": "2019-12-21T14:12:45", "column_6": 1234, "column_7": "Dummy row 3", "column_8": "2019-12-21T14:12:45", "column_9": 300, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 3, "column_1": "Dummy row 4", "column_2": "Dummy row 4", "column_3": "Dummy row 4", "column_4": "Dummy row 4", "column_5": "2019-12-21T15:12:45", "column_6": 1234, "column_7": "Dummy row 4", "column_8": "2019-12-21T15:12:45", "column_9": 400, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "RECORD", "stream": "dummy_stream", "record": {"c_pk": 5, "column_1": "Dummy row 5", "column_2": "Dummy row 5", "column_3": "Dummy row 5", "column_4": "Dummy row 5", "column_5": "2019-12-21T16:12:45", "column_6": 1234, "column_7": "Dummy row 5", "column_8": "2019-12-21T16:12:45", "column_9": 500, "column_10": "Dummy row 1", "column_11": "Dummy row 1"}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "STATE", "value": {"currently_syncing": "dummy_stream", "bookmarks": {"dummy_stream": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "dummy_stream", "version": 1}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"dummy_stream": {"initial_full_table_complete": true}}}}
174 changes: 174 additions & 0 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import unittest
import os
import sys
import json
import tempfile

from nose.tools import assert_raises
from transform_field import TransformField, TransformFieldException


class Base(unittest.TestCase):
def setUp(self):
self.maxDiff = None

sys.stdout = self._stdout = tempfile.NamedTemporaryFile('w+', delete=True)
sys.stderr.write(self._stdout.name + ' ')


def teardown(self):
self._stdout.close()
sys.stdout = sys.__stdout__


@property
def stdout(self):
self._stdout.seek(0)
return self._stdout.read()[:-1] # Remove trailing \n:w


def get_tap_input_messages(self, filename):
lines = []
with open('{}/resources/{}'.format(os.path.dirname(__file__), filename)) as tap_stdout:
for line in tap_stdout.readlines():
lines.append(line)

return lines


def singer_output_to_objects(self, output):
messages = []
for message in output.splitlines():
messages.append(json.loads(message))

return messages


class TestEndToEnd(Base):

def test_invalid_json(self):
"""Receiving invalid JSONs should raise an exception"""
tap_lines = self.get_tap_input_messages('invalid-json.json')
trans_config = {'transformations': []}

transform_field = TransformField(trans_config)
with assert_raises(TransformFieldException):
transform_field.consume(tap_lines)


def test_multiple_singer_json_messages(self):
"""Test a bunch of singer messages with different field transformation types"""
tap_lines = self.get_tap_input_messages('messages.json')

# Set transformations on some columns
trans_config = {'transformations': [
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_1', 'type': 'SET-NULL' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_2', 'type': 'HASH' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_3', 'type': 'HASH-SKIP-FIRST-2' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_4', 'type': 'HASH-SKIP-FIRST-3' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_5', 'type': 'MASK-DATE' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_6', 'type': 'MASK-NUMBER' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_7', 'type': 'NOT-EXISTING-TRANSFORMATION-TYPE' },
{ 'tap_stream_name': 'dummy_stream', 'field_id': 'column_11', 'type': 'SET-NULL',
'when': [
{'column': 'column_7', 'equals': "Dummy row 2" },
{'column': 'column_9', 'equals': 200 },
{'column': 'column_10', 'regex_match': 'sensitive' },
]
}
]}

transform_field = TransformField(trans_config)
transform_field.consume(tap_lines)

singer_output_messages = self.singer_output_to_objects(self.stdout)

# First message is the STATE message
self.assertEquals(
singer_output_messages[0],
{
'type': 'STATE',
'value': {'currently_syncing': 'dummy_stream'}
}
)

# Second message is the SCHEMA message
self.assertEquals(
singer_output_messages[1],
{
'type': 'SCHEMA',
'stream': 'dummy_stream',
'schema': {
'properties': {
'c_pk': {'inclusion': 'automatic', 'minimum': -2147483648, 'maximum': 2147483647, 'type': ['null', 'integer']},
'column_1': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']},
'column_2': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']},
'column_3': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']},
'column_4': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']},
'column_5': {'inclusion':'available', 'format':'date-time', 'type': ['null', 'string']},
'column_6': {'inclusion':'available', 'type': ['null', 'integer']},
'column_7': {'inclusion':'available', 'maxLength': 16, 'type': ['null', 'string']},
'column_8': {'inclusion':'available', 'format':'date-time', 'type': ['null', 'string']},
'column_9': {'inclusion':'available', 'type': ['null', 'integer']},
'column_10': {'inclusion':'available', 'maxLength': 64, 'type': ['null', 'string']},
'column_11': {'inclusion':'available', 'maxLength': 64, 'type': ['null', 'string']},
},
'type': 'object'
},
'key_properties': ['c_pk']
}
)

# Third message is a RECORD message with transformed values
self.assertEquals(
singer_output_messages[2],
{
'type': 'RECORD',
'stream': 'dummy_stream',
'record': {
'c_pk': 1,
'column_1': None, # should be SET-NULL transformed
'column_2': 'c584d22683f3e523', # Should be HASH transformed
'column_3': 'Ducd571661edac8d', # Should be HASH-SKIP-2 tranformed
'column_4': 'Dum1fe9627d907b0', # Should be HASH-SKIP-3 tranformed
'column_5': '2019-01-01T12:12:45', # Should be MASK-DATE transformed
'column_6': 0, # Should be MASK-NUMBER transformed
'column_7': 'Dummy row 1', # Should be the originl value - Unknown transformation type
'column_8': '2019-12-21T12:12:45', # Should be the original date-time value
'column_9': 100, # Should be the original number value

# Conditional transformation
'column_10': 'column_11 is safe to keep',
'column_11': 'My name is John',
},
'version': 1,
'time_extracted': '2019-01-31T15:51:50.215998Z'
}
)

# Third message is a RECORD message with transformed values
self.assertEquals(
singer_output_messages[3],
{
'type': 'RECORD',
'stream': 'dummy_stream',
'record': {
'c_pk': 2,
'column_1': None, # should be SET-NULL transformed
'column_2': '12c7ca803f4ae404', # Should be HASH tranformed
'column_3': 'Du7c2717bbc7489d', # Should be HASH-SKIP-3 tranformed
'column_4': 'Dum5b2be872199a8', # Should be HASH-SKIP-3 tranformed
'column_5': '2019-01-01T13:12:45', # Should be MASK-DATE transformed
'column_6': 0, # Should be MASK-NUMBER transformed
'column_7': 'Dummy row 2', # Should be the origian value - Unknown transformation type
'column_8': '2019-12-21T13:12:45', # Should be the original date-time value
'column_9': 200, # Should be the original number value

# Conditional transformation
'column_10': 'column_11 has sensitive data. Needs to transform to NULL',
'column_11': None, # Should be SET-NULL transformed
},
'version': 1,
'time_extracted': '2019-01-31T15:51:50.215998Z'
}
)
Loading

0 comments on commit 3ea371c

Please sign in to comment.