Skip to content

Commit

Permalink
feature(partner-sdk): added python destination example (#30)
Browse files Browse the repository at this point in the history
* destination tester

* deleted duplicate requirement file

* refactor

* refactor based on comments

* deleted destination folder

* change readme

* changed mode and added comments

* changed grpc version in requirements
  • Loading branch information
fivetran-abdulsalam authored Apr 16, 2024
1 parent 71bd89f commit 44f8fb2
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
bin/*
examples/**/*.proto
examples/**/*.pb.go
examples/**/*pb2.py
examples/**/*pb2.pyi
examples/**/*pb2_grpc.py
destination_run/
**/__pycache__/
15 changes: 15 additions & 0 deletions examples/destination/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Python Destination Example

## Pre-requisites
- Python 3.9 or later

## Steps
- Run the build.sh file to copy protos, install python dependencies in virtual environment
```commandline
sh build.sh
```

- Execute `run.sh` to run the connector
```commandline
sh run.sh
```
29 changes: 29 additions & 0 deletions examples/destination/python/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

#Create virtual environment
python3 -m venv destination_run

#Activate virtual environment
source destination_run/bin/activate

# Make a directory protos
mkdir -p protos

# Copy proto files t oprotos directory
cp ../../../*.proto protos/

# Install the required packages
pip install -r requirements.txt

# Make a directory sdk_pb2
mkdir -p sdk_pb2

# Generate grpc python code and store it in sdk_pb2
python -m grpc_tools.protoc \
--proto_path=./protos/ \
--python_out=sdk_pb2 \
--pyi_out=sdk_pb2 \
--grpc_python_out=sdk_pb2 protos/*.proto

# Deactivate virtual environment
deactivate
88 changes: 88 additions & 0 deletions examples/destination/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from concurrent import futures
import grpc
import read_csv
import sys
sys.path.append('sdk_pb2')

from sdk_pb2 import destination_sdk_pb2
from sdk_pb2 import common_pb2
from sdk_pb2 import destination_sdk_pb2_grpc


class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer):
def ConfigurationForm(self, request, context):

host = common_pb2.FormField(name="host", label="Host", required=True,
text_field=common_pb2.TextField.PlainText)
password = common_pb2.FormField(name="password", label="Password", required=True,
text_field=common_pb2.TextField.Password)
region = common_pb2.FormField(name="region", label="AWS Region", required=False,
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"]))
hidden = common_pb2.FormField(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)
is_public = common_pb2.FormField(name="isPublic", label="Public?", description="Is this public?",
toggle_field=common_pb2.ToggleField())

connect_test = common_pb2.ConfigurationTest(name="connect", label="Tests connection")
select_test = common_pb2.ConfigurationTest(name="select", label="Tests selection")
return common_pb2.ConfigurationFormResponse(
schema_selection_supported=True,
table_selection_supported=True,
fields=[host, password, region, hidden,
is_public],
tests=[connect_test, select_test]

)

def Test(self, request, context):
test_name = request.name
print("test name: " + test_name)
return common_pb2.TestResponse(success=True)

def CreateTable(self, request, context):
print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
return destination_sdk_pb2.CreateTableResponse(success=True)

def AlterTable(self, request, context):
res: destination_sdk_pb2.AlterTableResponse

print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
return destination_sdk_pb2.AlterTableResponse(success=True)

def Truncate(self, request, context):
print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(request.soft))
return destination_sdk_pb2.TruncateResponse(success=True)

def WriteBatch(self, request, context):
for replace_file in request.replace_files:
print("replace files: " + str(replace_file))
for update_file in request.update_files:
print("replace files: " + str(update_file))
for delete_file in request.delete_files:
print("replace files: " + str(delete_file))

print("Data loading started for table " + request.table.name)
for key, value in request.keys.items():
print("----------------------------------------------------------------------------")
print("Decrypting and printing file :" + str(key))
print("----------------------------------------------------------------------------")
read_csv.decrypt_file(key, value)
print("\nData loading completed for table " + request.table.name + "\n")

res: destination_sdk_pb2.WriteBatchResponse = destination_sdk_pb2.WriteBatchResponse(success=True)
return res

def DescribeTable(self, request, context):
column1 = common_pb2.Column(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
column2 = common_pb2.Column(name="a2", type=common_pb2.DataType.DOUBLE)
table: common_pb2.Table = common_pb2.Table(name=request.table_name, columns=[column1, column2])
return destination_sdk_pb2.DescribeTableResponse(not_found=False, table=table)


if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:50052')
destination_sdk_pb2_grpc.add_DestinationServicer_to_server(DestinationImpl(), server)
server.start()
print("Destination gRPC server started...")
server.wait_for_termination()
print("Destination gRPC server terminated...")
32 changes: 32 additions & 0 deletions examples/destination/python/read_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from zstandard import ZstdDecompressor
from Crypto.Cipher import AES
import csv


# AES decryption function
def aes_decrypt(key, ciphertext):
cipher = AES.new(key, AES.MODE_CBC, iv=ciphertext[:AES.block_size])
plaintext = cipher.decrypt(ciphertext[AES.block_size:])
return plaintext.rstrip(b'\0')


# Zstandard decompression function
def zstd_decompress(compressed_data):
decompressor = ZstdDecompressor()
decompressed_data = decompressor.decompressobj().decompress(compressed_data)
return decompressed_data


# Read the encrypted and compressed data
def decrypt_file(input_file_path, value):
with open(input_file_path, 'rb') as file:
encrypted_and_compressed_data = file.read()
decrypted_data = aes_decrypt(value, encrypted_and_compressed_data)
decompressed_data = zstd_decompress(decrypted_data)
csv_data = decompressed_data.decode('utf-8')
csv_reader = csv.reader(csv_data.splitlines())
headers = next(csv_reader)
print(f"{' | '.join(headers)}")
print('-' * (len(headers) * 15))
for row in csv_reader:
print(f"{' | '.join(row)}")
8 changes: 8 additions & 0 deletions examples/destination/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
grpcio==1.60.1
grpcio-tools==1.60.1
protobuf==4.25.3
google~=3.0.0
pip~=23.0.1
setuptools~=65.5.0
zstandard~=0.22.0
pycryptodome==3.19.1
3 changes: 3 additions & 0 deletions examples/destination/python/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
source destination_run/bin/activate
python main.py
deactivate

0 comments on commit 44f8fb2

Please sign in to comment.