From 7387dbc3e0aeb3ff0d5eaeb6af5fd3b96a204c2b Mon Sep 17 00:00:00 2001 From: Manjunath Tapali Date: Mon, 13 May 2024 13:01:03 +0530 Subject: [PATCH] feature(partner-sdk): added python connector example (#29) * feature(partner-sdk): added new python example * updated the connector code * refactored the code * refactored the code and added state, log entry * updated the setup form * fix(python-example): refactored imports, form fields * fix(python-example): updated the tables with kwarg * fix(python-example): updated the test * fix(python-example): refactored dropdown field code * fix(python-example): add minor formatting changes * fix(python-example): refomatted folder structure * fix(python-example): addressed requested changes * fix(partner-sdk): updated the build, run script file permission and added detailed steps * fix(partner-sdk): updated the column type for table * fix(partner-sdk): rebased with main, fixed gitignore * fix(partner-sdk): ignored pyenv files, added upsert operation for table2 --------- Co-authored-by: Niket Khandelwal --- .gitignore | 1 + examples/connector/python/README.md | 15 ++ examples/connector/python/build.sh | 18 +++ examples/connector/python/main.py | 155 +++++++++++++++++++++ examples/connector/python/requirements.txt | 2 + examples/connector/python/run.sh | 4 + 6 files changed, 195 insertions(+) create mode 100644 examples/connector/python/README.md create mode 100755 examples/connector/python/build.sh create mode 100644 examples/connector/python/main.py create mode 100644 examples/connector/python/requirements.txt create mode 100755 examples/connector/python/run.sh diff --git a/.gitignore b/.gitignore index 99cdbb1..4f5d745 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ examples/**/*pb2.py examples/**/*pb2.pyi examples/**/*pb2_grpc.py destination_run/ +connector_run/ **/__pycache__/ \ No newline at end of file diff --git a/examples/connector/python/README.md b/examples/connector/python/README.md new file mode 100644 index 0000000..f6f9132 --- /dev/null +++ b/examples/connector/python/README.md @@ -0,0 +1,15 @@ +# Python Connector Example + +## Pre-requisites +- Python 3.9 or later + +## Steps +- Run the build.sh file to copy protos, install python dependencies in virtual environment +```commandline +sh build.sh +``` + +- Execute `run.sh` to run the connector +```commandline +sh run.sh +``` \ No newline at end of file diff --git a/examples/connector/python/build.sh b/examples/connector/python/build.sh new file mode 100755 index 0000000..471ab46 --- /dev/null +++ b/examples/connector/python/build.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# Creates virtual environment to install the packages and to run the connector. +python3 -m venv connector_run +source connector_run/bin/activate +# install the added packages +pip install -r requirements.txt + +# copying protos present in the root of directory to `protos` folder +mkdir -p protos +cp ../../../*.proto protos/ +# Generates the required gRPC Python files using protos into `sdk_pb2` folder +mkdir -p sdk_pb2 +python -m grpc_tools.protoc \ + --proto_path=./protos/ \ + --python_out=sdk_pb2 \ + --pyi_out=sdk_pb2 \ + --grpc_python_out=sdk_pb2 protos/*.proto +deactivate \ No newline at end of file diff --git a/examples/connector/python/main.py b/examples/connector/python/main.py new file mode 100644 index 0000000..ddb9016 --- /dev/null +++ b/examples/connector/python/main.py @@ -0,0 +1,155 @@ +import grpc +from concurrent import futures +import json +import sys +sys.path.append('sdk_pb2') + +from sdk_pb2 import connector_sdk_pb2_grpc +from sdk_pb2 import common_pb2 +from sdk_pb2 import connector_sdk_pb2 + + +class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer): + def ConfigurationForm(self, request, context): + form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True, + table_selection_supported=True) + form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText) + form_fields.fields.add(name="password", label="User Password", required=True, text_field=common_pb2.TextField.Password) + + form_fields.fields.add( + name="region", + label="AWS Region", + required=False, + dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"]) + ) + + form_fields.fields.add(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden) + form_fields.fields.add(name="isPublic", label="Public?", description="Is this public?", toggle_field=common_pb2.ToggleField()) + + # add setup tests + form_fields.tests.add(name="connection_test", label="Tests connection") + + return form_fields + + def Test(self, request, context): + configuration = request.configuration + # Name of the test to be run + test_name = request.name + print("Configuration: ", configuration) + print("Test name: ", test_name) + return common_pb2.TestResponse(success=True) + + def Schema(self, request, context): + table_list = common_pb2.TableList() + t1 = table_list.tables.add(name="table1") + t1.columns.add(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True) + t1.columns.add(name="a2", type=common_pb2.DataType.DOUBLE) + + t2 = table_list.tables.add(name="table2") + t2.columns.add(name="b1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True) + t2.columns.add(name="b2", type=common_pb2.DataType.UNSPECIFIED) + + return connector_sdk_pb2.SchemaResponse(without_schema=table_list) + + def Update(self, request, context): + + state_json = "{}" + if request.HasField('state_json'): + state_json = request.state_json + + state = json.loads(state_json) + if state.get("cursor") is None: + state["cursor"] = 0 + + # -- Send UPSERT records + for t in range(0, 3): + operation = connector_sdk_pb2.Operation() + val1 = common_pb2.ValueType() + val1.string = "a-" + str(t) + + val2 = common_pb2.ValueType() + val2.double = t * 0.234 + + record = connector_sdk_pb2.Record() + record.type = common_pb2.OpType.UPSERT + record.table_name = "table1" + record.data["a1"].CopyFrom(val1) + record.data["a2"].CopyFrom(val2) + state["cursor"] += 1 + + operation.record.CopyFrom(record) + yield connector_sdk_pb2.UpdateResponse(operation=operation) + + # -- Send UPSERT record for table2 + operation = connector_sdk_pb2.Operation() + val1 = common_pb2.ValueType() + val1.string = "b1" + val2 = common_pb2.ValueType() + val2.string = "ben" + record = connector_sdk_pb2.Record() + record.type = common_pb2.OpType.UPSERT + record.table_name = "table2" + record.data["b1"].CopyFrom(val1) + record.data["b2"].CopyFrom(val2) + state["cursor"] += 1 + + operation.record.CopyFrom(record) + yield connector_sdk_pb2.UpdateResponse(operation=operation) + + # -- Send UPDATE record + operation = connector_sdk_pb2.Operation() + val1 = common_pb2.ValueType() + val1.string = "a-0" + + val2 = common_pb2.ValueType() + val2.double = 110.234 + + record = connector_sdk_pb2.Record() + record.type = common_pb2.OpType.UPDATE + record.table_name = "table1" + record.data["a1"].CopyFrom(val1) + record.data["a2"].CopyFrom(val2) + + operation.record.CopyFrom(record) + yield connector_sdk_pb2.UpdateResponse(operation=operation) + state["cursor"] += 1 + + # -- Send DELETE record + operation = connector_sdk_pb2.Operation() + val1 = common_pb2.ValueType() + val1.string = "a-2" + + record = connector_sdk_pb2.Record() + record.type = common_pb2.OpType.DELETE + record.table_name = "table1" + record.data["a1"].CopyFrom(val1) + + operation.record.CopyFrom(record) + yield connector_sdk_pb2.UpdateResponse(operation=operation) + state["cursor"] += 1 + + checkpoint = connector_sdk_pb2.Checkpoint() + checkpoint.state_json = json.dumps(state) + checkpoint_operation = connector_sdk_pb2.Operation() + checkpoint_operation.checkpoint.CopyFrom(checkpoint) + yield connector_sdk_pb2.UpdateResponse(operation=checkpoint_operation) + + log = connector_sdk_pb2.LogEntry() + log.level = connector_sdk_pb2.LogLevel.INFO + log.message = "Sync Done" + yield connector_sdk_pb2.UpdateResponse(log_entry=log) + + +def start_server(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + connector_sdk_pb2_grpc.add_ConnectorServicer_to_server(ConnectorService(), server) + server.add_insecure_port('[::]:50051') + server.start() + print("Server started...") + server.wait_for_termination() + print("Server terminated.") + + +if __name__ == '__main__': + print("Starting the server...") + start_server() diff --git a/examples/connector/python/requirements.txt b/examples/connector/python/requirements.txt new file mode 100644 index 0000000..4a290c6 --- /dev/null +++ b/examples/connector/python/requirements.txt @@ -0,0 +1,2 @@ +grpcio==1.60.1 +grpcio-tools==1.60.1 diff --git a/examples/connector/python/run.sh b/examples/connector/python/run.sh new file mode 100755 index 0000000..04fd809 --- /dev/null +++ b/examples/connector/python/run.sh @@ -0,0 +1,4 @@ +# activate the created virtual environment during the build and run the main file. +source connector_run/bin/activate +python main.py +deactivate