From 1ee093c99e60000ab1cd7bdb7e1c9a3936a1b248 Mon Sep 17 00:00:00 2001 From: Kebe Date: Sat, 2 Apr 2022 15:27:09 +0800 Subject: [PATCH] + add watch support for watch resources --- pkg/client/fake/fake_ckube.go | 197 +++++++++++++++++++++++++++- pkg/client/fake/fake_client_test.go | 57 ++++++++ pkg/client/fake/interface.go | 4 +- 3 files changed, 253 insertions(+), 5 deletions(-) diff --git a/pkg/client/fake/fake_ckube.go b/pkg/client/fake/fake_ckube.go index f5d0476..aaacce2 100644 --- a/pkg/client/fake/fake_ckube.go +++ b/pkg/client/fake/fake_ckube.go @@ -1,9 +1,15 @@ package fake import ( + "context" + "encoding/base64" "encoding/json" + "fmt" "github.com/DaoCloud/ckube/common" "github.com/DaoCloud/ckube/common/constants" + "github.com/DaoCloud/ckube/kube" + "github.com/DaoCloud/ckube/log" + "github.com/DaoCloud/ckube/page" "github.com/DaoCloud/ckube/server" "github.com/DaoCloud/ckube/store" "github.com/DaoCloud/ckube/store/memory" @@ -15,13 +21,17 @@ import ( "net/http" "os" "strings" + "sync" + "time" ) type fakeCkubeServer struct { - store store.Store - ser server.Server - kubeConfig *rest.Config - eventChan chan Event + store store.Store + ser server.Server + kubeConfig *rest.Config + eventChan chan Event + watchChanMap map[string]chan Event + watchChanLock sync.RWMutex } func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) { @@ -59,6 +69,7 @@ func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) { return "127.0.0.1:" + parts[1] }(), }, + watchChanMap: make(map[string]chan Event), } ser := server.NewMuxServer(listenAddr, nil, m, s.registerFakeRoute) s.ser = ser @@ -87,6 +98,17 @@ func (s *fakeCkubeServer) GetKubeConfig() *rest.Config { } func (s *fakeCkubeServer) registerFakeRoute(r *mux.Router) { + r.Use(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wq := r.URL.Query()["watch"] + if len(wq) == 1 && (wq[0] == "true" || wq[0] == "1") { + h := wh{s: s} + h.ServeHTTP(w, r) + return + } + next.ServeHTTP(w, r) + }) + }) for _, p := range []string{ "/apis/{group}/{version}/{resourceType}", "/api/{version}/{resourceType}", @@ -99,6 +121,15 @@ func (s *fakeCkubeServer) registerFakeRoute(r *mux.Router) { } { r.Path(p).Methods("POST", "PUT", "DELETE").HandlerFunc(s.proxy) } + for _, p := range []string{ + // watch + "/apis/{group}/{version}/watch/{resourceType}", + "/api/{version}/watch/{resourceType}", + "/apis/{group}/{version}/watch/namespaces/{namespace}/{resourceType}", + "/api/{version}/watch/namespaces/{namespace}/{resourceType}", + } { + r.Path(p).Methods("GET").HandlerFunc(s.watch) + } } func jsonResp(writer http.ResponseWriter, status int, v interface{}) { @@ -112,6 +143,123 @@ func errorProxy(w http.ResponseWriter, err metav1.Status) { jsonResp(w, int(err.Code), err) } +type wh struct { + s *fakeCkubeServer +} + +func (w *wh) ServeHTTP(writer http.ResponseWriter, r *http.Request) { + w.s.watch(writer, r) +} + +func (s *fakeCkubeServer) watch(writer http.ResponseWriter, r *http.Request) { + group := mux.Vars(r)["group"] + version := mux.Vars(r)["version"] + resourceType := mux.Vars(r)["resourceType"] + namespace := mux.Vars(r)["namespace"] + + query := r.URL.Query() + labelSelectorStr := "" + for k, v := range query { + switch k { + case "labelSelector": // For List options + labelSelectorStr = v[0] + } + } + paginate := page.Paginate{} + if labelSelectorStr != "" { + var err error + labels, _ := kube.ParseToLabelSelector(labelSelectorStr) + paginateStr := "" + if ps, ok := labels.MatchLabels[constants.PaginateKey]; ok { + paginateStr = ps + delete(labels.MatchLabels, constants.PaginateKey) + } else { + mes := []metav1.LabelSelectorRequirement{} + // Why we use MatchExpressions? + // to adapt dsm.daocloud.io/query=xxxx send to apiserver, which makes no results. + // if dsm.daocloud.io/query != xxx or dsm.daocloud.io/query not in (xxx), results exist even if it was sent to apiserver. + for _, m := range labels.MatchExpressions { + if m.Key == constants.PaginateKey { + if len(m.Values) > 0 { + paginateStr, err = kube.MergeValues(m.Values) + if err != nil { + errorProxy(writer, metav1.Status{ + Message: err.Error(), + Code: 400, + }) + return + } + } + } else { + mes = append(mes, m) + } + } + labels.MatchExpressions = mes + } + if paginateStr != "" { + rr, err := base64.StdEncoding.WithPadding(base64.NoPadding).DecodeString(paginateStr) + if err != nil { + errorProxy(writer, metav1.Status{ + Message: err.Error(), + Code: 400, + }) + return + } + json.Unmarshal(rr, &paginate) + delete(labels.MatchLabels, constants.PaginateKey) + } + } + + s.watchChanLock.Lock() + s.watchChanMap[r.RemoteAddr] = make(chan Event) + s.watchChanLock.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + writer.Header().Set("Content-Type", "application/json") + writer.Header().Set("Transfer-Encoding", "chunked") + writer.Header().Set("Connection", "keep-alive") + writer.(http.Flusher).Flush() + for { + select { + case <-ctx.Done(): + s.watchChanLock.Lock() + close(s.watchChanMap[r.RemoteAddr]) + delete(s.watchChanMap, r.RemoteAddr) + s.watchChanLock.Unlock() + return + case e := <-s.watchChanMap[r.RemoteAddr]: + if e.Group == group && + e.Version == version && + e.Resource == resourceType && + (e.Namespace == namespace || namespace == "") { + + if len(paginate.GetClusters()) > 0 && !func() bool { + for _, c := range paginate.GetClusters() { + if e.Cluster == c { + return true + } + } + return false + }() { + continue + } + typ := "ERROR" + switch e.EventAction { + case EventActionAdd: + typ = "ADDED" + case EventActionDelete: + typ = "DELETED" + case EventActionUpdate: + typ = "MODIFIED" + } + res := fmt.Sprintf(`{"type": %q, "object": %s}`, typ, e.Raw) + writer.Write([]byte(res + "\n")) + writer.(http.Flusher).Flush() + } + } + } +} + func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) { group := mux.Vars(r)["group"] version := mux.Vars(r)["version"] @@ -132,6 +280,11 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) { if strings.HasPrefix(v[0], constants.ClusterPrefix) { cluster = v[0][len(constants.ClusterPrefix):] } + case "watch": + if strings.ToLower(v[0]) == "true" || strings.ToLower(v[0]) == "1" { + s.watch(writer, r) + return + } } } obj := watcher.ObjType{} @@ -152,16 +305,40 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) { switch r.Method { case "POST": action = EventActionAdd + if o := s.store.Get(gvr, cluster, namespace, resourceName); o != nil { + errorProxy(writer, metav1.Status{ + Message: fmt.Sprintf("resource %v %s %s/%s already exists", gvr, cluster, namespace, resourceName), + Code: 400, + }) + return + } s.store.OnResourceAdded(gvr, cluster, &obj) case "PUT": action = EventActionUpdate + if o := s.store.Get(gvr, cluster, namespace, resourceName); o == nil { + errorProxy(writer, metav1.Status{ + Message: fmt.Sprintf("resource %v %s %s/%s not found", gvr, cluster, namespace, resourceName), + Code: 404, + }) + return + } s.store.OnResourceModified(gvr, cluster, &obj) case "DELETE": + action = EventActionDelete del := metav1.DeleteOptions{} json.Unmarshal(bs, &del) if len(del.DryRun) == 1 && strings.HasPrefix(del.DryRun[0], constants.ClusterPrefix) { cluster = del.DryRun[0][len(constants.ClusterPrefix):] } + if o := s.store.Get(gvr, cluster, namespace, resourceName); o == nil { + errorProxy(writer, metav1.Status{ + Message: fmt.Sprintf("resource %v %s %s/%s not found", gvr, cluster, namespace, resourceName), + Code: 404, + }) + return + } else { + bs, _ = json.Marshal(o) + } obj.Name = resourceName obj.Namespace = namespace s.store.OnResourceDeleted(gvr, cluster, &obj) @@ -171,12 +348,24 @@ func (s *fakeCkubeServer) proxy(writer http.ResponseWriter, r *http.Request) { Group: group, Version: version, Resource: resourceType, + Cluster: cluster, Namespace: namespace, Name: resourceName, + Raw: string(bs), } select { case s.eventChan <- e: default: } + s.watchChanLock.RLock() + for remote, c := range s.watchChanMap { + select { + case c <- e: + log.Debugf("succeed send stream to %s", remote) + default: + log.Infof("remote watcher %s no active stream", remote) + } + } + s.watchChanLock.RUnlock() jsonResp(writer, 200, obj) } diff --git a/pkg/client/fake/fake_client_test.go b/pkg/client/fake/fake_client_test.go index edb0025..55186c6 100644 --- a/pkg/client/fake/fake_client_test.go +++ b/pkg/client/fake/fake_client_test.go @@ -7,6 +7,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "sync" "testing" ) @@ -131,8 +132,64 @@ func TestNewFakeCKubeServer(t *testing.T) { Group: "", Version: "v1", Resource: "pods", + Cluster: "c1", Namespace: "test", Name: "pod1", + Raw: "{\"kind\":\"Pod\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"pod1\",\"namespace\":\"test\",\"creationTimestamp\":null},\"spec\":{\"containers\":null,\"dnsPolicy\":\"ClusterFirst\"},\"status\":{}}\n", + }, + }, events) + }) + t.Run("watch", func(t *testing.T) { + events := []Event{} + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + p := page.Paginate{} + p.Clusters([]string{"c1"}) + lopts, _ := page.QueryListOptions(metav1.ListOptions{}, p) + w, err := cli.CoreV1().Pods("test").Watch(context.Background(), lopts) + assert.NoError(t, err) + wg.Done() + for { + select { + case e := <-w.ResultChan(): + pod := e.Object.(*v1.Pod) + events = append(events, Event{ + Cluster: page.GetObjectCluster(pod), + Namespace: pod.Namespace, + Name: pod.Name, + }) + } + } + }() + wg.Wait() + coptc1, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c1") + _, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Namespace: "test", + }, + Spec: v1.PodSpec{ + DNSPolicy: "ClusterFirst", + }, + }, coptc1) + coptc2, _ := page.QueryCreateOptions(metav1.CreateOptions{}, "c2") + _, err = cli.CoreV1().Pods("test").Create(context.Background(), &v1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod4", + Namespace: "test", + }, + Spec: v1.PodSpec{ + DNSPolicy: "ClusterFirst", + }, + }, coptc2) + assert.NoError(t, err) + assert.Equal(t, []Event{ + { + Namespace: "test", + Name: "pod3", }, }, events) }) diff --git a/pkg/client/fake/interface.go b/pkg/client/fake/interface.go index 04cff27..941e8d7 100644 --- a/pkg/client/fake/interface.go +++ b/pkg/client/fake/interface.go @@ -8,7 +8,7 @@ const ( EventActionError EventAction = 0 EventActionAdd EventAction = 1 EventActionUpdate EventAction = 2 - EventActionDelete EventAction = 2 + EventActionDelete EventAction = 3 ) type Event struct { @@ -16,8 +16,10 @@ type Event struct { Group string Version string Resource string + Cluster string Namespace string Name string + Raw string } type CkubeServer interface {