Skip to content

Commit

Permalink
fix(partner-sdk): updated the go example with the new proto changes (#35
Browse files Browse the repository at this point in the history
)

* fix(partner-sdk): updated the go example with new proto changes

* fix(partner-sdk): re-formatted the code
  • Loading branch information
manjutapali committed Jun 18, 2024
1 parent aa705cc commit d876a8f
Showing 1 changed file with 88 additions and 88 deletions.
176 changes: 88 additions & 88 deletions examples/connector/golang/golang_connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type MyState struct {
}

type server struct {
pb.UnimplementedConnectorServer
pb.UnimplementedSourceConnectorServer
}

func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) error {
func (s *server) Update(in *pb.UpdateRequest, stream pb.SourceConnector_UpdateServer) error {
config := in.Configuration
selection := in.Selection
state_json := in.GetStateJson()
Expand All @@ -33,31 +33,25 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state)

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_LogEntry{
LogEntry: &pb.LogEntry{
Level: pb.LogLevel_INFO,
Message: "Sync STARTING",
},
},
})
logEntry := map[string]interface{}{
"level": "INFO",
"message": "Sync STARTING",
}
logJSON, _ := json.Marshal(logEntry)
fmt.Println(string(logJSON))

// -- Send UPSERT records
schemaName := "schema1"
for i := 0; i < 3; i++ {
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Operation: &pb.Operation{
Op: &pb.Operation_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.OpType_UPSERT,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-" + strconv.Itoa(i)}},
"a2": {Inner: &pb.ValueType_Double{Double: float64(i) * float64(0.234)}},
},
},
Operation: &pb.UpdateResponse_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.RecordType_UPSERT,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-" + strconv.Itoa(i)}},
"a2": {Inner: &pb.ValueType_Double{Double: float64(i) * float64(0.234)}},
},
},
},
Expand All @@ -68,18 +62,14 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)

// -- Send UPDATE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Operation: &pb.Operation{
Op: &pb.Operation_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.OpType_UPDATE,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-0"}},
"a2": {Inner: &pb.ValueType_Double{Double: float64(110.234)}},
},
},
Operation: &pb.UpdateResponse_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.RecordType_UPDATE,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-0"}},
"a2": {Inner: &pb.ValueType_Double{Double: float64(110.234)}},
},
},
},
Expand All @@ -88,17 +78,13 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)

// -- Send DELETE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Operation: &pb.Operation{
Op: &pb.Operation_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.OpType_DELETE,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-2"}},
},
},
Operation: &pb.UpdateResponse_Record{
Record: &pb.Record{
SchemaName: &schemaName,
TableName: "table1",
Type: pb.RecordType_DELETE,
Data: map[string]*pb.ValueType{
"a1": {Inner: &pb.ValueType_String_{String_: "a-2"}},
},
},
},
Expand All @@ -112,26 +98,20 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)

// -- Send Checkpoint
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Operation: &pb.Operation{
Op: &pb.Operation_Checkpoint{
Checkpoint: &pb.Checkpoint{
StateJson: new_state,
},
},
Operation: &pb.UpdateResponse_Checkpoint{
Checkpoint: &pb.Checkpoint{
StateJson: new_state,
},
},
})

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_LogEntry{
LogEntry: &pb.LogEntry{
Level: pb.LogLevel_INFO,
Message: "Sync DONE",
},
},
})
syncEndLog := map[string]interface{}{
"level": "INFO",
"message": "Sync DONE",
}
syncEndLogJson, _ := json.Marshal(syncEndLog)
fmt.Println(string(syncEndLogJson))

// End the RPC call
return nil
Expand Down Expand Up @@ -189,45 +169,65 @@ func (s *server) ConfigurationForm(ctx context.Context, in *pb.ConfigurationForm
TableSelectionSupported: true,
Fields: []*pb.FormField{
{
Name: "apikey",
Label: "API key",
Required: true,
Type: &pb.FormField_TextField{
TextField: pb.TextField_PlainText,
Field: &pb.FormField_Single{
Single: &pb.Field{
Name: "apikey",
Label: "API key",
Required: true,
Type: &pb.Field_TextField{
TextField: pb.TextField_PlainText,
},
},
},
},
{
Name: "password",
Label: "User password",
Required: true,
Type: &pb.FormField_TextField{
TextField: pb.TextField_Password,
Field: &pb.FormField_Single{
Single: &pb.Field{
Name: "password",
Label: "User password",
Required: true,
Type: &pb.Field_TextField{
TextField: pb.TextField_Password,
},
},
},
},
{
Name: "hidden",
Label: "my-hidden-value",
Type: &pb.FormField_TextField{
TextField: pb.TextField_Hidden,
Field: &pb.FormField_Single{
Single: &pb.Field{
Name: "hidden",
Label: "my-hidden-value",
Type: &pb.Field_TextField{
TextField: pb.TextField_Hidden,
},
},
},
},
{
Name: "isPublic",
Label: "Public?",
Description: &toggleDescription,
Required: false,
Type: &pb.FormField_ToggleField{
ToggleField: &pb.ToggleField{},
Field: &pb.FormField_Single{
Single: &pb.Field{
Name: "isPublic",
Label: "Public?",
Description: &toggleDescription,
Required: false,
Type: &pb.Field_ToggleField{
ToggleField: &pb.ToggleField{},
},
},
},
},
{
Name: "region",
Label: "Region",
Required: true,
Type: &pb.FormField_DropdownField{
DropdownField: &pb.DropdownField{
DropdownField: []string{
"US-EAST", "US-WEST",
Field: &pb.FormField_Single{
Single: &pb.Field{
Name: "region",
Label: "Region",
Required: true,
Type: &pb.Field_DropdownField{
DropdownField: &pb.DropdownField{
DropdownField: []string{
"US-EAST", "US-WEST",
},
},
},
},
},
Expand Down Expand Up @@ -265,7 +265,7 @@ func main() {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterConnectorServer(s, &server{})
pb.RegisterSourceConnectorServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
Expand Down

0 comments on commit d876a8f

Please sign in to comment.