Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
+ add watch support for watch resources
Browse files Browse the repository at this point in the history
  • Loading branch information
kebe7jun committed Apr 2, 2022
1 parent d105523 commit 1ee093c
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 5 deletions.
197 changes: 193 additions & 4 deletions pkg/client/fake/fake_ckube.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package fake

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/DaoCloud/ckube/common"
"github.com/DaoCloud/ckube/common/constants"
"github.com/DaoCloud/ckube/kube"
"github.com/DaoCloud/ckube/log"
"github.com/DaoCloud/ckube/page"
"github.com/DaoCloud/ckube/server"
"github.com/DaoCloud/ckube/store"
"github.com/DaoCloud/ckube/store/memory"
Expand All @@ -15,13 +21,17 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
)

type fakeCkubeServer struct {
store store.Store
ser server.Server
kubeConfig *rest.Config
eventChan chan Event
store store.Store
ser server.Server
kubeConfig *rest.Config
eventChan chan Event
watchChanMap map[string]chan Event
watchChanLock sync.RWMutex
}

func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) {
Expand Down Expand Up @@ -59,6 +69,7 @@ func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) {
return "127.0.0.1:" + parts[1]
}(),
},
watchChanMap: make(map[string]chan Event),
}
ser := server.NewMuxServer(listenAddr, nil, m, s.registerFakeRoute)
s.ser = ser
Expand Down Expand Up @@ -87,6 +98,17 @@ func (s *fakeCkubeServer) GetKubeConfig() *rest.Config {
}

func (s *fakeCkubeServer) registerFakeRoute(r *mux.Router) {
r.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wq := r.URL.Query()["watch"]
if len(wq) == 1 && (wq[0] == "true" || wq[0] == "1") {
h := wh{s: s}
h.ServeHTTP(w, r)
return
}
next.ServeHTTP(w, r)
})
})
for _, p := range []string{
"/apis/{group}/{version}/{resourceType}",
"/api/{version}/{resourceType}",
Expand All @@ -99,6 +121,15 @@ func (s *fakeCkubeServer) registerFakeRoute(r *mux.Router) {
} {
r.Path(p).Methods("POST", "PUT", "DELETE").HandlerFunc(s.proxy)
}
for _, p := range []string{
// watch
"/apis/{group}/{version}/watch/{resourceType}",
"/api/{version}/watch/{resourceType}",
"/apis/{group}/{version}/watch/namespaces/{namespace}/{resourceType}",
"/api/{version}/watch/namespaces/{namespace}/{resourceType}",
} {
r.Path(p).Methods("GET").HandlerFunc(s.watch)
}
}

func jsonResp(writer http.ResponseWriter, status int, v interface{}) {
Expand All @@ -112,6 +143,123 @@ func errorProxy(w http.ResponseWriter, err metav1.Status) {
jsonResp(w, int(err.Code), err)
}

type wh struct {
s *fakeCkubeServer
}

func (w *wh) ServeHTTP(writer http.ResponseWriter, r *http.Request) {
w.s.watch(writer, r)
}

func (s *fakeCkubeServer) watch(writer http.ResponseWriter, r *http.Request) {
group := mux.Vars(r)["group"]
version := mux.Vars(r)["version"]
resourceType := mux.Vars(r)["resourceType"]
namespace := mux.Vars(r)["namespace"]

query := r.URL.Query()
labelSelectorStr := ""
for k, v := range query {
switch k {
case "labelSelector": // For List options
labelSelectorStr = v[0]
}
}
paginate := page.Paginate{}
if labelSelectorStr != "" {
var err error
labels, _ := kube.ParseToLabelSelector(labelSelectorStr)
paginateStr := ""
if ps, ok := labels.MatchLabels[constants.PaginateKey]; ok {
paginateStr = ps
delete(labels.MatchLabels, constants.PaginateKey)
} else {
mes := []metav1.LabelSelectorRequirement{}
// Why we use MatchExpressions?
// to adapt dsm.daocloud.io/query=xxxx send to apiserver, which makes no results.
// if dsm.daocloud.io/query != xxx or dsm.daocloud.io/query not in (xxx), results exist even if it was sent to apiserver.
for _, m := range labels.MatchExpressions {
if m.Key == constants.PaginateKey {
if len(m.Values) > 0 {
paginateStr, err = kube.MergeValues(m.Values)
if err != nil {
errorProxy(writer, metav1.Status{
Message: err.Error(),
Code: 400,
})
return
}
}
} else {
mes = append(mes, m)
}
}
labels.MatchExpressions = mes
}
if paginateStr != "" {
rr, err := base64.StdEncoding.WithPadding(base64.NoPadding).DecodeString(paginateStr)
if err != nil {
errorProxy(writer, metav1.Status{
Message: err.Error(),
Code: 400,
})
return
}
json.Unmarshal(rr, &paginate)
delete(labels.MatchLabels, constants.PaginateKey)
}
}

s.watchChanLock.Lock()
s.watchChanMap[r.RemoteAddr] = make(chan Event)
s.watchChanLock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
writer.Header().Set("Content-Type", "application/json")
writer.Header().Set("Transfer-Encoding", "chunked")
writer.Header().Set("Connection", "keep-alive")
writer.(http.Flusher).Flush()
for {
select {
case <-ctx.Done():
s.watchChanLock.Lock()
close(s.watchChanMap[r.RemoteAddr])
delete(s.watchChanMap, r.RemoteAddr)
s.watchChanLock.Unlock()
return
case e := <-s.watchChanMap[r.RemoteAddr]:
if e.Group == group &&
e.Version == version &&
e.Resource == resourceType &&
(e.Namespace == namespace || namespace == "") {

if len(paginate.GetClusters()) > 0 && !func() bool {
for _, c := range paginate.GetClusters() {
if e.Cluster == c {
return true
}
}
return false
}() {
continue
}
typ := "ERROR"
switch e.EventAction {
case EventActionAdd:
typ = "ADDED"
case EventActionDelete:
typ = "DELETED"
case EventActionUpdate:
typ = "MODIFIED"
}
res := fmt.Sprintf(`{"type": %q, "object": %s}`, typ, e.Raw)
writer.Write([]byte(res + "\n"))
writer.(http.Flusher).Flush()
}
}
}
}

func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) {
group := mux.Vars(r)["group"]
version := mux.Vars(r)["version"]
Expand All @@ -132,6 +280,11 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(v[0], constants.ClusterPrefix) {
cluster = v[0][len(constants.ClusterPrefix):]
}
case "watch":
if strings.ToLower(v[0]) == "true" || strings.ToLower(v[0]) == "1" {
s.watch(writer, r)
return
}
}
}
obj := watcher.ObjType{}
Expand All @@ -152,16 +305,40 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
action = EventActionAdd
if o := s.store.Get(gvr, cluster, namespace, resourceName); o != nil {
errorProxy(writer, metav1.Status{
Message: fmt.Sprintf("resource %v %s %s/%s already exists", gvr, cluster, namespace, resourceName),
Code: 400,
})
return
}
s.store.OnResourceAdded(gvr, cluster, &obj)
case "PUT":
action = EventActionUpdate
if o := s.store.Get(gvr, cluster, namespace, resourceName); o == nil {
errorProxy(writer, metav1.Status{
Message: fmt.Sprintf("resource %v %s %s/%s not found", gvr, cluster, namespace, resourceName),
Code: 404,
})
return
}
s.store.OnResourceModified(gvr, cluster, &obj)
case "DELETE":
action = EventActionDelete
del := metav1.DeleteOptions{}
json.Unmarshal(bs, &del)
if len(del.DryRun) == 1 && strings.HasPrefix(del.DryRun[0], constants.ClusterPrefix) {
cluster = del.DryRun[0][len(constants.ClusterPrefix):]
}
if o := s.store.Get(gvr, cluster, namespace, resourceName); o == nil {
errorProxy(writer, metav1.Status{
Message: fmt.Sprintf("resource %v %s %s/%s not found", gvr, cluster, namespace, resourceName),
Code: 404,
})
return
} else {
bs, _ = json.Marshal(o)
}
obj.Name = resourceName
obj.Namespace = namespace
s.store.OnResourceDeleted(gvr, cluster, &obj)
Expand All @@ -171,12 +348,24 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) {
Group: group,
Version: version,
Resource: resourceType,
Cluster: cluster,
Namespace: namespace,
Name: resourceName,
Raw: string(bs),
}
select {
case s.eventChan <- e:
default:
}
s.watchChanLock.RLock()
for remote, c := range s.watchChanMap {
select {
case c <- e:
log.Debugf("succeed send stream to %s", remote)
default:
log.Infof("remote watcher %s no active stream", remote)
}
}
s.watchChanLock.RUnlock()
jsonResp(writer, 200, obj)
}
57 changes: 57 additions & 0 deletions pkg/client/fake/fake_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sync"
"testing"
)

Expand Down Expand Up @@ -131,8 +132,64 @@ func TestNewFakeCKubeServer(t *testing.T) {
Group: "",
Version: "v1",
Resource: "pods",
Cluster: "c1",
Namespace: "test",
Name: "pod1",
Raw: "{\"kind\":\"Pod\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"pod1\",\"namespace\":\"test\",\"creationTimestamp\":null},\"spec\":{\"containers\":null,\"dnsPolicy\":\"ClusterFirst\"},\"status\":{}}\n",
},
}, events)
})
t.Run("watch", func(t *testing.T) {
events := []Event{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
p := page.Paginate{}
p.Clusters([]string{"c1"})
lopts, _ := page.QueryListOptions(metav1.ListOptions{}, p)
w, err := cli.CoreV1().Pods("test").Watch(context.Background(), lopts)
assert.NoError(t, err)
wg.Done()
for {
select {
case e := <-w.ResultChan():
pod := e.Object.(*v1.Pod)
events = append(events, Event{
Cluster: page.GetObjectCluster(pod),
Namespace: pod.Namespace,
Name: pod.Name,
})
}
}
}()
wg.Wait()
coptc1, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c1")
_, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "ClusterFirst",
},
}, coptc1)
coptc2, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c2")
_, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
Namespace: "test",
},
Spec: v1.PodSpec{
DNSPolicy: "ClusterFirst",
},
}, coptc2)
assert.NoError(t, err)
assert.Equal(t, []Event{
{
Namespace: "test",
Name: "pod3",
},
}, events)
})
Expand Down
4 changes: 3 additions & 1 deletion pkg/client/fake/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ const (
EventActionError EventAction = 0
EventActionAdd EventAction = 1
EventActionUpdate EventAction = 2
EventActionDelete EventAction = 2
EventActionDelete EventAction = 3
)

type Event struct {
EventAction
Group string
Version string
Resource string
Cluster string
Namespace string
Name string
Raw string
}

type CkubeServer interface {
Expand Down

0 comments on commit 1ee093c

Please sign in to comment.