Skip to content

Commit

Permalink
feature(partner_sdk): fix python destination examples as per updated …
Browse files Browse the repository at this point in the history
…proto files (#34)

* fix python destination example

* fixed tester issue
  • Loading branch information
fivetran-abdulsalam authored and manjutapali committed May 13, 2024
1 parent cf5a305 commit 8b9df55
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
4 changes: 2 additions & 2 deletions destination_sdk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ message AlterTableRequest {
map<string, string> configuration = 1;
string schema_name = 2;
string table_name = 3;
repeated SchemaChange changes = 4;
repeated SchemaDiff changes = 4;
}

message SchemaChange {
message SchemaDiff {
oneof change {
Column add_column = 1;
ChangeType change_column_type = 2;
Expand Down
78 changes: 60 additions & 18 deletions examples/destination/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,35 @@
import grpc
import read_csv
import sys

sys.path.append('sdk_pb2')

from sdk_pb2 import destination_sdk_pb2
from sdk_pb2 import common_pb2
from sdk_pb2 import destination_sdk_pb2_grpc


class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer):
class DestinationImpl(destination_sdk_pb2_grpc.DestinationConnectorServicer):
def ConfigurationForm(self, request, context):

host = common_pb2.FormField(name="host", label="Host", required=True,
text_field=common_pb2.TextField.PlainText)
password = common_pb2.FormField(name="password", label="Password", required=True,
text_field=common_pb2.TextField.Password)
region = common_pb2.FormField(name="region", label="AWS Region", required=False,
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"]))
hidden = common_pb2.FormField(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden)
is_public = common_pb2.FormField(name="isPublic", label="Public?", description="Is this public?",
toggle_field=common_pb2.ToggleField())
host = common_pb2.FormField(
single=common_pb2.Field(name="host", label="Host", required=True, placeholder="my.example.host",
text_field=common_pb2.TextField.PlainText))

password = common_pb2.FormField(
single=common_pb2.Field(name="password", label="Password", required=True, placeholder="my_password",
text_field=common_pb2.TextField.Password))

region = common_pb2.FormField(
single=common_pb2.Field(name="region", label="AWS Region", required=False, default_value="US-EAST",
dropdown_field=common_pb2.DropdownField(dropdown_field=["US-EAST", "US-WEST"])))

hidden = common_pb2.FormField(
single=common_pb2.Field(name="hidden", label="my-hidden-value", text_field=common_pb2.TextField.Hidden))

is_public = common_pb2.FormField(
single=common_pb2.Field(name="isPublic", label="Public?", description="Is this public?",
toggle_field=common_pb2.ToggleField()))

connect_test = common_pb2.ConfigurationTest(name="connect", label="Tests connection")
select_test = common_pb2.ConfigurationTest(name="select", label="Tests selection")
Expand All @@ -30,26 +40,58 @@ def ConfigurationForm(self, request, context):
fields=[host, password, region, hidden,
is_public],
tests=[connect_test, select_test]

)
)

def Test(self, request, context):
test_name = request.name
print("test name: " + test_name)
return common_pb2.TestResponse(success=True)

def CreateTable(self, request, context):
print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
print("[CreateTable] :" + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(
request.table.columns))
return destination_sdk_pb2.CreateTableResponse(success=True)

def AlterTable(self, request, context):
res: destination_sdk_pb2.AlterTableResponse

print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table.name) + " | " + str(request.table.columns))
changes_list = request.changes
change_strings = [str(change) for change in changes_list]
result = ", ".join(change_strings)
print("[AlterTable]: " + str(request.schema_name) + " | " + str(request.table_name) + " | " + str(
result))
return destination_sdk_pb2.AlterTableResponse(success=True)

def Capabilities(self, request, context):
destination_map_to_1 = destination_sdk_pb2.DestinationType(name="VARCHAR",
map_to=common_pb2.DataType.STRING)
data_type_mapping_1 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.STRING,
map_to=destination_map_to_1)

destination_map_to_2 = destination_sdk_pb2.DestinationType(name="NUMBER",
map_to=common_pb2.DataType.INT)
data_type_mapping_2 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.FLOAT,
map_to=destination_map_to_2)

destination_map_to_3 = destination_sdk_pb2.DestinationType(name="DATE",
map_to=common_pb2.DataType.UTC_DATETIME)
data_type_mapping_3 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.UTC_DATETIME,
map_to=destination_map_to_3)

destination_map_to_4 = destination_sdk_pb2.DestinationType(name="BLOB",
map_to=common_pb2.DataType.BINARY)
data_type_mapping_4 = destination_sdk_pb2.DataTypeMappingEntry(
fivetran_type=common_pb2.DataType.BINARY,
map_to=destination_map_to_4)

return destination_sdk_pb2.CapabilitiesResponse(
data_type_mappings=[data_type_mapping_1, data_type_mapping_2, data_type_mapping_3, data_type_mapping_4],
supports_history_mode=True)

def Truncate(self, request, context):
print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(request.soft))
print("[TruncateTable]: " + str(request.schema_name) + " | " + str(request.schema_name) + " | soft" + str(
request.soft))
return destination_sdk_pb2.TruncateResponse(success=True)

def WriteBatch(self, request, context):
Expand Down Expand Up @@ -81,7 +123,7 @@ def DescribeTable(self, request, context):
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
server.add_insecure_port('[::]:50052')
destination_sdk_pb2_grpc.add_DestinationServicer_to_server(DestinationImpl(), server)
destination_sdk_pb2_grpc.add_DestinationConnectorServicer_to_server(DestinationImpl(), server)
server.start()
print("Destination gRPC server started...")
server.wait_for_termination()
Expand Down

0 comments on commit 8b9df55

Please sign in to comment.