Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
+ add fake client
Browse files Browse the repository at this point in the history
  • Loading branch information
kebe7jun committed Apr 1, 2022
1 parent de3a16f commit 6f8e14c
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 3 deletions.
2 changes: 1 addition & 1 deletion api/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS
var labels *v1.LabelSelector
var paginate page.Paginate
var labelSelectorStr string
clusterPrefix := "dsm-cluster-"
clusterPrefix := constants.ClusterPrefix
cluster := ""
query := r.URL.Query()
for k, v := range query {
Expand Down
2 changes: 2 additions & 0 deletions common/constants/ckube.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
KeyTypeStr = "str"
SearchPartsSep = ';'
DSMClusterAnno = "ckube.doacloud.io/cluster"
ClusterPrefix = "dsm-cluster-"
)

var (
Expand All @@ -22,4 +23,5 @@ var (
_ = KeyTypeStr
_ = SearchPartsSep
_ = DSMClusterAnno
_ = ClusterPrefix
)
182 changes: 182 additions & 0 deletions pkg/client/fake/fake_ckube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package fake

import (
"encoding/json"
"github.com/DaoCloud/ckube/common"
"github.com/DaoCloud/ckube/common/constants"
"github.com/DaoCloud/ckube/server"
"github.com/DaoCloud/ckube/store"
"github.com/DaoCloud/ckube/store/memory"
"github.com/DaoCloud/ckube/watcher"
"github.com/gorilla/mux"
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"net/http"
"os"
"strings"
)

type fakeCkubeServer struct {
store store.Store
ser server.Server
kubeConfig *rest.Config
eventChan chan Event
}

func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) {
cfg := common.Config{}
err := json.Unmarshal([]byte(config), &cfg)
if err != nil {
return nil, err
}
cfg.Token = ""
common.InitConfig(&cfg)
indexConf := map[store.GroupVersionResource]map[string]string{}
storeGVRConfig := []store.GroupVersionResource{}
for _, proxy := range cfg.Proxies {
indexConf[store.GroupVersionResource{
Group: proxy.Group,
Version: proxy.Version,
Resource: proxy.Resource,
}] = proxy.Index
storeGVRConfig = append(storeGVRConfig, store.GroupVersionResource{
Group: proxy.Group,
Version: proxy.Version,
Resource: proxy.Resource,
})
}
m := memory.NewMemoryStore(indexConf)
s := fakeCkubeServer{
store: m,
eventChan: make(chan Event),
kubeConfig: &rest.Config{
Host: "http://" + func() string {
parts := strings.Split(listenAddr, ":")
if parts[0] == "" {
return listenAddr
}
return "127.0.0.1:" + parts[1]
}(),
},
}
ser := server.NewMuxServer(listenAddr, nil, m, s.registerFakeRoute)
s.ser = ser
go ser.Run()
return &s, nil
}

func NewFakeCKubeServerWithConfigPath(listenAddr string, cfgPath string) (CkubeServer, error) {
bs, err := os.ReadFile(cfgPath)
if err != nil {
return nil, err
}
return NewFakeCKubeServer(listenAddr, string(bs))
}

func (s *fakeCkubeServer) Stop() {
s.ser.Stop()
}

func (s *fakeCkubeServer) Events() <-chan Event {
return s.eventChan
}

func (s *fakeCkubeServer) GetKubeConfig() *rest.Config {
return s.kubeConfig
}

func (s *fakeCkubeServer) registerFakeRoute(r *mux.Router) {
for _, p := range []string{
"/apis/{group}/{version}/{resourceType}",
"/api/{version}/{resourceType}",
"/apis/{group}/{version}/{resourceType}/{resourceName}",
"/api/{version}/{resourceType}/{resourceName}",
"/apis/{group}/{version}/namespaces/{namespace}/{resourceType}",
"/api/{version}/namespaces/{namespace}/{resourceType}",
"/apis/{group}/{version}/namespaces/{namespace}/{resourceType}/{resourceName}",
"/api/{version}/namespaces/{namespace}/{resourceType}/{resourceName}",
} {
r.Path(p).Methods("POST", "PUT", "DELETE").HandlerFunc(s.proxy)
}
}

func jsonResp(writer http.ResponseWriter, status int, v interface{}) {
b, _ := json.Marshal(v)
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(status)
writer.Write(b)
}

func errorProxy(w http.ResponseWriter, err metav1.Status) {
jsonResp(w, int(err.Code), err)
}

func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) {
group := mux.Vars(r)["group"]
version := mux.Vars(r)["version"]
resourceType := mux.Vars(r)["resourceType"]
namespace := mux.Vars(r)["namespace"]
resourceName := mux.Vars(r)["resourceName"]
gvr := store.GroupVersionResource{
Group: group,
Version: version,
Resource: resourceType,
}

query := r.URL.Query()
cluster := common.GetConfig().DefaultCluster
for k, v := range query {
switch k {
case "fieldManager", "resourceVersion": // For Get Create Patch Update actions.
if strings.HasPrefix(v[0], constants.ClusterPrefix) {
cluster = v[0][len(constants.ClusterPrefix):]
}
}
}
obj := watcher.ObjType{}
bs, err := io.ReadAll(r.Body)
if err != nil {
errorProxy(writer, metav1.Status{
Message: err.Error(),
Code: 500,
})
return
}
json.Unmarshal(bs, &obj)
obj.Namespace = namespace
if resourceName == "" {
resourceName = obj.Name
}
action := EventActionError
switch r.Method {
case "POST":
action = EventActionAdd
s.store.OnResourceAdded(gvr, cluster, &obj)
case "PUT":
action = EventActionUpdate
s.store.OnResourceModified(gvr, cluster, &obj)
case "DELETE":
del := metav1.DeleteOptions{}
json.Unmarshal(bs, &del)
if len(del.DryRun) == 1 && strings.HasPrefix(del.DryRun[0], constants.ClusterPrefix) {
cluster = del.DryRun[0][len(constants.ClusterPrefix):]
}
obj.Name = resourceName
obj.Namespace = namespace
s.store.OnResourceDeleted(gvr, cluster, &obj)
}
e := Event{
EventAction: action,
Group: group,
Version: version,
Resource: resourceType,
Namespace: namespace,
Name: resourceName,
}
select {
case s.eventChan <- e:
default:
}
jsonResp(writer, 200, obj)
}
139 changes: 139 additions & 0 deletions pkg/client/fake/fake_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package fake

import (
"context"
"github.com/DaoCloud/ckube/page"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"testing"
)

func TestNewFakeCKubeServer(t *testing.T) {
s, err := NewFakeCKubeServer(":65521", `
{
"proxies": [
{
"group": "",
"version": "v1",
"resource": "pods",
"list_kind": "PodList",
"index": {
"namespace": "{.metadata.namespace}",
"name": "{.metadata.name}",
"labels": "{.metadata.labels}",
"created_at": "{.metadata.creationTimestamp}"
}
}
]
}
`)
assert.NoError(t, err)
defer s.Stop()
cfb := s.GetKubeConfig()
cli, err := kubernetes.NewForConfig(cfb)
assert.NoError(t, err)
t.Run("create pods", func(t *testing.T) {
coptc1, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c1")
coptc2, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c2")
_, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "ClusterFirst",
},
}, coptc1)
assert.NoError(t, err)
_, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "ClusterFirst",
},
}, coptc2)
assert.NoError(t, err)
})
t.Run("get pod each cluster", func(t *testing.T) {
goptc1, _ := page.QueryGetOptions(metav1.GetOptions{}, "c1")
goptc2, _ := page.QueryGetOptions(metav1.GetOptions{}, "c2")
p1, err := cli.CoreV1().Pods("test").Get(context.Background(), "pod1", goptc1)
assert.NoError(t, err)
assert.Equal(t, v1.DNSPolicy("ClusterFirst"), p1.Spec.DNSPolicy)
_, err = cli.CoreV1().Pods("test").Get(context.Background(), "pod1", goptc2)
assert.Error(t, err)
})
t.Run("list pods", func(t *testing.T) {
p := page.Paginate{}
p.Clusters([]string{"c1", "c2"})
lopts, _ := page.QueryListOptions(metav1.ListOptions{}, p)
pods, err := cli.CoreV1().Pods("test").List(context.Background(), lopts)
assert.NoError(t, err)
assert.Len(t, pods.Items, 2)
})
t.Run("update pods", func(t *testing.T) {
uoptc1, _ := page.QueryUpdateOptions(metav1.UpdateOptions{}, "c1")
_, err := cli.CoreV1().Pods("test").Update(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "Default",
},
}, uoptc1)
assert.NoError(t, err)
goptc1, _ := page.QueryGetOptions(metav1.GetOptions{}, "c1")
p1, err := cli.CoreV1().Pods("test").Get(context.Background(), "pod1", goptc1)
assert.Equal(t, v1.DNSPolicy("Default"), p1.Spec.DNSPolicy)
})
t.Run("delete pods", func(t *testing.T) {
doptc1, _ := page.QueryDeleteOptions(metav1.DeleteOptions{}, "c1")
err := cli.CoreV1().Pods("test").Delete(context.Background(), "pod1", doptc1)
assert.NoError(t, err)
goptc1, _ := page.QueryGetOptions(metav1.GetOptions{}, "c1")
_, err = cli.CoreV1().Pods("test").Get(context.Background(), "pod1", goptc1)
assert.Error(t, err)
})

t.Run("events", func(t *testing.T) {
events := []Event{}
go func() {
for {
select {
case e := <-s.Events():
events = append(events, e)
}
}
}()
coptc1, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c1")
_, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "ClusterFirst",
},
}, coptc1)
assert.NoError(t, err)
assert.Equal(t, []Event{
{
EventAction: EventActionAdd,
Group: "",
Version: "v1",
Resource: "pods",
Namespace: "test",
Name: "pod1",
},
}, events)
})
}
27 changes: 27 additions & 0 deletions pkg/client/fake/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package fake

import "k8s.io/client-go/rest"

type EventAction int

const (
EventActionError EventAction = 0
EventActionAdd EventAction = 1
EventActionUpdate EventAction = 2
EventActionDelete EventAction = 2
)

type Event struct {
EventAction
Group string
Version string
Resource string
Namespace string
Name string
}

type CkubeServer interface {
Events() <-chan Event
GetKubeConfig() *rest.Config
Stop()
}
Loading

0 comments on commit 6f8e14c

Please sign in to comment.