Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(partner-sdk): added python connector example #29

Merged
merged 16 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ examples/**/*pb2.py
examples/**/*pb2.pyi
examples/**/*pb2_grpc.py
destination_run/
connector_run/
**/__pycache__/
15 changes: 15 additions & 0 deletions examples/connector/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Python Connector Example

## Pre-requisites
- Python 3.9 or later

## Steps
- Run the build.sh file to copy protos, install python dependencies in virtual environment
```commandline
sh build.sh
```

- Execute `run.sh` to run the connector
```commandline
sh run.sh
```
18 changes: 18 additions & 0 deletions examples/connector/python/build.sh
ediril marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Creates virtual environment to install the packages and to run the connector.
python3 -m venv connector_run
manjutapali marked this conversation as resolved.
Show resolved Hide resolved
source connector_run/bin/activate
# install the added packages
pip install -r requirements.txt

# copying protos present in the root of directory to `protos` folder
mkdir -p protos
cp ../../../*.proto protos/
# Generates the required gRPC Python files using protos into `sdk_pb2` folder
mkdir -p sdk_pb2
python -m grpc_tools.protoc \
--proto_path=./protos/ \
--python_out=sdk_pb2 \
--pyi_out=sdk_pb2 \
--grpc_python_out=sdk_pb2 protos/*.proto
deactivate
155 changes: 155 additions & 0 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import grpc
from concurrent import futures
import json
import sys
sys.path.append('sdk_pb2')

from sdk_pb2 import connector_sdk_pb2_grpc
from sdk_pb2 import common_pb2
from sdk_pb2 import connector_sdk_pb2


class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
form_fields.fields.add(name="password", label="User Password", required=True, text_field=common_pb2.TextField.Password)

form_fields.fields.add(
name="region",
label="AWS Region",
required=False,
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])
)

form_fields.fields.add(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)
form_fields.fields.add(name="isPublic", label="Public?", description="Is this public?", toggle_field=common_pb2.ToggleField())

# add setup tests
form_fields.tests.add(name="connection_test", label="Tests connection")

return form_fields

def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)
return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
table_list = common_pb2.TableList()
t1 = table_list.tables.add(name="table1")
t1.columns.add(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
t1.columns.add(name="a2", type=common_pb2.DataType.DOUBLE)

t2 = table_list.tables.add(name="table2")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example doesn't use table2. Can you add an upsert for it so it gets a row in the destination?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from the java connector example, even it does not use it table2. I'll add simple upsert operation for it.

t2.columns.add(name="b1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
t2.columns.add(name="b2", type=common_pb2.DataType.UNSPECIFIED)

return connector_sdk_pb2.SchemaResponse(without_schema=table_list)

def Update(self, request, context):

state_json = "{}"
if request.HasField('state_json'):
state_json = request.state_json

state = json.loads(state_json)
if state.get("cursor") is None:
state["cursor"] = 0

# -- Send UPSERT records
for t in range(0, 3):
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-" + str(t)

val2 = common_pb2.ValueType()
val2.double = t * 0.234

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPSERT
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)
state["cursor"] += 1

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)

# -- Send UPSERT record for table2
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "b1"
val2 = common_pb2.ValueType()
val2.string = "ben"
record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPSERT
record.table_name = "table2"
record.data["b1"].CopyFrom(val1)
record.data["b2"].CopyFrom(val2)
state["cursor"] += 1

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)

# -- Send UPDATE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-0"

val2 = common_pb2.ValueType()
val2.double = 110.234

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.UPDATE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
val1.string = "a-2"

record = connector_sdk_pb2.Record()
record.type = common_pb2.OpType.DELETE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)

operation.record.CopyFrom(record)
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

checkpoint = connector_sdk_pb2.Checkpoint()
checkpoint.state_json = json.dumps(state)
checkpoint_operation = connector_sdk_pb2.Operation()
checkpoint_operation.checkpoint.CopyFrom(checkpoint)
yield connector_sdk_pb2.UpdateResponse(operation=checkpoint_operation)

log = connector_sdk_pb2.LogEntry()
log.level = connector_sdk_pb2.LogLevel.INFO
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
connector_sdk_pb2_grpc.add_ConnectorServicer_to_server(ConnectorService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started...")
server.wait_for_termination()
print("Server terminated.")


if __name__ == '__main__':
print("Starting the server...")
start_server()
2 changes: 2 additions & 0 deletions examples/connector/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
grpcio==1.60.1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should match the versions we are showing in the development guide. Can we not use 1.61.1?

Copy link
Collaborator Author

@manjutapali manjutapali Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is no 1.61.1
Screenshot 2024-04-15 at 10 37 25 AM

grpcio-tools==1.60.1
4 changes: 4 additions & 0 deletions examples/connector/python/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# activate the created virtual environment during the build and run the main file.
source connector_run/bin/activate
python main.py
deactivate
Loading