Merge branch 'transferwise:master' into master
jlloyd-widen authored Sep 18, 2023
2 parents 039e841 + c0806f0 commit 325de8d
Showing 31 changed files with 1,540 additions and 383 deletions.
41 changes: 0 additions & 41 deletions .circleci/config.yml

This file was deleted.

74 changes: 74 additions & 0 deletions .github/workflows/ci.yaml
@@ -0,0 +1,74 @@
name: CI

on:
  pull_request:
  push:
    branches:
      - master

jobs:
  lint_and_test:
    name: Linting and Testing
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [ 3.7, 3.8, 3.9 ]

    steps:
      - name: Checkout repository
        uses: actions/checkout@v2

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - name: Setup virtual environment
        run: make venv

      - name: Pylinting
        run: make pylint

      - name: Unit Tests
        run: make unit_test

  integration_test:
    name: Integration Testing
    runs-on: ubuntu-latest
    environment: ci_tests
    strategy:
      matrix:
        python-version: [ 3.8 ]
    concurrency:
      group: integration_tests-${{ github.head_ref }}
      cancel-in-progress: true

    steps:
      - name: Checkout repository
        uses: actions/checkout@v2

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - name: Setup virtual environment
        run: make venv

      - name: Integration tests
        run: make integration_test
        env:
          TARGET_SNOWFLAKE_ACCOUNT: ${{ secrets.TARGET_SNOWFLAKE_ACCOUNT }}
          TARGET_SNOWFLAKE_DBNAME: ${{ secrets.TARGET_SNOWFLAKE_DBNAME }}
          TARGET_SNOWFLAKE_USER: ${{ secrets.TARGET_SNOWFLAKE_USER }}
          TARGET_SNOWFLAKE_PASSWORD: ${{ secrets.TARGET_SNOWFLAKE_PASSWORD }}
          TARGET_SNOWFLAKE_WAREHOUSE: ${{ secrets.TARGET_SNOWFLAKE_WAREHOUSE }}
          TARGET_SNOWFLAKE_SCHEMA: ${{ secrets.TARGET_SNOWFLAKE_SCHEMA }}
          TARGET_SNOWFLAKE_AWS_ACCESS_KEY: ${{ secrets.TARGET_SNOWFLAKE_AWS_ACCESS_KEY }}
          TARGET_SNOWFLAKE_AWS_SECRET_ACCESS_KEY: ${{ secrets.TARGET_SNOWFLAKE_AWS_SECRET_ACCESS_KEY }}
          TARGET_SNOWFLAKE_S3_BUCKET: ${{ secrets.TARGET_SNOWFLAKE_S3_BUCKET }}
          TARGET_SNOWFLAKE_S3_KEY_PREFIX: ${{ secrets.TARGET_SNOWFLAKE_S3_KEY_PREFIX }}
          TARGET_SNOWFLAKE_STAGE: ${{ secrets.TARGET_SNOWFLAKE_STAGE }}
          TARGET_SNOWFLAKE_FILE_FORMAT_CSV: ${{ secrets.TARGET_SNOWFLAKE_FILE_FORMAT_CSV }}
          TARGET_SNOWFLAKE_FILE_FORMAT_PARQUET: ${{ secrets.TARGET_SNOWFLAKE_FILE_FORMAT_PARQUET }}
          CLIENT_SIDE_ENCRYPTION_MASTER_KEY: ${{ secrets.TARGET_SNOWFLAKE_CLIENT_SIDE_ENCRYPTION_MASTER_KEY }}
81 changes: 81 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,84 @@
2.3.0 (2023-08-08)
-------------------

*Changes*
- Update dependencies:
- snowflake-connector-python[pandas]
- boto3
- pytest
- python-dotenv


2.2.0 (2022-05-12)
-------------------

*Changes*
- Revert use of `ujson`


2.1.0 (2022-05-05)
-------------------

*Changes*
- Use `ujson` for JSON encoding/decoding

2.0.1 (2022-04-08)
-------------------

*Fixes*
- Only drop pk constraint if table has one
- Don't raise `PrimaryKeyNotFoundException` when a record has a falsy pk value


2.0.0 (2022-03-29)
-------------------

*Fixes*
- Respecting `flush_all_streams` when SCHEMA messages arrive.
- Improve logging for failed merge & copy queries.
- Drop NOT NULL constraint from primary key columns.
- Update PK constraints according to changes to SCHEMA's key properties.

*Changes*
- Dropping support for Python 3.6
- Adding support for Python 3.9
- Bump pytest to `7.1.1`
- Bump boto3 to `1.21`


1.15.0 (2022-01-14)
-------------------

*Added*
- Support parallelism for table stages

*Fixes*
- Emit last encountered state message if there are no records.

*Changes*
- Migrate CI to github actions
- Bump dependencies


1.14.1 (2021-10-14)
-------------------
- Increase `max_records` by an order of magnitude when selecting columns
- Bumping dependencies

1.14.0 (2021-09-30)
-------------------
- Add support for `date` property format
- Stop logging record when error happens

1.13.1 (2021-07-15)
-------------------
- Fixed an issue with S3 metadata required for decryption not being included in archived load files.

1.13.0 (2021-06-23)
-------------------
- Add `archive_load_files` parameter to optionally archive load files on S3
- Bumping dependencies

1.12.0 (2021-04-12)
-------------------
- Add optional `batch_wait_limit_seconds` parameter
17 changes: 17 additions & 0 deletions Makefile
@@ -0,0 +1,17 @@
venv:
	python3 -m venv venv ;\
	. ./venv/bin/activate ;\
	pip install --upgrade pip setuptools wheel ;\
	pip install -e .[test]

pylint:
	. ./venv/bin/activate ;\
	pylint --rcfile pylintrc target_snowflake/

unit_test:
	. ./venv/bin/activate ;\
	pytest tests/unit -vv --cov target_snowflake --cov-fail-under=67

integration_test:
	. ./venv/bin/activate ;\
	pytest tests/integration/ -vvx --cov target_snowflake --cov-fail-under=86
9 changes: 5 additions & 4 deletions README.md
@@ -160,14 +160,13 @@ Full list of options in `config.json`:
| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
| batch_wait_limit_seconds | Integer | | (Default: None) Maximum time to wait for batch to reach `batch_size_rows`. |
| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Snowflake when one batch is full. Warning: This may trigger the COPY command to use files with a low number of records, and may cause performance problems. |
| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. **Parallelism works only with external stages. If no s3_bucket defined with an external stage then flusing tables is enforced to use a single thread.**|
| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max. |
| parallelism_max | Integer | | (Default: 16) Max number of parallel threads to use when flushing tables. |
| default_target_schema | String | | Name of the schema where the tables will be created, **without** database prefix. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. |
| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If `schema_mapping` is not defined then every stream sent by the tap is granted accordingly. |
| schema_mapping | Object | | Useful if you want to load multiple streams from one tap to multiple Snowflake schemas.<br><br>If the tap sends the `stream_id` in `<schema_name>-<table_name>` format then this option overwrites the `default_target_schema` value. Note, that using `schema_mapping` you can overwrite the `default_target_schema_select_permission` value to grant SELECT permissions to different groups per schemas or optionally you can create indices automatically for the replicated tables.<br><br> **Note**: This is an experimental feature and recommended to use via PipelineWise YAML files that will generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example]
| disable_table_cache | Boolean | | (Default: False) By default the connector caches the available table structures in Snowflake at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With the `disable_table_cache` option you can turn off this caching. You will always see the most recent table structures, at the cost of extra queries at runtime. |
| client_side_encryption_master_key | String | | (Default: None) When this is defined, Client-Side Encryption is enabled. The data in S3 will be encrypted, so no third parties, including Amazon AWS and any ISPs, can see the data in the clear. The Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256 bits long and encoded as a base64 string. |
| client_side_encryption_stage_object | String | | (Default: None) Required when `client_side_encryption_master_key` is defined. The name of the encrypted stage object in Snowflake that was created separately, using the same encryption master key. |
| add_metadata_columns | Boolean | | (Default: False) Metadata columns add extra row-level information about data ingestion (e.g. when the row was read from the source, or when it was inserted or deleted in Snowflake). Metadata columns are created automatically by adding extra columns to the tables with the column prefix `_SDC_`. The column names follow the Stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag deleted rows by setting the `_SDC_DELETED_AT` metadata column. Without the `add_metadata_columns` option the deleted rows from singer taps will not be recognisable in Snowflake. |
| hard_delete | Boolean | | (Default: False) When the `hard_delete` option is true, DELETE SQL commands are performed in Snowflake to delete rows in tables. This is achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. Because deleting rows requires metadata columns, the `hard_delete` option automatically enables the `add_metadata_columns` option as well. |
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.<br><br>When value is 0 (default) then flattening functionality is turned off. |
@@ -176,6 +175,9 @@ Full list of options in `config.json`:
| temp_dir | String | | (Default: platform-dependent) Directory of temporary files with RECORD messages. |
| no_compression | Boolean | | (Default: False) Generate uncompressed files when loading to Snowflake. Normally, by default GZIP compressed files are generated. |
| query_tag | String | | (Default: None) Optional string to tag executed queries in Snowflake. Replaces tokens `{{database}}`, `{{schema}}` and `{{table}}` with the appropriate values. The tags are displayed in the output of the Snowflake `QUERY_HISTORY`, `QUERY_HISTORY_BY_*` functions. |
| archive_load_files | Boolean | | (Default: False) When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket` under the key `/{archive_load_files_s3_prefix}/{schema_name}/{table_name}/`. All archived files will have `tap`, `schema`, `table` and `archived-by` as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: `incremental-key`, `incremental-key-min` and `incremental-key-max`.
| archive_load_files_s3_prefix | String | | (Default: "archive") When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
| archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket.
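
As a quick illustration of how these options fit together, the sketch below shows a partial `config.json` that enables hard deletes and load-file archiving. It is an example only: the values are placeholders, it uses just the option names documented in the table above, and the connection settings covered earlier in the table (account, credentials, S3/stage details) are omitted.

```json
{
  "default_target_schema": "repl_schema",
  "batch_size_rows": 100000,
  "parallelism": 0,
  "parallelism_max": 16,
  "add_metadata_columns": true,
  "hard_delete": true,
  "query_tag": "pipelinewise_{{database}}.{{schema}}.{{table}}",
  "archive_load_files": true,
  "archive_load_files_s3_bucket": "my-archive-bucket",
  "archive_load_files_s3_prefix": "archive"
}
```

With a configuration like this, each batch of load files copied into Snowflake would also be archived under `s3://my-archive-bucket/archive/<schema_name>/<table_name>/`, and every executed query would be tagged with the database, schema and table it touched.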

### To run tests:

@@ -184,7 +186,7 @@ Full list of options in `config.json`:
export TARGET_SNOWFLAKE_ACCOUNT=<snowflake-account-name>
export TARGET_SNOWFLAKE_DBNAME=<snowflake-database-name>
export TARGET_SNOWFLAKE_USER=<snowflake-user>
export TARGET_SNOWFLAKE_PASSWORD=<snowfale-password>
export TARGET_SNOWFLAKE_PASSWORD=<snowflake-password>
export TARGET_SNOWFLAKE_WAREHOUSE=<snowflake-warehouse>
export TARGET_SNOWFLAKE_SCHEMA=<snowflake-schema>
export TARGET_SNOWFLAKE_AWS_ACCESS_KEY=<aws-access-key-id>
@@ -196,7 +198,6 @@ Full list of options in `config.json`:
export TARGET_SNOWFLAKE_FILE_FORMAT_CSV=<file-format-csv-object-with-schema-name>
export TARGET_SNOWFLAKE_FILE_FORMAT_PARQUET=<file-format-parquet-object-with-schema-name>
export CLIENT_SIDE_ENCRYPTION_MASTER_KEY=<client_side_encryption_master_key>
export CLIENT_SIDE_ENCRYPTION_STAGE_OBJECT=<client_side_encryption_stage_object>
```

2. Install python test dependencies in a virtual env and run unit and integration tests
26 changes: 14 additions & 12 deletions setup.py
@@ -6,32 +6,34 @@
    long_description = f.read()

setup(name="pipelinewise-target-snowflake",
      version="1.12.0",
      version="2.3.0",
      description="Singer.io target for loading data to Snowflake - PipelineWise compatible",
      long_description=long_description,
      long_description_content_type='text/markdown',
      author="TransferWise",
      author="Wise",
      url='https://github.com/transferwise/pipelinewise-target-snowflake',
      classifiers=[
          'License :: OSI Approved :: Apache Software License',
          'Programming Language :: Python :: 3 :: Only'
          'Programming Language :: Python :: 3 :: Only',
          'Programming Language :: Python :: 3.7',
          'Programming Language :: Python :: 3.8',
          'Programming Language :: Python :: 3.9',
      ],
      py_modules=["target_snowflake"],
      python_requires='>=3.7',
      install_requires=[
          'pipelinewise-singer-python==1.*',
          'snowflake-connector-python[pandas]==2.4.2',
          'snowflake-connector-python[pandas]==3.0.4',
          'inflection==0.5.1',
          'joblib==1.0.1',
          'numpy<1.21.0',
          'python-dateutil==2.8.1'
          'joblib==1.2.0',
          'boto3==1.28.20',
      ],
      extras_require={
          "test": [
              "mock==4.0.3",
              "pylint==2.7.4",
              'pytest==6.2.3',
              'pytest-cov==2.11.1',
              "python-dotenv==0.17.0"
              "pylint==2.12.*",
              'pytest==7.4.0',
              'pytest-cov==3.0.0',
              "python-dotenv>=0.19,<1.1"
          ]
      },
      entry_points="""
