diff --git a/examples/connector/golang/golang_connector/main.go b/examples/connector/golang/golang_connector/main.go index e26eb6d..5b43481 100644 --- a/examples/connector/golang/golang_connector/main.go +++ b/examples/connector/golang/golang_connector/main.go @@ -20,10 +20,10 @@ type MyState struct { } type server struct { - pb.UnimplementedConnectorServer + pb.UnimplementedSourceConnectorServer } -func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) error { +func (s *server) Update(in *pb.UpdateRequest, stream pb.SourceConnector_UpdateServer) error { config := in.Configuration selection := in.Selection state_json := in.GetStateJson() @@ -33,31 +33,25 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state) // -- Send a log message - stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_LogEntry{ - LogEntry: &pb.LogEntry{ - Level: pb.LogLevel_INFO, - Message: "Sync STARTING", - }, - }, - }) + logEntry := map[string]interface{}{ + "level": "INFO", + "message": "Sync STARTING", + } + logJSON, _ := json.Marshal(logEntry) + fmt.Println(string(logJSON)) // -- Send UPSERT records schemaName := "schema1" for i := 0; i < 3; i++ { stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_Operation{ - Operation: &pb.Operation{ - Op: &pb.Operation_Record{ - Record: &pb.Record{ - SchemaName: &schemaName, - TableName: "table1", - Type: pb.OpType_UPSERT, - Data: map[string]*pb.ValueType{ - "a1": {Inner: &pb.ValueType_String_{String_: "a-" + strconv.Itoa(i)}}, - "a2": {Inner: &pb.ValueType_Double{Double: float64(i) * float64(0.234)}}, - }, - }, + Operation: &pb.UpdateResponse_Record{ + Record: &pb.Record{ + SchemaName: &schemaName, + TableName: "table1", + Type: pb.RecordType_UPSERT, + Data: map[string]*pb.ValueType{ + "a1": {Inner: &pb.ValueType_String_{String_: "a-" + strconv.Itoa(i)}}, + "a2": {Inner: &pb.ValueType_Double{Double: float64(i) * float64(0.234)}}, }, }, }, @@ -68,18 +62,14 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) // -- Send UPDATE record stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_Operation{ - Operation: &pb.Operation{ - Op: &pb.Operation_Record{ - Record: &pb.Record{ - SchemaName: &schemaName, - TableName: "table1", - Type: pb.OpType_UPDATE, - Data: map[string]*pb.ValueType{ - "a1": {Inner: &pb.ValueType_String_{String_: "a-0"}}, - "a2": {Inner: &pb.ValueType_Double{Double: float64(110.234)}}, - }, - }, + Operation: &pb.UpdateResponse_Record{ + Record: &pb.Record{ + SchemaName: &schemaName, + TableName: "table1", + Type: pb.RecordType_UPDATE, + Data: map[string]*pb.ValueType{ + "a1": {Inner: &pb.ValueType_String_{String_: "a-0"}}, + "a2": {Inner: &pb.ValueType_Double{Double: float64(110.234)}}, }, }, }, @@ -88,17 +78,13 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) // -- Send DELETE record stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_Operation{ - Operation: &pb.Operation{ - Op: &pb.Operation_Record{ - Record: &pb.Record{ - SchemaName: &schemaName, - TableName: "table1", - Type: pb.OpType_DELETE, - Data: map[string]*pb.ValueType{ - "a1": {Inner: &pb.ValueType_String_{String_: "a-2"}}, - }, - }, + Operation: &pb.UpdateResponse_Record{ + Record: &pb.Record{ + SchemaName: &schemaName, + TableName: "table1", + Type: pb.RecordType_DELETE, + Data: map[string]*pb.ValueType{ + "a1": {Inner: &pb.ValueType_String_{String_: "a-2"}}, }, }, }, @@ -112,26 +98,20 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) // -- Send Checkpoint stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_Operation{ - Operation: &pb.Operation{ - Op: &pb.Operation_Checkpoint{ - Checkpoint: &pb.Checkpoint{ - StateJson: new_state, - }, - }, + Operation: &pb.UpdateResponse_Checkpoint{ + Checkpoint: &pb.Checkpoint{ + StateJson: new_state, }, }, }) // -- Send a log message - stream.Send(&pb.UpdateResponse{ - Response: &pb.UpdateResponse_LogEntry{ - LogEntry: &pb.LogEntry{ - Level: pb.LogLevel_INFO, - Message: "Sync DONE", - }, - }, - }) + syncEndLog := map[string]interface{}{ + "level": "INFO", + "message": "Sync DONE", + } + syncEndLogJson, _ := json.Marshal(syncEndLog) + fmt.Println(string(syncEndLogJson)) // End the RPC call return nil @@ -189,45 +169,65 @@ func (s *server) ConfigurationForm(ctx context.Context, in *pb.ConfigurationForm TableSelectionSupported: true, Fields: []*pb.FormField{ { - Name: "apikey", - Label: "API key", - Required: true, - Type: &pb.FormField_TextField{ - TextField: pb.TextField_PlainText, + Field: &pb.FormField_Single{ + Single: &pb.Field{ + Name: "apikey", + Label: "API key", + Required: true, + Type: &pb.Field_TextField{ + TextField: pb.TextField_PlainText, + }, + }, }, }, { - Name: "password", - Label: "User password", - Required: true, - Type: &pb.FormField_TextField{ - TextField: pb.TextField_Password, + Field: &pb.FormField_Single{ + Single: &pb.Field{ + Name: "password", + Label: "User password", + Required: true, + Type: &pb.Field_TextField{ + TextField: pb.TextField_Password, + }, + }, }, }, { - Name: "hidden", - Label: "my-hidden-value", - Type: &pb.FormField_TextField{ - TextField: pb.TextField_Hidden, + Field: &pb.FormField_Single{ + Single: &pb.Field{ + Name: "hidden", + Label: "my-hidden-value", + Type: &pb.Field_TextField{ + TextField: pb.TextField_Hidden, + }, + }, }, }, { - Name: "isPublic", - Label: "Public?", - Description: &toggleDescription, - Required: false, - Type: &pb.FormField_ToggleField{ - ToggleField: &pb.ToggleField{}, + Field: &pb.FormField_Single{ + Single: &pb.Field{ + Name: "isPublic", + Label: "Public?", + Description: &toggleDescription, + Required: false, + Type: &pb.Field_ToggleField{ + ToggleField: &pb.ToggleField{}, + }, + }, }, }, { - Name: "region", - Label: "Region", - Required: true, - Type: &pb.FormField_DropdownField{ - DropdownField: &pb.DropdownField{ - DropdownField: []string{ - "US-EAST", "US-WEST", + Field: &pb.FormField_Single{ + Single: &pb.Field{ + Name: "region", + Label: "Region", + Required: true, + Type: &pb.Field_DropdownField{ + DropdownField: &pb.DropdownField{ + DropdownField: []string{ + "US-EAST", "US-WEST", + }, + }, }, }, }, @@ -265,7 +265,7 @@ func main() { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() - pb.RegisterConnectorServer(s, &server{}) + pb.RegisterSourceConnectorServer(s, &server{}) log.Printf("server listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err)