Skip to content

Commit

Permalink
feat: optimize connector data structure
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai authored and wenfengwang committed Feb 13, 2023
1 parent 918541c commit 51f0f6b
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 6 deletions.
3 changes: 3 additions & 0 deletions api/models/connector_info.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions api/restapi/embedded_spec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ definitions:
status:
type: "string"
description: "connector status"
reason:
type: "string"
description: "connector status reason"

Controller_info:
type: "object"
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/connector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ type ConnectorSpec struct {
// Important: Run "make" to regenerate code after modifying this file

// Kind is the kind of connector, support source/sink.
Kind string `json:"type,omitempty"`
Kind string `json:"kind,omitempty"`
// Name is the name of connector.
Name string `json:"name,omitempty"`
// Type is the type of connector.
Type string `json:"type,omitempty"`
// Config is the file of config.
Config string `json:"config,omitempty"`
// Image is the name of the controller docker image to use for the Pods.
Expand Down
5 changes: 4 additions & 1 deletion config/crd/bases/vanus.linkall.com_connectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ spec:
imagePullPolicy:
description: ImagePullPolicy defines how the image is pulled
type: string
kind:
description: Kind is the kind of connector, support source/sink.
type: string
name:
description: Name is the name of connector.
type: string
type:
description: Type is the type of connector, support source/sink.
description: Type is the type of connector.
type: string
type: object
status:
Expand Down
109 changes: 105 additions & 4 deletions pkg/apiserver/handlers/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,32 @@ import (
"github.com/linkall-labs/vanus-operator/pkg/apiserver/utils"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
log "k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// PodStatusStatusPending captures enum value "Pending"
PodStatusStatusPending string = "Pending"

// PodStatusStatusRunning captures enum value "Running"
PodStatusStatusRunning string = "Running"

// PodStatusStatusSucceeded captures enum value "Succeeded"
PodStatusStatusSucceeded string = "Succeeded"

// PodStatusStatusStarting captures enum value "Starting"
PodStatusStatusStarting string = "Starting"

// PodStatusStatusFailed captures enum value "Failed"
PodStatusStatusFailed string = "Failed"

// PodStatusStatusRemoving captures enum value "Removing"
PodStatusStatusRemoving string = "Removing"

// PodStatusStatusUnknown captures enum value "Unknown"
PodStatusStatusUnknown string = "Unknown"
)

// All registered processing functions should appear under Registxxx in order
Expand Down Expand Up @@ -109,11 +134,18 @@ func (a *Api) listConnectorHandler(params connector.ListConnectorParams) middlew
msg := "list connector success"
data := make([]*models.ConnectorInfo, 0)
for _, c := range connectors.Items {
status, reason, err := a.getConnectorStatus(c.Name)
if err != nil {
log.Error(err, "Get Connector status failed", "Connector.Namespace: ", cons.DefaultNamespace, "Connector.Name: ", c.Name)
return utils.Response(0, err)
}
data = append(data, &models.ConnectorInfo{
Kind: c.Spec.Kind,
Name: c.Spec.Name,
Version: strings.Split(c.Spec.Image, ":")[1],
Status: "Running",
Type: c.Spec.Type,
Version: getConnectorVersion(c.Spec.Image),
Status: status,
Reason: reason,
})
}
return connector.NewListConnectorOK().WithPayload(&connector.ListConnectorOKBody{
Expand All @@ -131,13 +163,22 @@ func (a *Api) getConnectorHandler(params connector.GetConnectorParams) middlewar
}
retcode := int32(400)
msg := "get connector success"

status, reason, err := a.getConnectorStatus(params.Name)
if err != nil {
log.Error(err, "Get Connector status failed", "Connector.Namespace: ", cons.DefaultNamespace, "Connector.Name: ", params.Name)
return utils.Response(0, err)
}

return connector.NewGetConnectorOK().WithPayload(&connector.GetConnectorOKBody{
Code: &retcode,
Data: &models.ConnectorInfo{
Kind: c.Spec.Kind,
Name: c.Spec.Name,
Version: strings.Split(c.Spec.Image, ":")[1],
Status: "Running",
Type: c.Spec.Type,
Version: getConnectorVersion(c.Spec.Image),
Status: status,
Reason: reason,
},
Message: &msg,
})
Expand Down Expand Up @@ -196,10 +237,70 @@ func generateConnector(c *connectorConfig) *vanusv1alpha1.Connector {
Spec: vanusv1alpha1.ConnectorSpec{
Name: c.name,
Kind: c.kind,
Type: c.ctype,
Config: c.config,
Image: fmt.Sprintf("public.ecr.aws/vanus/connector/%s-%s:%s", c.kind, c.ctype, c.version),
ImagePullPolicy: corev1.PullIfNotPresent,
},
}
return controller
}

func getConnectorVersion(s string) string {
return strings.Split(s, ":")[1]
}

func (a *Api) getConnectorStatus(name string) (string, string, error) {
pods := &corev1.PodList{}
l := make(map[string]string)
l["app"] = name
opts := &ctrlclient.ListOptions{Namespace: cons.DefaultNamespace, LabelSelector: labels.SelectorFromSet(l)}
err := a.ctrl.List(pods, opts)
if err != nil {
log.Error(err, "Get Connector status failed", "Connector.Namespace: ", cons.DefaultNamespace, "Connector.Name: ", name)
return "", "", err
}
var status, reason string
if len(pods.Items) != 0 {
status, reason = statusCheck(&pods.Items[0])
}
return status, reason, nil
}

func statusCheck(a *corev1.Pod) (string, string) {
if a == nil {
return PodStatusStatusUnknown, ""
}
if a.DeletionTimestamp != nil {
return PodStatusStatusRemoving, ""
}
// Status: Pending/Succeeded/Failed/Unknown
if a.Status.Phase != corev1.PodRunning {
return string(a.Status.Phase), a.Status.Reason
}
// handle running
var (
containers = a.Status.ContainerStatuses
rnum int
)

for _, v := range containers {
if v.Ready {
rnum++
continue
}
if v.State.Terminated != nil {
if v.State.Terminated.ExitCode != 0 {
return PodStatusStatusFailed, v.State.Terminated.Reason
}
if v.State.Waiting != nil {
return PodStatusStatusStarting, v.State.Waiting.Reason
}
}
}
if rnum == len(containers) {
return PodStatusStatusRunning, ""
} else {
return PodStatusStatusStarting, ""
}
}

0 comments on commit 51f0f6b

Please sign in to comment.