From 44f8fb232c3d75a052cc56dddc9575d7a5e0d5b4 Mon Sep 17 00:00:00 2001 From: Abdul Salam <104752966+fivetran-abdulsalam@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:41:34 +0530 Subject: [PATCH] feature(partner-sdk): added python destination example (#30) * destination tester * deleted duplicate requirement file * refactor * refactor based on comments * deleted destination folder * change readme * changed mode and added comments * changed grpc version in requirements --- .gitignore | 5 ++ examples/destination/python/README.md | 15 ++++ examples/destination/python/build.sh | 29 +++++++ examples/destination/python/main.py | 88 ++++++++++++++++++++ examples/destination/python/read_csv.py | 32 +++++++ examples/destination/python/requirements.txt | 8 ++ examples/destination/python/run.sh | 3 + 7 files changed, 180 insertions(+) create mode 100644 examples/destination/python/README.md create mode 100755 examples/destination/python/build.sh create mode 100644 examples/destination/python/main.py create mode 100644 examples/destination/python/read_csv.py create mode 100644 examples/destination/python/requirements.txt create mode 100755 examples/destination/python/run.sh diff --git a/.gitignore b/.gitignore index f38b613..99cdbb1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,8 @@ bin/* examples/**/*.proto examples/**/*.pb.go +examples/**/*pb2.py +examples/**/*pb2.pyi +examples/**/*pb2_grpc.py +destination_run/ +**/__pycache__/ \ No newline at end of file diff --git a/examples/destination/python/README.md b/examples/destination/python/README.md new file mode 100644 index 0000000..f154a71 --- /dev/null +++ b/examples/destination/python/README.md @@ -0,0 +1,15 @@ +# Python Destination Example + +## Pre-requisites +- Python 3.9 or later + +## Steps +- Run the build.sh file to copy protos, install python dependencies in virtual environment +```commandline +sh build.sh +``` + +- Execute `run.sh` to run the connector +```commandline +sh run.sh +``` \ No newline at end of file diff --git a/examples/destination/python/build.sh b/examples/destination/python/build.sh new file mode 100755 index 0000000..77bf52c --- /dev/null +++ b/examples/destination/python/build.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +#Create virtual environment +python3 -m venv destination_run + +#Activate virtual environment +source destination_run/bin/activate + +# Make a directory protos +mkdir -p protos + +# Copy proto files t oprotos directory +cp ../../../*.proto protos/ + +# Install the required packages +pip install -r requirements.txt + +# Make a directory sdk_pb2 +mkdir -p sdk_pb2 + +# Generate grpc python code and store it in sdk_pb2 +python -m grpc_tools.protoc \ + --proto_path=./protos/ \ + --python_out=sdk_pb2 \ + --pyi_out=sdk_pb2 \ + --grpc_python_out=sdk_pb2 protos/*.proto + +# Deactivate virtual environment +deactivate \ No newline at end of file diff --git a/examples/destination/python/main.py b/examples/destination/python/main.py new file mode 100644 index 0000000..f222f6a --- /dev/null +++ b/examples/destination/python/main.py @@ -0,0 +1,88 @@ +from concurrent import futures +import grpc +import read_csv +import sys +sys.path.append('sdk_pb2') + +from sdk_pb2 import destination_sdk_pb2 +from sdk_pb2 import common_pb2 +from sdk_pb2 import destination_sdk_pb2_grpc + + +class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer): + def ConfigurationForm(self, request, context): + + host = common_pb2.FormField(name="host", label="Host", required=True, + text_field=common_pb2.TextField.PlainText) + password = common_pb2.FormField(name="password", label="Password", required=True, + text_field=common_pb2.TextField.Password) + region = common_pb2.FormField(name="region", label="AWS Region", required=False, + dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])) + hidden = common_pb2.FormField(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden) + is_public = common_pb2.FormField(name="isPublic", label="Public?", description="Is this public?", + toggle_field=common_pb2.ToggleField()) + + connect_test = common_pb2.ConfigurationTest(name="connect", label="Tests connection") + select_test = common_pb2.ConfigurationTest(name="select", label="Tests selection") + return common_pb2.ConfigurationFormResponse( + schema_selection_supported=True, + table_selection_supported=True, + fields=[host, password, region, hidden, + is_public], + tests=[connect_test, select_test] + + ) + + def Test(self, request, context): + test_name = request.name + print("test name: " + test_name) + return common_pb2.TestResponse(success=True) + + def CreateTable(self, request, context): + print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns)) + return destination_sdk_pb2.CreateTableResponse(success=True) + + def AlterTable(self, request, context): + res: destination_sdk_pb2.AlterTableResponse + + print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns)) + return destination_sdk_pb2.AlterTableResponse(success=True) + + def Truncate(self, request, context): + print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(request.soft)) + return destination_sdk_pb2.TruncateResponse(success=True) + + def WriteBatch(self, request, context): + for replace_file in request.replace_files: + print("replace files: " + str(replace_file)) + for update_file in request.update_files: + print("replace files: " + str(update_file)) + for delete_file in request.delete_files: + print("replace files: " + str(delete_file)) + + print("Data loading started for table " + request.table.name) + for key, value in request.keys.items(): + print("----------------------------------------------------------------------------") + print("Decrypting and printing file :" + str(key)) + print("----------------------------------------------------------------------------") + read_csv.decrypt_file(key, value) + print("\nData loading completed for table " + request.table.name + "\n") + + res: destination_sdk_pb2.WriteBatchResponse = destination_sdk_pb2.WriteBatchResponse(success=True) + return res + + def DescribeTable(self, request, context): + column1 = common_pb2.Column(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True) + column2 = common_pb2.Column(name="a2", type=common_pb2.DataType.DOUBLE) + table: common_pb2.Table = common_pb2.Table(name=request.table_name, columns=[column1, column2]) + return destination_sdk_pb2.DescribeTableResponse(not_found=False, table=table) + + +if __name__ == '__main__': + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + server.add_insecure_port('[::]:50052') + destination_sdk_pb2_grpc.add_DestinationServicer_to_server(DestinationImpl(), server) + server.start() + print("Destination gRPC server started...") + server.wait_for_termination() + print("Destination gRPC server terminated...") diff --git a/examples/destination/python/read_csv.py b/examples/destination/python/read_csv.py new file mode 100644 index 0000000..56a9216 --- /dev/null +++ b/examples/destination/python/read_csv.py @@ -0,0 +1,32 @@ +from zstandard import ZstdDecompressor +from Crypto.Cipher import AES +import csv + + +# AES decryption function +def aes_decrypt(key, ciphertext): + cipher = AES.new(key, AES.MODE_CBC, iv=ciphertext[:AES.block_size]) + plaintext = cipher.decrypt(ciphertext[AES.block_size:]) + return plaintext.rstrip(b'\0') + + +# Zstandard decompression function +def zstd_decompress(compressed_data): + decompressor = ZstdDecompressor() + decompressed_data = decompressor.decompressobj().decompress(compressed_data) + return decompressed_data + + +# Read the encrypted and compressed data +def decrypt_file(input_file_path, value): + with open(input_file_path, 'rb') as file: + encrypted_and_compressed_data = file.read() + decrypted_data = aes_decrypt(value, encrypted_and_compressed_data) + decompressed_data = zstd_decompress(decrypted_data) + csv_data = decompressed_data.decode('utf-8') + csv_reader = csv.reader(csv_data.splitlines()) + headers = next(csv_reader) + print(f"{' | '.join(headers)}") + print('-' * (len(headers) * 15)) + for row in csv_reader: + print(f"{' | '.join(row)}") diff --git a/examples/destination/python/requirements.txt b/examples/destination/python/requirements.txt new file mode 100644 index 0000000..9e5b8e3 --- /dev/null +++ b/examples/destination/python/requirements.txt @@ -0,0 +1,8 @@ +grpcio==1.60.1 +grpcio-tools==1.60.1 +protobuf==4.25.3 +google~=3.0.0 +pip~=23.0.1 +setuptools~=65.5.0 +zstandard~=0.22.0 +pycryptodome==3.19.1 \ No newline at end of file diff --git a/examples/destination/python/run.sh b/examples/destination/python/run.sh new file mode 100755 index 0000000..d90200f --- /dev/null +++ b/examples/destination/python/run.sh @@ -0,0 +1,3 @@ +source destination_run/bin/activate +python main.py +deactivate \ No newline at end of file