Skip to content

Commit

Permalink
feature(partner-sdk): added python connector example (#29)
Browse files Browse the repository at this point in the history
* feature(partner-sdk): added new python example

* updated the connector code

* refactored the code

* refactored the code and added state, log entry

* updated the setup form

* fix(python-example): refactored imports, form fields

* fix(python-example): updated the tables with kwarg

* fix(python-example): updated the test

* fix(python-example): refactored dropdown field code

* fix(python-example): add minor formatting changes

* fix(python-example): refomatted folder structure

* fix(python-example): addressed requested changes

* fix(partner-sdk): updated the build, run script file permission and added detailed steps

* fix(partner-sdk): updated the column type for table

* fix(partner-sdk): rebased with main, fixed gitignore

* fix(partner-sdk): ignored pyenv files, added upsert operation for table2

---------

Co-authored-by: Niket Khandelwal <[email protected]>
  • Loading branch information
manjutapali and fivetran-niketkhandelwal authored May 13, 2024
1 parent 2425f72 commit 7387dbc
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ examples/**/*pb2.py
examples/**/*pb2.pyi
examples/**/*pb2_grpc.py
destination_run/
connector_run/
**/__pycache__/
15 changes: 15 additions & 0 deletions examples/connector/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Python Connector Example

## Pre-requisites
- Python 3.9 or later

## Steps
- Run the build.sh file to copy protos, install python dependencies in virtual environment
```commandline
sh build.sh
```

- Execute `run.sh` to run the connector
```commandline
sh run.sh
```
18 changes: 18 additions & 0 deletions examples/connector/python/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Creates virtual environment to install the packages and to run the connector.
python3 -m venv connector_run
source connector_run/bin/activate
# install the added packages
pip install -r requirements.txt

# copying protos present in the root of directory to `protos` folder
mkdir -p protos
cp ../../../*.proto protos/
# Generates the required gRPC Python files using protos into `sdk_pb2` folder
mkdir -p sdk_pb2
python -m grpc_tools.protoc \
--proto_path=./protos/ \
--python_out=sdk_pb2 \
--pyi_out=sdk_pb2 \
--grpc_python_out=sdk_pb2 protos/*.proto
deactivate
155 changes: 155 additions & 0 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import grpc
from concurrent import futures
import json
import sys
sys.path.append('sdk_pb2')

from sdk_pb2 import connector_sdk_pb2_grpc
from sdk_pb2 import common_pb2
from sdk_pb2 import connector_sdk_pb2


class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
form_fields.fields.add(name="password", label="User Password", required=True, text_field=common_pb2.TextField.Password)

form_fields.fields.add(
name="region",
label="AWS Region",
required=False,
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])
)

form_fields.fields.add(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)
form_fields.fields.add(name="isPublic", label="Public?", description="Is this public?", toggle_field=common_pb2.ToggleField())

# add setup tests
form_fields.tests.add(name="connection_test", label="Tests connection")

return form_fields

def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)
return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
table_list = common_pb2.TableList()
t1 = table_list.tables.add(name="table1")
t1.columns.add(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
t1.columns.add(name="a2", type=common_pb2.DataType.DOUBLE)

t2 = table_list.tables.add(name="table2")
t2.columns.add(name="b1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
t2.columns.add(name="b2", type=common_pb2.DataType.UNSPECIFIED)

return connector_sdk_pb2.SchemaResponse(without_schema=table_list)

def Update(self, request, context):

state_json = "{}"
if request.HasField('state_json'):
state_json = request.state_json

state = json.loads(state_json)
if state.get("cursor") is None:
state["cursor"] = 0

# -- Send UPSERT records
for t in range(0, 3):
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-" + str(t)

val2 = common_pb2.ValueType()
val2.double = t * 0.234

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPSERT
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)
state["cursor"] += 1

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)

# -- Send UPSERT record for table2
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "b1"
val2 = common_pb2.ValueType()
val2.string = "ben"
record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPSERT
record.table_name = "table2"
record.data["b1"].CopyFrom(val1)
record.data["b2"].CopyFrom(val2)
state["cursor"] += 1

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)

# -- Send UPDATE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-0"

val2 = common_pb2.ValueType()
val2.double = 110.234

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPDATE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-2"

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.DELETE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

checkpoint = connector_sdk_pb2.Checkpoint()
checkpoint.state_json = json.dumps(state)
checkpoint_operation = connector_sdk_pb2.Operation()
checkpoint_operation.checkpoint.CopyFrom(checkpoint)
yield connector_sdk_pb2.UpdateResponse(operation=checkpoint_operation)

log = connector_sdk_pb2.LogEntry()
log.level = connector_sdk_pb2.LogLevel.INFO
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
connector_sdk_pb2_grpc.add_ConnectorServicer_to_server(ConnectorService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started...")
server.wait_for_termination()
print("Server terminated.")


if __name__ == '__main__':
print("Starting the server...")
start_server()
2 changes: 2 additions & 0 deletions examples/connector/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
grpcio==1.60.1
grpcio-tools==1.60.1
4 changes: 4 additions & 0 deletions examples/connector/python/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# activate the created virtual environment during the build and run the main file.
source connector_run/bin/activate
python main.py
deactivate

0 comments on commit 7387dbc

Please sign in to comment.