
Commit

Merge pull request #1020 from redpanda-data/sj/kafka-connect/get-connector

Sj/kafka connect/get connector
sago2k8 authored Jan 22, 2024
2 parents 7820bc2 + 1ce960b commit bf800d6
Showing 17 changed files with 1,663 additions and 818 deletions.
7 changes: 6 additions & 1 deletion backend/pkg/api/connect/integration/api_suite_test.go
@@ -86,7 +86,12 @@ func (s *APISuite) SetupSuite() {
s.testSeedBroker = seedBroker

// 3. Start Kafka Connect Docker container
kConnectContainer, err := testutil.RunRedpandaConnectorsContainer(ctx, []string{"redpanda:29092"}, network.WithNetwork([]string{"kconnect"}, s.network))
kConnectContainer, err := testutil.RunRedpandaConnectorsContainer(
ctx,
[]string{"redpanda:29092"},
network.WithNetwork([]string{"kconnect"}, s.network),
testcontainers.WithImage("docker.cloudsmith.io/redpanda/connectors-unsupported/connectors:v1.0.0-44344ad"),
)
require.NoError(err)

s.kConnectContainer = kConnectContainer
152 changes: 149 additions & 3 deletions backend/pkg/api/connect/integration/kafkaconnect_test.go
@@ -21,9 +21,11 @@ import (
"github.com/carlmjohnson/requests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/network"

v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1"
v1alpha1connect "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1/dataplanev1alpha1connect"
"github.com/redpanda-data/console/backend/pkg/testutil"
)

func (s *APISuite) TestListConnectors() {
@@ -55,7 +57,7 @@ func (s *APISuite) TestListConnectors() {
}))
require.NoError(err)
assert.NotNil(res.Msg, "response message must not be nil")
assert.Equal(0, len(res.Msg.Connectors))
assert.IsType([]*v1alpha1.ListConnectorsResponse_ConnectorInfoStatus{}, res.Msg.Connectors)
})
}

@@ -64,7 +66,7 @@ func (s *APISuite) TestListConnectClusters() {
require := require.New(t)
assert := assert.New(t)

t.Run("list connect clusters default request (connect-go)", func(t *testing.T) {
t.Run("list connect clusters request (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

@@ -77,7 +79,7 @@
assert.Equal("connect-cluster", res.Msg.Clusters[0].Name)
})

t.Run("list connect clusters with default request (http)", func(t *testing.T) {
t.Run("list connect clusters request (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

@@ -114,3 +116,147 @@
assert.Equal("connect-cluster", response.Clusters[0].Name)
})
}

func (s *APISuite) TestGetConnectorAndStatus() {
t := s.T()
require := require.New(t)
assert := assert.New(t)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

// Run HTTPBin container
httpC, err := testutil.RunHTTPBinContainer(ctx, network.WithNetwork([]string{"httpbin", "local-httpbin"}, s.network))
require.NoError(err)

client := v1alpha1connect.NewKafkaConnectServiceClient(http.DefaultClient, s.httpAddress())

// Create Connector request
input := &v1alpha1.CreateConnectorRequest{
ClusterName: "connect-cluster",
Connector: &v1alpha1.ConnectorSpec{
Name: "http_connect_input",
Config: map[string]string{
"connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
"http.request.url": "http://httpbin/uuid",
"http.timer.catchup.interval.millis": "10000",
"http.timer.interval.millis": "1000",
"kafka.topic": "httpbin-input",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"name": "http_connect_input",
"topic.creation.default.partitions": "1",
"topic.creation.default.replication.factor": "1",
"topic.creation.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
},
},
}

_, err = client.CreateConnector(ctx, connect.NewRequest(input))

require.NoError(err)

t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

// delete connector
_, err := client.DeleteConnector(ctx, connect.NewRequest(
&v1alpha1.DeleteConnectorRequest{
ClusterName: input.ClusterName,
Name: input.Connector.Name,
},
))
require.NoError(err)

// Stop HTTPBin container
err = httpC.Terminate(ctx)
require.NoError(err)
})

t.Run("Get connector request (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetConnector(ctx, connect.NewRequest(
&v1alpha1.GetConnectorRequest{
ClusterName: input.ClusterName,
Name: input.Connector.Name,
}))
require.NoError(err)
assert.NotNil(res.Msg, "response message must not be nil")
assert.Equal("http_connect_input", res.Msg.Connector.Name)
assert.Equal(input.Connector.Config, res.Msg.Connector.Config)
})

t.Run("Get connector request (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

type getConnectorResponse struct {
Name string `json:"name"`
Config map[string]string `json:"config"`
Type string `json:"type"`
}

var response getConnectorResponse
var errResponse string
err := requests.
URL(s.httpAddress() + "/v1alpha1/").
Path("connect/clusters/connect-cluster/connectors/http_connect_input").
AddValidator(requests.ValidatorHandler(
requests.CheckStatus(http.StatusOK),
requests.ToString(&errResponse),
)).
ToJSON(&response).
Fetch(ctx)
assert.Empty(errResponse)
require.NoError(err)
assert.Equal(input.Connector.Name, response.Name)
assert.Equal(input.Connector.Config, response.Config)
assert.Equal("source", response.Type)
})

t.Run("Get connector status (connect-go)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetConnectorStatus(ctx, connect.NewRequest(
&v1alpha1.GetConnectorStatusRequest{
ClusterName: input.ClusterName,
Name: input.Connector.Name,
}))
require.NoError(err)
assert.NotNil(res.Msg, "response message must not be nil")
assert.Equal("http_connect_input", res.Msg.Status.Name)
})

t.Run("Get connector status request (http)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

type getConnectorStatusResponse struct {
Name string `json:"name"`
Type string `json:"type"`
}

var response getConnectorStatusResponse
var errResponse string
err := requests.
URL(s.httpAddress() + "/v1alpha1/").
Path("connect/clusters/connect-cluster/connectors/http_connect_input/status").
AddValidator(requests.ValidatorHandler(
requests.CheckStatus(http.StatusOK),
requests.ToString(&errResponse),
)).
ToJSON(&response).
Fetch(ctx)
assert.Empty(errResponse)
require.NoError(err)
assert.Equal(input.Connector.Name, response.Name)
assert.Equal("source", response.Type)
})
}
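
For orientation, the REST paths exercised by the tests above can also be called directly. The following sketch is illustrative only and not part of the diff: the base address (localhost:8080), cluster name, and connector name are assumptions carried over from the integration-test setup, and it uses the Go standard library instead of the test suite's requests helper.

// Illustrative sketch (not part of the diff): calls the GetConnector and
// GetConnectorStatus REST endpoints exercised by the tests above. Base
// address, cluster, and connector names are assumed from the test setup.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	base := "http://localhost:8080" // assumed Console API address
	paths := []string{
		"/v1alpha1/connect/clusters/connect-cluster/connectors/http_connect_input",
		"/v1alpha1/connect/clusters/connect-cluster/connectors/http_connect_input/status",
	}

	for _, p := range paths {
		resp, err := http.Get(base + p)
		if err != nil {
			fmt.Println("request failed:", err)
			continue
		}
		// Decode the JSON body generically; the tests above show the exact fields
		// (name, config, type for the connector; name, type for the status).
		var body map[string]any
		if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
			fmt.Println("decode failed:", err)
		}
		resp.Body.Close()
		fmt.Printf("GET %s -> %d: %v\n", p, resp.StatusCode, body)
	}
}
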
40 changes: 30 additions & 10 deletions backend/pkg/api/connect/service/kafkaconnect/mapper.go
@@ -32,9 +32,7 @@ func (m mapper) connectorsHTTPResponseToProto(httpResponse kafkaconnect.ClusterC
}

connectors[i] = &dataplanev1alpha1.ListConnectorsResponse_ConnectorInfoStatus{
Name: connector.Name,
HolisticState: m.holisticStateToProto(connector.Status),
Errors: errors,
Name: connector.Name,
}

connectors[i].Info = &dataplanev1alpha1.ConnectorSpec{
@@ -49,10 +47,12 @@
Connector: &dataplanev1alpha1.ConnectorStatus_Connector{
State: connector.State,
WorkerId: connector.WorkerID,
Trace: connector.Trace,
},
Tasks: m.taskInfoListToProtoStatus(connector.Tasks),
Type: connector.Type,
Trace: connector.Trace,
Tasks: m.taskInfoListToProtoStatus(connector.Tasks),
Type: connector.Type,
Errors: errors,
HolisticState: m.holisticStateToProto(connector.State),
}
}

@@ -162,7 +162,7 @@ func (mapper) createConnectorProtoToClientRequest(createConnector *dataplanev1al
}, nil
}

func (m mapper) ClusterInfoToProto(clusterInfo kafkaconnect.ClusterInfo) (*dataplanev1alpha1.ConnectCluster, error) {
func (m mapper) clusterInfoToProto(clusterInfo kafkaconnect.ClusterInfo) *dataplanev1alpha1.ConnectCluster {
return &dataplanev1alpha1.ConnectCluster{
Name: clusterInfo.Name,
Address: clusterInfo.Host,
@@ -172,7 +172,7 @@ func (m mapper) ClusterInfoToProto(clusterInfo kafkaconnect.ClusterInfo) (*datap
KafkaClusterId: clusterInfo.KafkaClusterID,
},
Plugins: m.connectPluginsToProto(clusterInfo.Plugins),
}, nil
}
}

func (mapper) connectPluginsToProto(plugins []con.ConnectorPluginInfo) []*dataplanev1alpha1.ConnectorPlugin {
@@ -212,8 +212,8 @@ func (m mapper) connectorInfoListToProto(connectorInfoList []kafkaconnect.Cluste
return clusters, errs
}

// ConnectorSpecToProto converts the http response to proto message
func (mapper) ConnectorSpecToProto(connector con.ConnectorInfo) *dataplanev1alpha1.ConnectorSpec {
// connectorSpecToProto converts the http response to proto message
func (mapper) connectorSpecToProto(connector con.ConnectorInfo) *dataplanev1alpha1.ConnectorSpec {
tasks := make([]*dataplanev1alpha1.TaskInfo, len(connector.Tasks))

for i, task := range connector.Tasks {
@@ -226,9 +226,29 @@ func (mapper) ConnectorSpecToProto(connector con.ConnectorInfo) *dataplanev1alph
Name: connector.Name,
Config: connector.Config,
Tasks: tasks,
Type: connector.Type,
}
}

func (m mapper) connectorStatusToProto(status kafkaconnect.ConnectorStatus) (*dataplanev1alpha1.ConnectorStatus, error) {
errors, err := m.connectorErrorsToProto(status.Errors)
if err != nil {
return nil, fmt.Errorf("failed to map connector error to proto for connector %q: %w", status.Name, err)
}
return &dataplanev1alpha1.ConnectorStatus{
Name: status.Name,
Connector: &dataplanev1alpha1.ConnectorStatus_Connector{
State: status.Connector.State,
WorkerId: status.Connector.WorkerID,
Trace: status.Connector.Trace,
},
Type: status.Type,
Errors: errors,
Tasks: m.taskInfoListToProtoStatus(status.Tasks),
HolisticState: m.holisticStateToProto(status.State),
}, nil
}

// convertStringMapToInterfaceMap converts interface map to string map
func convertStringMapToInterfaceMap(stringMap map[string]string) map[string]any {
interfaceMap := make(map[string]any, len(stringMap))
61 changes: 40 additions & 21 deletions backend/pkg/api/connect/service/kafkaconnect/service.go
@@ -110,12 +110,43 @@ func (s *Service) CreateConnector(ctx context.Context, req *connect.Request[v1al
}

// GetConnector implements the handler for the get connector operation
func (*Service) GetConnector(context.Context, *connect.Request[v1alpha1.GetConnectorRequest]) (*connect.Response[v1alpha1.GetConnectorResponse], error) {
return nil, apierrors.NewConnectError(
connect.CodeUnimplemented,
errors.New("endpoint is not implemented"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_KAFKA_CONNECT_API_ERROR.String()),
)
func (s *Service) GetConnector(ctx context.Context, req *connect.Request[v1alpha1.GetConnectorRequest]) (*connect.Response[v1alpha1.GetConnectorResponse], error) {
httpRes, err := s.connectSvc.GetConnectorInfo(ctx, req.Msg.ClusterName, req.Msg.Name)
if err != nil {
return nil, s.matchError(err)
}

res := connect.NewResponse(&v1alpha1.GetConnectorResponse{
Connector: s.mapper.connectorSpecToProto(httpRes),
})

return res, nil
}

// GetConnectorStatus implements the handler for the get connector status operation
func (s *Service) GetConnectorStatus(ctx context.Context, req *connect.Request[v1alpha1.GetConnectorStatusRequest]) (*connect.Response[v1alpha1.GetConnectorStatusResponse], error) {
httpRes, restErr := s.connectSvc.GetConnectorStatus(ctx, req.Msg.ClusterName, req.Msg.Name)
if restErr != nil {
return nil, s.matchError(restErr)
}

status, err := s.mapper.connectorStatusToProto(httpRes)
if err != nil {
s.logger.Error("error mapping response for connector", zap.Error(err), zap.String("cluster", req.Msg.ClusterName), zap.String("connector", req.Msg.Name))
return nil, apierrors.NewConnectError(
connect.CodeInternal,
err,
apierrors.NewErrorInfo(
v1alpha1.Reason_REASON_KAFKA_CONNECT_API_ERROR.String(),
),
)
}

res := connect.NewResponse(&v1alpha1.GetConnectorStatusResponse{
Status: status,
})

return res, nil
}

// ResumeConnector implements the handler for the resume connector operation
Expand Down Expand Up @@ -199,26 +230,14 @@ func (s *Service) ListConnectClusters(ctx context.Context, _ *connect.Request[v1
}), nil
}

// GetConnectCluster implements the handler for the restart connector operation
// GetConnectCluster implements the get connector info operation
func (s *Service) GetConnectCluster(ctx context.Context, req *connect.Request[v1alpha1.GetConnectClusterRequest]) (*connect.Response[v1alpha1.GetConnectClusterResponse], error) {
response, httpErr := s.connectSvc.GetClusterInfo(ctx, req.Msg.ClusterName)
if httpErr != nil {
return nil, s.matchError(httpErr)
}

clusterInfoProto, err := s.mapper.ClusterInfoToProto(response)
if err != nil {
s.logger.Error("unable to map list connectors response", zap.Error(err))
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("not able to parse response"),
apierrors.NewErrorInfo(
v1alpha1.Reason_REASON_KAFKA_CONNECT_API_ERROR.String(),
),
)
}
return connect.NewResponse(&v1alpha1.GetConnectClusterResponse{
Cluster: clusterInfoProto,
Cluster: s.mapper.clusterInfoToProto(response),
}), nil
}

@@ -242,7 +261,7 @@ func (s *Service) UpsertConnector(ctx context.Context, req *connect.Request[v1al
}

res := connect.NewResponse(&v1alpha1.UpsertConnectorResponse{
Connector: s.mapper.ConnectorSpecToProto(conInfo),
Connector: s.mapper.connectorSpecToProto(conInfo),
})

// Check if connector already exists, if not set header to 201 created
