From 7c3b948cc2410b6381515f07407d7b0a8ddd5c2c Mon Sep 17 00:00:00 2001 From: Abdul Salam <104752966+fivetran-abdulsalam@users.noreply.github.com> Date: Tue, 7 May 2024 17:53:43 +0530 Subject: [PATCH] feature(partner_sdk): fix python destination examples as per updated proto files (#34) * fix python destination example * fixed tester issue --- destination_sdk.proto | 4 +- examples/destination/python/main.py | 78 ++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/destination_sdk.proto b/destination_sdk.proto index c8e6c88..451eb8f 100644 --- a/destination_sdk.proto +++ b/destination_sdk.proto @@ -73,10 +73,10 @@ message AlterTableRequest { map configuration = 1; string schema_name = 2; string table_name = 3; - repeated SchemaChange changes = 4; + repeated SchemaDiff changes = 4; } -message SchemaChange { +message SchemaDiff { oneof change { Column add_column = 1; ChangeType change_column_type = 2; diff --git a/examples/destination/python/main.py b/examples/destination/python/main.py index f222f6a..5f1a52b 100644 --- a/examples/destination/python/main.py +++ b/examples/destination/python/main.py @@ -2,6 +2,7 @@ import grpc import read_csv import sys + sys.path.append('sdk_pb2') from sdk_pb2 import destination_sdk_pb2 @@ -9,18 +10,27 @@ from sdk_pb2 import destination_sdk_pb2_grpc -class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer): +class DestinationImpl(destination_sdk_pb2_grpc.DestinationConnectorServicer): def ConfigurationForm(self, request, context): - host = common_pb2.FormField(name="host", label="Host", required=True, - text_field=common_pb2.TextField.PlainText) - password = common_pb2.FormField(name="password", label="Password", required=True, - text_field=common_pb2.TextField.Password) - region = common_pb2.FormField(name="region", label="AWS Region", required=False, - dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])) - hidden = common_pb2.FormField(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden) - is_public = common_pb2.FormField(name="isPublic", label="Public?", description="Is this public?", - toggle_field=common_pb2.ToggleField()) + host = common_pb2.FormField( + single=common_pb2.Field(name="host", label="Host", required=True, placeholder="my.example.host", + text_field=common_pb2.TextField.PlainText)) + + password = common_pb2.FormField( + single=common_pb2.Field(name="password", label="Password", required=True, placeholder="my_password", + text_field=common_pb2.TextField.Password)) + + region = common_pb2.FormField( + single=common_pb2.Field(name="region", label="AWS Region", required=False, default_value="US-EAST", + dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"]))) + + hidden = common_pb2.FormField( + single=common_pb2.Field(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)) + + is_public = common_pb2.FormField( + single=common_pb2.Field(name="isPublic", label="Public?", description="Is this public?", + toggle_field=common_pb2.ToggleField())) connect_test = common_pb2.ConfigurationTest(name="connect", label="Tests connection") select_test = common_pb2.ConfigurationTest(name="select", label="Tests selection") @@ -30,8 +40,7 @@ def ConfigurationForm(self, request, context): fields=[host, password, region, hidden, is_public], tests=[connect_test, select_test] - - ) + ) def Test(self, request, context): test_name = request.name @@ -39,17 +48,50 @@ def Test(self, request, context): return common_pb2.TestResponse(success=True) def CreateTable(self, request, context): - print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns)) + print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str( + request.table.columns)) return destination_sdk_pb2.CreateTableResponse(success=True) def AlterTable(self, request, context): - res: destination_sdk_pb2.AlterTableResponse - - print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns)) + changes_list = request.changes + change_strings = [str(change) for change in changes_list] + result = ", ".join(change_strings) + print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table_name) + " | " + str( + result)) return destination_sdk_pb2.AlterTableResponse(success=True) + def Capabilities(self, request, context): + destination_map_to_1 = destination_sdk_pb2.DestinationType(name="VARCHAR", + map_to=common_pb2.DataType.STRING) + data_type_mapping_1 = destination_sdk_pb2.DataTypeMappingEntry( + fivetran_type=common_pb2.DataType.STRING, + map_to=destination_map_to_1) + + destination_map_to_2 = destination_sdk_pb2.DestinationType(name="NUMBER", + map_to=common_pb2.DataType.INT) + data_type_mapping_2 = destination_sdk_pb2.DataTypeMappingEntry( + fivetran_type=common_pb2.DataType.FLOAT, + map_to=destination_map_to_2) + + destination_map_to_3 = destination_sdk_pb2.DestinationType(name="DATE", + map_to=common_pb2.DataType.UTC_DATETIME) + data_type_mapping_3 = destination_sdk_pb2.DataTypeMappingEntry( + fivetran_type=common_pb2.DataType.UTC_DATETIME, + map_to=destination_map_to_3) + + destination_map_to_4 = destination_sdk_pb2.DestinationType(name="BLOB", + map_to=common_pb2.DataType.BINARY) + data_type_mapping_4 = destination_sdk_pb2.DataTypeMappingEntry( + fivetran_type=common_pb2.DataType.BINARY, + map_to=destination_map_to_4) + + return destination_sdk_pb2.CapabilitiesResponse( + data_type_mappings=[data_type_mapping_1, data_type_mapping_2, data_type_mapping_3, data_type_mapping_4], + supports_history_mode=True) + def Truncate(self, request, context): - print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(request.soft)) + print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str( + request.soft)) return destination_sdk_pb2.TruncateResponse(success=True) def WriteBatch(self, request, context): @@ -81,7 +123,7 @@ def DescribeTable(self, request, context): if __name__ == '__main__': server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) server.add_insecure_port('[::]:50052') - destination_sdk_pb2_grpc.add_DestinationServicer_to_server(DestinationImpl(), server) + destination_sdk_pb2_grpc.add_DestinationConnectorServicer_to_server(DestinationImpl(), server) server.start() print("Destination gRPC server started...") server.wait_for_termination()