Skip to content

Commit

Permalink
feat: integrate discovery repo into xline operator
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Feb 18, 2024
1 parent b5d58a5 commit 70fe44f
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 20 deletions.
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ COPY go.sum go.sum
RUN export GOPROXY=https://goproxy.io/ && go mod download

# Copy the go source
COPY cmd/main.go cmd/main.go
COPY cmd/operator/main.go cmd/operator/main.go
COPY cmd/discovery/main.go cmd/discovery/main.go
COPY api/ api/
COPY internal/ internal/

Expand All @@ -21,14 +22,16 @@ COPY internal/ internal/
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/operator/main.go
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o discovery cmd/discovery/main.go

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
# FROM gcr.io/distroless/static:nonroot
FROM kubeimages/distroless-static:latest
WORKDIR /
COPY --from=builder /workspace/manager .
COPY --from=builder /workspace/discovery .
USER 65532:65532

ENTRYPOINT ["/manager"]
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes

.PHONY: build
build: manifests generate fmt vet ## Build manager binary.
go build -o bin/manager cmd/main.go
go build -o bin/manager cmd/operator/main.go
go build -o bin/discovery cmd/discovery/main.go

.PHONY: run
run: manifests generate fmt vet ## Run a controller from your host.
go run ./cmd/main.go
go run ./cmd/operator/main.go

# If you wish to build the manager image targeting other platforms you can use the --platform flag.
# (i.e. docker build --platform linux/arm64). However, you must enable docker buildKit for it.
Expand Down
69 changes: 69 additions & 0 deletions cmd/discovery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"

"github.com/xline-kv/xline-operator/internal/server"

"go.uber.org/zap"
)

var port int

func init() {
zap.ReplaceGlobals(zap.Must(zap.NewProduction()))
flag.IntVar(&port, "port", 10086, "The port that the xline discovery's http service runs on (default 10086)")
flag.Parse()
}

// discovery_url="my-xline-cluster-discovery.default.svc:10086"
// domain="my-xline-cluster-0.my-xline-cluster.default.svc.cluster.local"
// encoded_domain_url=`echo ${domain}:2380 | base64 | tr "\n" " " | sed "s/ //g"`
// wget -qO- -T 3 http://${discovery_url}/new/${encoded_domain_url}

func main() {
flag.CommandLine.VisitAll(func(flag *flag.Flag) {
zap.S().Info("FLAG: --%s=%q", flag.Name, flag.Value)
})

xcName := os.Getenv("XC_NAME")
if len(xcName) < 1 {
zap.L().Fatal("ENV XC_NAME is not set")
}

go func() {
addr := fmt.Sprintf("0.0.0.0:%d", port)
zap.S().Infof("starting Xline Discovery server, listening on %s", addr)
discoveryServer := server.NewServer()
discoveryServer.ListenAndServe(addr)
}()

srv := http.Server{Addr: ":6060"}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)

go func() {
sig := <-sc
zap.S().Infof("got signal %s to exit", sig)
if err2 := srv.Shutdown(context.Background()); err2 != nil {
zap.S().Fatal("fail to shutdown the HTTP server", err2)
}
}()

if err := srv.ListenAndServe(); err != http.ErrServerClosed {
zap.S().Fatal(err)
}
zap.S().Infof("xline-discovery exited!!")
}
File renamed without changes.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ module github.com/xline-kv/xline-operator
go 1.20

require (
github.com/emicklei/go-restful v2.16.0+incompatible
github.com/go-logr/logr v1.2.4
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.25.0
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
sigs.k8s.io/controller-runtime v0.16.3
)

Expand Down Expand Up @@ -49,7 +52,6 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
Expand All @@ -68,7 +70,6 @@ require (
k8s.io/component-base v0.28.3 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epTksy9HQ1MyrbDXSdYhM=
github.com/emicklei/go-restful v2.16.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
Expand Down
8 changes: 8 additions & 0 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package constants

const (
XlinePort = 2379
DiscoveryPort = 10086
OperatorNamespace = "xline-operator-system"
OperatorDeployName = "xline-operator-controller-manager"
)
10 changes: 9 additions & 1 deletion internal/reconciler/cluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package reconciler

import (
xapi "github.com/xline-kv/xline-operator/api/v1alpha1"
"github.com/xline-kv/xline-operator/internal/constants"
tran "github.com/xline-kv/xline-operator/internal/transformer"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

// XlineClusterReconciler reconciles a XlineCluster object
Expand Down Expand Up @@ -75,7 +77,13 @@ func (r *XlineClusterReconciler) recXlineResources() ClusterStageRecResult {
}

// create an xline discovery deployment
discoveryDeploy := tran.MakeDiscoveryDeployment(r.CR, r.Schema)
mgrDeployName := types.NamespacedName{Name: constants.OperatorDeployName, Namespace: constants.OperatorNamespace}
mgrDeploy := &appv1.Deployment{}
if err := r.Get(r.Ctx, mgrDeployName, mgrDeploy); err != nil {
return clusterStageFail(xapi.StageXlineDiscoveryDeploy, err)
}
discoveryImage := mgrDeploy.Spec.Template.Spec.Containers[1].Image
discoveryDeploy := tran.MakeDiscoveryDeployment(r.CR, r.Schema, discoveryImage)
if err := r.CreateOrUpdate(discoveryDeploy, &appv1.Deployment{}); err != nil {
return clusterStageFail(xapi.StageXlineDiscoveryDeploy, err)
}
Expand Down
82 changes: 82 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package server

import (
"encoding/base64"
"fmt"
"io"
"net/http"

"github.com/emicklei/go-restful"
"go.uber.org/zap"
)

type Discovery interface {
Discover(string) (string, error)
}

type Server interface {
ListenAndServe(addr string)
}

type server struct {
discovery Discovery
container *restful.Container
}

type discovery struct{}

func NewDiscovery() Discovery {
return &discovery{}
}

func (d *discovery) Discover(advertisePeerUrl string) (string, error) {
return fmt.Sprintf("%s", advertisePeerUrl), nil
}

// NewServer creates a new server.
func NewServer() Server {
s := &server{
discovery: NewDiscovery(),
container: restful.NewContainer(),
}
s.registerHandlers()
return s
}

func (s *server) registerHandlers() {
ws := new(restful.WebService)
ws.Route(ws.GET("/new/{advertise-peer-url}").To(s.newHandler))
s.container.Add(ws)
}

func (s *server) ListenAndServe(addr string) {
zap.S().Fatal(http.ListenAndServe(addr, s.container.ServeMux))
}

func (s *server) newHandler(req *restful.Request, resp *restful.Response) {
encodedAdvertisePeerURL := req.PathParameter("advertise-peer-url")
data, err := base64.StdEncoding.DecodeString(encodedAdvertisePeerURL)
if err != nil {
zap.S().Errorf("failed to decode advertise-peer-url: %s, register-type is: %s", encodedAdvertisePeerURL)
if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil {
zap.S().Errorf("failed to writeError: %v", werr)
}
return
}
advertisePeerURL := string(data)

var result string
result, err = s.discovery.Discover(advertisePeerURL)
if err != nil {
zap.S().Errorf("failed to discover: %s, %v", advertisePeerURL, err)
if werr := resp.WriteError(http.StatusInternalServerError, err); werr != nil {
zap.S().Errorf("failed to writeError: %v", werr)
}
return
}

zap.S().Infof("generated args for %s: %s", advertisePeerURL, result)
if _, err := io.WriteString(resp, result); err != nil {
zap.S().Errorf("failed to writeString: %s, %v", result, err)
}
}
22 changes: 9 additions & 13 deletions internal/transformer/xlinecluster_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

xapi "github.com/xline-kv/xline-operator/api/v1alpha1"
"github.com/xline-kv/xline-operator/internal/constants"
"github.com/xline-kv/xline-operator/internal/util"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -15,11 +16,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
XlinePort = 2379
DiscoveryPort = 10086
)

func GetXlineInstanceLabels(xlineClusterName types.NamespacedName) map[string]string {
return MakeResourceLabels(xlineClusterName.Name)
}
Expand All @@ -34,7 +30,7 @@ func GetMemberTopology(cr *xapi.XlineCluster) string {
for i := 0; i < replicas; i++ {
podName := fmt.Sprintf("%s-%d", cr.Name, i)
dnsName := fmt.Sprintf("%s.%s.%s.svc.cluster.local", podName, cr.Name, cr.Namespace)
members[i] = fmt.Sprintf("%s=%s:%d", podName, dnsName, XlinePort)
members[i] = fmt.Sprintf("%s=%s:%d", podName, dnsName, constants.XlinePort)
}
return strings.Join(members, ",")
}
Expand Down Expand Up @@ -81,7 +77,7 @@ func MakeDiscoveryService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1
Ports: []corev1.ServicePort{
{
Name: "discovery-port",
Port: DiscoveryPort,
Port: constants.DiscoveryPort,
},
},
Selector: svcLabel,
Expand All @@ -91,19 +87,19 @@ func MakeDiscoveryService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1
return service
}

func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme) *appv1.Deployment {
func MakeDiscoveryDeployment(cr *xapi.XlineCluster, scheme *runtime.Scheme, image string) *appv1.Deployment {
discoveryLabel := GetXlineDiscoveryLabels(cr.ObjKey())
podSpec := corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "xline-discovery",
Image: "phoenix500526/discovery:v0.1.1",
Image: image,
Command: []string{
"/usr/local/bin/discovery",
"/discovery",
},
Ports: []corev1.ContainerPort{
{
ContainerPort: DiscoveryPort,
ContainerPort: constants.DiscoveryPort,
},
},
Env: []corev1.EnvVar{
Expand Down Expand Up @@ -148,7 +144,7 @@ func MakeService(cr *xapi.XlineCluster, scheme *runtime.Scheme) *corev1.Service
Ports: []corev1.ServicePort{
{
Name: "xline-port",
Port: XlinePort,
Port: constants.XlinePort,
},
},
Selector: svcLabel,
Expand Down Expand Up @@ -229,7 +225,7 @@ func MakeStatefulSet(cr *xapi.XlineCluster, scheme *runtime.Scheme) *appv1.State
Image: *cr.Spec.Image,
ImagePullPolicy: cr.Spec.ImagePullPolicy,
Ports: []corev1.ContainerPort{
{Name: "xline-port", ContainerPort: XlinePort},
{Name: "xline-port", ContainerPort: constants.XlinePort},
},
Command: []string{"/bin/bash", "/usr/local/script/xline_start_script.sh"},
Env: envs,
Expand Down

0 comments on commit 70fe44f

Please sign in to comment.