Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(partner_sdk): fix python destination examples as per updated proto files #34

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions destination_sdk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ message AlterTableRequest {
map<string, string> configuration = 1;
string schema_name = 2;
string table_name = 3;
repeated SchemaChange changes = 4;
repeated SchemaDiff changes = 4;
}

message SchemaChange {
message SchemaDiff {
oneof change {
Column add_column = 1;
ChangeType change_column_type = 2;
Expand Down
78 changes: 60 additions & 18 deletions examples/destination/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,35 @@
import grpc
import read_csv
import sys

sys.path.append('sdk_pb2')

from sdk_pb2 import destination_sdk_pb2
from sdk_pb2 import common_pb2
from sdk_pb2 import destination_sdk_pb2_grpc


class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer):
class DestinationImpl(destination_sdk_pb2_grpc.DestinationConnectorServicer):
def ConfigurationForm(self, request, context):

host = common_pb2.FormField(name="host", label="Host", required=True,
text_field=common_pb2.TextField.PlainText)
password = common_pb2.FormField(name="password", label="Password", required=True,
text_field=common_pb2.TextField.Password)
region = common_pb2.FormField(name="region", label="AWS Region", required=False,
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"]))
hidden = common_pb2.FormField(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)
is_public = common_pb2.FormField(name="isPublic", label="Public?", description="Is this public?",
toggle_field=common_pb2.ToggleField())
host = common_pb2.FormField(
single=common_pb2.Field(name="host", label="Host", required=True, placeholder="my.example.host",
text_field=common_pb2.TextField.PlainText))

password = common_pb2.FormField(
single=common_pb2.Field(name="password", label="Password", required=True, placeholder="my_password",
text_field=common_pb2.TextField.Password))

region = common_pb2.FormField(
single=common_pb2.Field(name="region", label="AWS Region", required=False, default_value="US-EAST",
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])))

hidden = common_pb2.FormField(
common_pb2.Field(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden))

is_public = common_pb2.FormField(
single=common_pb2.Field(name="isPublic", label="Public?", description="Is this public?",
toggle_field=common_pb2.ToggleField()))

connect_test = common_pb2.ConfigurationTest(name="connect", label="Tests connection")
select_test = common_pb2.ConfigurationTest(name="select", label="Tests selection")
Expand All @@ -30,26 +40,58 @@ def ConfigurationForm(self, request, context):
fields=[host, password, region, hidden,
is_public],
tests=[connect_test, select_test]

)
)

def Test(self, request, context):
test_name = request.name
print("test name: " + test_name)
return common_pb2.TestResponse(success=True)

def CreateTable(self, request, context):
print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(
request.table.columns))
return destination_sdk_pb2.CreateTableResponse(success=True)

def AlterTable(self, request, context):
res: destination_sdk_pb2.AlterTableResponse

print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
changes_list = request.changes
change_strings = [str(change) for change in changes_list]
result = ", ".join(change_strings)
print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table_name) + " | " + str(
result))
return destination_sdk_pb2.AlterTableResponse(success=True)

def Capabilities(self, request, context):
destination_map_to_1 = destination_sdk_pb2.DestinationType(name="VARCHAR",
map_to=destination_sdk_pb2.DestinationType.map_to.STRING)
data_type_mapping_1 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.STRING,
map_to=destination_map_to_1)

destination_map_to_2 = destination_sdk_pb2.DestinationType(name="NUMBER",
map_to=destination_sdk_pb2.DestinationType.map_to.INT)
data_type_mapping_2 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.FLOAT,
map_to=destination_map_to_2)

destination_map_to_3 = destination_sdk_pb2.DestinationType(name="DATE",
map_to=destination_sdk_pb2.DestinationType.map_to.UTC_DATETIME)
data_type_mapping_3 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.UTC_DATETIME,
map_to=destination_map_to_3)

destination_map_to_4 = destination_sdk_pb2.DestinationType(name="BLOB",
map_to=destination_sdk_pb2.DestinationType.map_to.BINARY)
data_type_mapping_4 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.BINARY,
map_to=destination_map_to_4)

return destination_sdk_pb2.CapabilitiesResponse(
data_type_mappings=[data_type_mapping_1, data_type_mapping_2, data_type_mapping_3, data_type_mapping_4],
supports_history_mode=True)

def Truncate(self, request, context):
print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(request.soft))
print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(
request.soft))
return destination_sdk_pb2.TruncateResponse(success=True)

def WriteBatch(self, request, context):
Expand Down Expand Up @@ -81,7 +123,7 @@ def DescribeTable(self, request, context):
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:50052')
destination_sdk_pb2_grpc.add_DestinationServicer_to_server(DestinationImpl(), server)
destination_sdk_pb2_grpc.add_DestinationConnectorServicer_to_server(DestinationImpl(), server)
server.start()
print("Destination gRPC server started...")
server.wait_for_termination()
Expand Down
Loading