diff --git a/api/v1alpha1/xlinecluster_types.go b/api/v1alpha1/xlinecluster_types.go index 769a735..93b9799 100644 --- a/api/v1alpha1/xlinecluster_types.go +++ b/api/v1alpha1/xlinecluster_types.go @@ -180,13 +180,16 @@ type XlineClusterStatus struct { type XlineClusterOprStage string const ( - StageXlineScriptCM XlineClusterOprStage = "Xline/ScriptCM" - StageXlineConfigMap XlineClusterOprStage = "Xline/ConfigMap" - StageXlineService XlineClusterOprStage = "Xline/Service" - StageXlineDiscoveryService XlineClusterOprStage = "Xline/DiscoveryService" - StageXlineDiscoveryDeploy XlineClusterOprStage = "Xline/DiscoveryDeploy" - StageXlineStatefulSet XlineClusterOprStage = "Xline/Statefulset" - StageComplete XlineClusterOprStage = "complete" + StageXlineScriptCM XlineClusterOprStage = "Xline/ScriptCM" + StageXlineConfigMap XlineClusterOprStage = "Xline/ConfigMap" + StageXlineService XlineClusterOprStage = "Xline/Service" + StageXlineDiscoveryService XlineClusterOprStage = "Xline/DiscoveryService" + StageXlineDiscoverySA XlineClusterOprStage = "Xline/DiscoverySA" + StageXlineDiscoveryRole XlineClusterOprStage = "Xline/DiscoveryRole" + StageXlineDiscoveryRoleBinding XlineClusterOprStage = "Xline/DiscoveryRoleBinding" + StageXlineDiscoveryDeploy XlineClusterOprStage = "Xline/DiscoveryDeploy" + StageXlineStatefulSet XlineClusterOprStage = "Xline/Statefulset" + StageComplete XlineClusterOprStage = "complete" ) // XlineClusterRecStatus represents XlineCluster reconcile status diff --git a/cmd/discovery/main.go b/cmd/discovery/main.go index dc40c46..50f26c8 100644 --- a/cmd/discovery/main.go +++ b/cmd/discovery/main.go @@ -35,13 +35,21 @@ func main() { xcName := os.Getenv("XC_NAME") if len(xcName) < 1 { - zap.L().Fatal("ENV XC_NAME is not set") + zap.S().Fatal("ENV XC_NAME is not set") + } + + ns := os.Getenv("NAMESPACE") + if len(ns) < 1 { + zap.S().Fatal("ENV NAMESPACE is not set") } go func() { addr := fmt.Sprintf("0.0.0.0:%d", port) zap.S().Infof("starting Xline Discovery server, listening on %s", addr) - discoveryServer := server.NewServer() + discoveryServer, err := server.NewServer(ns, xcName) + if err != nil { + zap.S().Fatal("cannot create k8s client: %s", err) + } discoveryServer.ListenAndServe(addr) }() diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3c5618a..63a3dda 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -56,6 +56,16 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - serviceaccounts + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - "" resources: @@ -68,6 +78,26 @@ rules: - patch - update - watch +- apiGroups: + - rbac.authorization.k8s.io + resources: + - rolebindings + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - rbac.authorization.k8s.io + resources: + - roles + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - xline.io.datenlord.com resources: diff --git a/internal/controller/xlinecluster_controller.go b/internal/controller/xlinecluster_controller.go index 0f8cda8..d753ef3 100644 --- a/internal/controller/xlinecluster_controller.go +++ b/internal/controller/xlinecluster_controller.go @@ -43,6 +43,9 @@ type XlineClusterReconciler struct { //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;delete +//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles,verbs=get;list;watch;create;delete; +//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=rolebindings,verbs=get;list;watch;create;delete func (r *XlineClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { recCtx := reconciler.NewReconcileContext(r.Client, r.Scheme, ctx) diff --git a/internal/reconciler/cluster_reconciler.go b/internal/reconciler/cluster_reconciler.go index f581cfb..4a59589 100644 --- a/internal/reconciler/cluster_reconciler.go +++ b/internal/reconciler/cluster_reconciler.go @@ -24,6 +24,7 @@ import ( tran "github.com/xline-kv/xline-operator/internal/transformer" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/types" ) @@ -76,6 +77,24 @@ func (r *XlineClusterReconciler) recXlineResources() ClusterStageRecResult { return clusterStageFail(xapi.StageXlineDiscoveryService, err) } + // create an xline discovery serviceaccount + discoverySa := tran.MakeDiscoverySA(r.CR, r.Schema) + if err := r.CreateOrUpdate(discoverySa, &corev1.ServiceAccount{}); err != nil { + return clusterStageFail(xapi.StageXlineDiscoverySA, err) + } + + // create an xline discovery role + discoveryRole := tran.MakeDiscoveryRole(r.CR, r.Schema) + if err := r.CreateOrUpdate(discoveryRole, &rbacv1.Role{}); err != nil { + return clusterStageFail(xapi.StageXlineDiscoveryRole, err) + } + + // create a rolebinding for xline discovery + discoveryRB := tran.MakeDiscoveryRoleBinding(r.CR, r.Schema) + if err := r.CreateOrUpdate(discoveryRB, &rbacv1.RoleBinding{}); err != nil { + return clusterStageFail(xapi.StageXlineDiscoveryRoleBinding, err) + } + // create an xline discovery deployment mgrDeployName := types.NamespacedName{Name: constants.OperatorDeployName, Namespace: constants.OperatorNamespace} mgrDeploy := &appv1.Deployment{} diff --git a/internal/server/server.go b/internal/server/server.go index 608d7c0..2adb7e4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,13 +1,19 @@ package server import ( + "context" "encoding/base64" "fmt" "io" "net/http" + "strings" "github.com/emicklei/go-restful" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ) type Discovery interface { @@ -21,6 +27,9 @@ type Server interface { type server struct { discovery Discovery container *restful.Container + cli *kubernetes.Clientset + ns string + name string } type discovery struct{} @@ -30,17 +39,30 @@ func NewDiscovery() Discovery { } func (d *discovery) Discover(advertisePeerUrl string) (string, error) { - return fmt.Sprintf("%s", advertisePeerUrl), nil + return advertisePeerUrl, nil } // NewServer creates a new server. -func NewServer() Server { +func NewServer(namespace string, name string) (Server, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + s := &server{ discovery: NewDiscovery(), container: restful.NewContainer(), + cli: clientset, + ns: namespace, + name: name, } s.registerHandlers() - return s + return s, nil } func (s *server) registerHandlers() { @@ -54,17 +76,33 @@ func (s *server) ListenAndServe(addr string) { } func (s *server) newHandler(req *restful.Request, resp *restful.Response) { + pods, err := s.cli.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", s.name), + }) + if err != nil { + zap.S().Errorf("failed to get xline running pod: %s", err) + if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil { + zap.S().Errorf("failed to writeError: %v", werr) + } + return + } + var runningPods []string + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning { + runningPods = append(runningPods, pod.Spec.Hostname) + } + } encodedAdvertisePeerURL := req.PathParameter("advertise-peer-url") data, err := base64.StdEncoding.DecodeString(encodedAdvertisePeerURL) if err != nil { - zap.S().Errorf("failed to decode advertise-peer-url: %s, register-type is: %s", encodedAdvertisePeerURL) + zap.S().Errorf("failed to decode advertise-peer-url: %s", encodedAdvertisePeerURL) if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil { zap.S().Errorf("failed to writeError: %v", werr) } return } - advertisePeerURL := string(data) - + // advertisePeerURL := string(data) + advertisePeerURL := fmt.Sprintf("%s: %s", string(data), strings.Join(runningPods, ",")) var result string result, err = s.discovery.Discover(advertisePeerURL) if err != nil { diff --git a/internal/transformer/xlinecluster_resource.go b/internal/transformer/xlinecluster_resource.go index 34cc8fd..5767898 100644 --- a/internal/transformer/xlinecluster_resource.go +++ b/internal/transformer/xlinecluster_resource.go @@ -9,6 +9,7 @@ import ( "github.com/xline-kv/xline-operator/internal/util" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -87,6 +88,59 @@ func MakeDiscoveryService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1 return service } +func MakeDiscoverySA(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.ServiceAccount { + sa := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "xline-discovery", + Namespace: cr.Namespace, + Labels: GetXlineDiscoveryLabels(cr.ObjKey()), + }, + } + _ = controllerutil.SetOwnerReference(cr, sa, scheme) + return sa +} + +func MakeDiscoveryRole(cr *xapi.XlineCluster, scheme *runtime.Scheme) *rbacv1.Role { + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: "xline-discovery-role", + Namespace: cr.Namespace, + Labels: GetXlineDiscoveryLabels(cr.ObjKey()), + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{corev1.GroupName}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list", "watch"}, + }, + }, + } + + _ = controllerutil.SetOwnerReference(cr, role, scheme) + return role +} + +func MakeDiscoveryRoleBinding(cr *xapi.XlineCluster, scheme *runtime.Scheme) *rbacv1.RoleBinding { + rb := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "xline-discovery-rolebinding", + Namespace: cr.Namespace, + Labels: GetXlineDiscoveryLabels(cr.ObjKey()), + }, + Subjects: []rbacv1.Subject{{ + Kind: rbacv1.ServiceAccountKind, + Name: "xline-discovery", + }}, + RoleRef: rbacv1.RoleRef{ + Kind: "Role", + Name: "xline-discovery-role", + APIGroup: rbacv1.GroupName, + }, + } + _ = controllerutil.SetOwnerReference(cr, rb, scheme) + return rb +} + func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme, image string) *appv1.Deployment { discoveryLabel := GetXlineDiscoveryLabels(cr.ObjKey()) podSpec := corev1.PodSpec{ @@ -104,10 +158,11 @@ func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme, imag }, Env: []corev1.EnvVar{ {Name: "XC_NAME", Value: cr.Name}, + {Name: "NAMESPACE", Value: cr.Namespace}, }, }, }, - // ServiceAccountName: "my-service-account", + ServiceAccountName: "xline-discovery", } deploy := &appv1.Deployment{