Skip to content

Commit

Permalink
feat: add k8s client in discovery component
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Feb 18, 2024
1 parent 70fe44f commit 9ec841c
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 16 deletions.
17 changes: 10 additions & 7 deletions api/v1alpha1/xlinecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,16 @@ type XlineClusterStatus struct {
type XlineClusterOprStage string

const (
StageXlineScriptCM XlineClusterOprStage = "Xline/ScriptCM"
StageXlineConfigMap XlineClusterOprStage = "Xline/ConfigMap"
StageXlineService XlineClusterOprStage = "Xline/Service"
StageXlineDiscoveryService XlineClusterOprStage = "Xline/DiscoveryService"
StageXlineDiscoveryDeploy XlineClusterOprStage = "Xline/DiscoveryDeploy"
StageXlineStatefulSet XlineClusterOprStage = "Xline/Statefulset"
StageComplete XlineClusterOprStage = "complete"
StageXlineScriptCM XlineClusterOprStage = "Xline/ScriptCM"
StageXlineConfigMap XlineClusterOprStage = "Xline/ConfigMap"
StageXlineService XlineClusterOprStage = "Xline/Service"
StageXlineDiscoveryService XlineClusterOprStage = "Xline/DiscoveryService"
StageXlineDiscoverySA XlineClusterOprStage = "Xline/DiscoverySA"
StageXlineDiscoveryRole XlineClusterOprStage = "Xline/DiscoveryRole"
StageXlineDiscoveryRoleBinding XlineClusterOprStage = "Xline/DiscoveryRoleBinding"
StageXlineDiscoveryDeploy XlineClusterOprStage = "Xline/DiscoveryDeploy"
StageXlineStatefulSet XlineClusterOprStage = "Xline/Statefulset"
StageComplete XlineClusterOprStage = "complete"
)

// XlineClusterRecStatus represents XlineCluster reconcile status
Expand Down
12 changes: 10 additions & 2 deletions cmd/discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,21 @@ func main() {

xcName := os.Getenv("XC_NAME")
if len(xcName) < 1 {
zap.L().Fatal("ENV XC_NAME is not set")
zap.S().Fatal("ENV XC_NAME is not set")
}

ns := os.Getenv("NAMESPACE")
if len(ns) < 1 {
zap.S().Fatal("ENV NAMESPACE is not set")
}

go func() {
addr := fmt.Sprintf("0.0.0.0:%d", port)
zap.S().Infof("starting Xline Discovery server, listening on %s", addr)
discoveryServer := server.NewServer()
discoveryServer, err := server.NewServer(ns, xcName)
if err != nil {
zap.S().Fatal("cannot create k8s client: %s", err)
}
discoveryServer.ListenAndServe(addr)
}()

Expand Down
30 changes: 30 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- create
- delete
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand All @@ -68,6 +78,26 @@ rules:
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- rolebindings
verbs:
- create
- delete
- get
- list
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- roles
verbs:
- create
- delete
- get
- list
- watch
- apiGroups:
- xline.io.datenlord.com
resources:
Expand Down
3 changes: 3 additions & 0 deletions internal/controller/xlinecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type XlineClusterReconciler struct {
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;delete
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles,verbs=get;list;watch;create;delete;
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=rolebindings,verbs=get;list;watch;create;delete

func (r *XlineClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
recCtx := reconciler.NewReconcileContext(r.Client, r.Scheme, ctx)
Expand Down
19 changes: 19 additions & 0 deletions internal/reconciler/cluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
tran "github.com/xline-kv/xline-operator/internal/transformer"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -76,6 +77,24 @@ func (r *XlineClusterReconciler) recXlineResources() ClusterStageRecResult {
return clusterStageFail(xapi.StageXlineDiscoveryService, err)
}

// create an xline discovery serviceaccount
discoverySa := tran.MakeDiscoverySA(r.CR, r.Schema)
if err := r.CreateOrUpdate(discoverySa, &corev1.ServiceAccount{}); err != nil {
return clusterStageFail(xapi.StageXlineDiscoverySA, err)
}

// create an xline discovery role
discoveryRole := tran.MakeDiscoveryRole(r.CR, r.Schema)
if err := r.CreateOrUpdate(discoveryRole, &rbacv1.Role{}); err != nil {
return clusterStageFail(xapi.StageXlineDiscoveryRole, err)
}

// create a rolebinding for xline discovery
discoveryRB := tran.MakeDiscoveryRoleBinding(r.CR, r.Schema)
if err := r.CreateOrUpdate(discoveryRB, &rbacv1.RoleBinding{}); err != nil {
return clusterStageFail(xapi.StageXlineDiscoveryRoleBinding, err)
}

// create an xline discovery deployment
mgrDeployName := types.NamespacedName{Name: constants.OperatorDeployName, Namespace: constants.OperatorNamespace}
mgrDeploy := &appv1.Deployment{}
Expand Down
50 changes: 44 additions & 6 deletions internal/server/server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package server

import (
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"strings"

"github.com/emicklei/go-restful"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type Discovery interface {
Expand All @@ -21,6 +27,9 @@ type Server interface {
type server struct {
discovery Discovery
container *restful.Container
cli *kubernetes.Clientset
ns string
name string
}

type discovery struct{}
Expand All @@ -30,17 +39,30 @@ func NewDiscovery() Discovery {
}

func (d *discovery) Discover(advertisePeerUrl string) (string, error) {
return fmt.Sprintf("%s", advertisePeerUrl), nil
return advertisePeerUrl, nil
}

// NewServer creates a new server.
func NewServer() Server {
func NewServer(namespace string, name string) (Server, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

s := &server{
discovery: NewDiscovery(),
container: restful.NewContainer(),
cli: clientset,
ns: namespace,
name: name,
}
s.registerHandlers()
return s
return s, nil
}

func (s *server) registerHandlers() {
Expand All @@ -54,17 +76,33 @@ func (s *server) ListenAndServe(addr string) {
}

func (s *server) newHandler(req *restful.Request, resp *restful.Response) {
pods, err := s.cli.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", s.name),
})
if err != nil {
zap.S().Errorf("failed to get xline running pod: %s", err)
if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil {
zap.S().Errorf("failed to writeError: %v", werr)
}
return
}
var runningPods []string
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning {
runningPods = append(runningPods, pod.Spec.Hostname)
}
}
encodedAdvertisePeerURL := req.PathParameter("advertise-peer-url")
data, err := base64.StdEncoding.DecodeString(encodedAdvertisePeerURL)
if err != nil {
zap.S().Errorf("failed to decode advertise-peer-url: %s, register-type is: %s", encodedAdvertisePeerURL)
zap.S().Errorf("failed to decode advertise-peer-url: %s", encodedAdvertisePeerURL)
if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil {
zap.S().Errorf("failed to writeError: %v", werr)
}
return
}
advertisePeerURL := string(data)

// advertisePeerURL := string(data)
advertisePeerURL := fmt.Sprintf("%s: %s", string(data), strings.Join(runningPods, ","))
var result string
result, err = s.discovery.Discover(advertisePeerURL)
if err != nil {
Expand Down
57 changes: 56 additions & 1 deletion internal/transformer/xlinecluster_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/xline-kv/xline-operator/internal/util"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -87,6 +88,59 @@ func MakeDiscoveryService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1
return service
}

func MakeDiscoverySA(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.ServiceAccount {
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "xline-discovery",
Namespace: cr.Namespace,
Labels: GetXlineDiscoveryLabels(cr.ObjKey()),
},
}
_ = controllerutil.SetOwnerReference(cr, sa, scheme)
return sa
}

func MakeDiscoveryRole(cr *xapi.XlineCluster, scheme *runtime.Scheme) *rbacv1.Role {
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: "xline-discovery-role",
Namespace: cr.Namespace,
Labels: GetXlineDiscoveryLabels(cr.ObjKey()),
},
Rules: []rbacv1.PolicyRule{
{
APIGroups: []string{corev1.GroupName},
Resources: []string{"pods"},
Verbs: []string{"get", "list", "watch"},
},
},
}

_ = controllerutil.SetOwnerReference(cr, role, scheme)
return role
}

func MakeDiscoveryRoleBinding(cr *xapi.XlineCluster, scheme *runtime.Scheme) *rbacv1.RoleBinding {
rb := &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "xline-discovery-rolebinding",
Namespace: cr.Namespace,
Labels: GetXlineDiscoveryLabels(cr.ObjKey()),
},
Subjects: []rbacv1.Subject{{
Kind: rbacv1.ServiceAccountKind,
Name: "xline-discovery",
}},
RoleRef: rbacv1.RoleRef{
Kind: "Role",
Name: "xline-discovery-role",
APIGroup: rbacv1.GroupName,
},
}
_ = controllerutil.SetOwnerReference(cr, rb, scheme)
return rb
}

func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme, image string) *appv1.Deployment {
discoveryLabel := GetXlineDiscoveryLabels(cr.ObjKey())
podSpec := corev1.PodSpec{
Expand All @@ -104,10 +158,11 @@ func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme, imag
},
Env: []corev1.EnvVar{
{Name: "XC_NAME", Value: cr.Name},
{Name: "NAMESPACE", Value: cr.Namespace},
},
},
},
// ServiceAccountName: "my-service-account",
ServiceAccountName: "xline-discovery",
}

deploy := &appv1.Deployment{
Expand Down

0 comments on commit 9ec841c

Please sign in to comment.