Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #7 from DaoCloud/fix/watcher-cluster-error
Browse files Browse the repository at this point in the history
> fix cluster error in watch and labelSelector may be <none>
  • Loading branch information
kebe7jun authored Jul 4, 2022
2 parents 5cf4ec9 + 9ffa6d2 commit 5e7f6c1
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 163 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
if: github.event_name == 'pull_request'
uses: golangci/golangci-lint-action@v2
with:
version: v1.29
version: v1.46.2
go-unit-test:
runs-on: ubuntu-20.04
needs: [style-check]
Expand Down
6 changes: 3 additions & 3 deletions api/extend/deploy2service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Deploy2Service(r *api.ReqContext) interface{} {
cluster = common.GetConfig().DefaultCluster
}
p := page.Paginate{Search: "name=" + dep}
p.Clusters([]string{cluster})
_ = p.Clusters([]string{cluster})
res := r.Store.Query(podGvr, store.Query{
Namespace: ns,
Paginate: p,
Expand All @@ -54,7 +54,7 @@ func Deploy2Service(r *api.ReqContext) interface{} {
}
}
p = page.Paginate{}
p.Clusters([]string{cluster})
_ = p.Clusters([]string{cluster})
res = r.Store.Query(svcGvr, store.Query{
Namespace: ns,
Paginate: p,
Expand All @@ -66,7 +66,7 @@ func Deploy2Service(r *api.ReqContext) interface{} {
svc := &v1.Service{}
if s, ok := svcIf.(*watcher.ObjType); ok {
bs, _ := json.Marshal(s)
json.Unmarshal(bs, svc)
_ = json.Unmarshal(bs, svc)
} else {
svc = svcIf.(*v1.Service)
}
Expand Down
26 changes: 16 additions & 10 deletions api/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS
var paginate page.Paginate
var labelSelectorStr string
clusterPrefix := constants.ClusterPrefix
cluster := common.GetConfig().DefaultCluster
cluster := ""
query := r.URL.Query()
for k, v := range query {
switch k {
Expand All @@ -121,7 +121,7 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS
opts, err := ioutil.ReadAll(body)
if err == nil {
options := v1.DeleteOptions{}
json.Unmarshal(opts, &options)
_ = json.Unmarshal(opts, &options)
if len(options.DryRun) > 0 && strings.HasPrefix(options.DryRun[0], clusterPrefix) {
cluster = options.DryRun[0][len(clusterPrefix):]
options.DryRun = options.DryRun[1:]
Expand Down Expand Up @@ -166,30 +166,36 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS
if err != nil {
return nil, labels, cluster, err
}
json.Unmarshal(rr, &paginate)
_ = json.Unmarshal(rr, &paginate)
delete(labels.MatchLabels, constants.PaginateKey)
}
query.Set("labelSelector", v1.FormatLabelSelector(labels))
if len(labels.MatchLabels) != 0 || len(labels.MatchExpressions) != 0 {
// if labelSelectorStr is empty, v1.FormatLabelSelector will return "<none>", so we should not use it.
query.Set("labelSelector", v1.FormatLabelSelector(labels))
}
}
r.URL.RawQuery = query.Encode()
if cs := paginate.GetClusters(); len(cs) > 0 && cluster == "" {
cluster = cs[0]
}
if cluster == "" {
cluster = common.GetConfig().DefaultCluster
}
return &paginate, labels, cluster, nil
}

func Proxy(r *ReqContext) interface{} {
// version := mux.Vars(r.Request)["version"]
namespace := mux.Vars(r.Request)["namespace"]
resourceName := mux.Vars(r.Request)["resource"]
gvr := getGVRFromReq(r.Request)
paginate, labels, cluster, err := parsePaginateAndLabelsAndClean(r.Request)
if err != nil {
return proxyPass(r, cluster)
}
if cluster == "" {
cluster = common.GetConfig().DefaultCluster
}
gvr := getGVRFromReq(r.Request)
for k, v := range r.Request.URL.Query() {
switch k {
case "labelSelector":
Expand Down Expand Up @@ -223,7 +229,7 @@ func Proxy(r *ReqContext) interface{} {
log.Debugf("got paginate %v", paginate)

items := make([]interface{}, 0)
var total int64 = 0
var total int64
if labels != nil && (len(labels.MatchLabels) != 0 || len(labels.MatchExpressions) != 0) {
// exists label selector
res := r.Store.Query(gvr, store.Query{
Expand Down Expand Up @@ -260,7 +266,7 @@ func Proxy(r *ReqContext) interface{} {

// manually slice items
var l = int64(len(items))
var start, end int64 = 0, 0
var start, end int64
if paginate.PageSize == 0 || paginate.Page == 0 {
// all resources
start = 0
Expand Down Expand Up @@ -346,7 +352,7 @@ func serverPrint(items []interface{}) interface{} {
continue
}
indexes := map[string]string{}
json.Unmarshal([]byte(indexesStr), &indexes)
_ = json.Unmarshal([]byte(indexesStr), &indexes)
if i == 0 {
commonCols := []string{"cluster", "namespace", "name"}
if _, ok := indexes["namespace"]; !ok {
Expand Down Expand Up @@ -440,12 +446,12 @@ func proxyPassWatch(r *ReqContext, cluster string) interface{} {
t := make([]byte, 1)
_, err := reader.Read(t)
if err != nil {
r.Writer.Write(buf.Bytes())
_, _ = r.Writer.Write(buf.Bytes())
return nil
}
buf.Write(t)
if t[0] == '\n' {
r.Writer.Write(buf.Bytes())
_, _ = r.Writer.Write(buf.Bytes())
buf.Reset()
}
select {
Expand Down
12 changes: 3 additions & 9 deletions api/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"net/http"
"testing"

"github.com/DaoCloud/ckube/common"
"github.com/DaoCloud/ckube/store"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/DaoCloud/ckube/common"
"github.com/DaoCloud/ckube/store"
)

type fakeWriter struct {
Expand Down Expand Up @@ -63,13 +64,6 @@ var podsMap = map[string]string{
"resourceType": "pods",
}

var nsMap = map[string]string{
"namespace": "default",
"group": "",
"version": "v1",
"resourceType": "pods",
}

func podsInterfaces(pods []v1.Pod) []interface{} {
a := []interface{}{}
for _, p := range pods {
Expand Down
66 changes: 32 additions & 34 deletions cmd/cacheproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kubeapi "k8s.io/client-go/tools/clientcmd/api/v1"
"sigs.k8s.io/yaml"

"github.com/DaoCloud/ckube/common"
"github.com/DaoCloud/ckube/log"
"github.com/DaoCloud/ckube/server"
Expand All @@ -12,14 +22,6 @@ import (
"github.com/DaoCloud/ckube/utils"
"github.com/DaoCloud/ckube/utils/prommonitor"
"github.com/DaoCloud/ckube/watcher"
"io/ioutil"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kubeapi "k8s.io/client-go/tools/clientcmd/api/v1"
"os"
"path"
"sigs.k8s.io/yaml"
)

func GetK8sConfigConfigWithFile(kubeconfig, context string) *rest.Config {
Expand Down Expand Up @@ -156,7 +158,7 @@ func loadFromConfig(kubeConfig, configFile string) (map[string]kubernetes.Interf
}
m := memory.NewMemoryStore(indexConf)
w := watcher.NewWatcher(clusterConfigs, storeGVRConfig, m)
w.Start()
_ = w.Start()
return clusterClients, w, m, nil
}

Expand Down Expand Up @@ -195,33 +197,29 @@ func main() {
}
defer fixedWatcher.Close()
go func() {
for {
select {
case e := <-fixedWatcher.Events():
log.Infof("get file watcher event: %v", e)
switch e.Type {
case utils.EventTypeChanged:
// do reload
case utils.EventTypeError:
log.Errorf("got file watcher error type: file: %s", e.Name)
break
// do reload
}
clis, rw, rs, err := loadFromConfig(kubeConfig, configFile)
if err != nil {
prommonitor.ConfigReload.WithLabelValues("failed").Inc()
log.Errorf("watcher: reload config error: %v", err)
continue
}
prommonitor.Resources.Reset()
w.Stop()
w = rw
ser.ResetStore(rs, clis) // reset store
prommonitor.ConfigReload.WithLabelValues("success").Inc()
log.Infof("auto reloaded config successfully")
for e := range fixedWatcher.Events() {
log.Infof("get file watcher event: %v", e)
switch e.Type {
case utils.EventTypeChanged:
// do reload
case utils.EventTypeError:
log.Errorf("got file watcher error type: file: %s", e.Name)
// do reload
}
clis, rw, rs, err := loadFromConfig(kubeConfig, configFile)
if err != nil {
prommonitor.ConfigReload.WithLabelValues("failed").Inc()
log.Errorf("watcher: reload config error: %v", err)
continue
}
prommonitor.Resources.Reset()
_ = w.Stop()
w = rw
ser.ResetStore(rs, clis) // reset store
prommonitor.ConfigReload.WithLabelValues("success").Inc()
log.Infof("auto reloaded config successfully")
}
}()
}
ser.Run()
_ = ser.Run()
}
18 changes: 10 additions & 8 deletions cmd/ckube-plugin/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package main

import (
"github.com/DaoCloud/ckube/log"
"github.com/DaoCloud/ckube/page"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"os/exec"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/DaoCloud/ckube/log"
"github.com/DaoCloud/ckube/page"
)

const (
Expand All @@ -28,7 +30,7 @@ func main() {
typ = get
case "create":
typ = create
//case "delete":
// case "delete":
// typ = del
}
if a == "--clusters" {
Expand Down Expand Up @@ -57,7 +59,7 @@ func main() {
switch typ {
case get:
p := page.Paginate{}
p.Clusters(cs)
_ = p.Clusters(cs)
selector := ""
if selectorPos != 0 {
selector = args[selectorPos]
Expand All @@ -76,15 +78,15 @@ func main() {
os.Exit(2)
}
p := page.Paginate{}
p.Clusters(cs)
_ = p.Clusters(cs)
o, _ := page.QueryCreateOptions(metav1.CreateOptions{}, cs[0])
args = append(args, "--field-manager", o.FieldManager)
}
}
c := exec.Command("kubectl", args...)
//fmt.Printf("args %v\n", args)
// fmt.Printf("args %v\n", args)
c.Stdin = os.Stdin
c.Stdout = os.Stdout
c.Stderr = os.Stderr
c.Run()
_ = c.Run()
}
5 changes: 3 additions & 2 deletions cmd/ckubecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"os"
"strings"

"github.com/DaoCloud/ckube/page"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/DaoCloud/ckube/page"
)

func main() {
Expand All @@ -30,7 +31,7 @@ func main() {
}
if clusters != "" {
ccs := strings.Split(clusters, ",")
p.Clusters(ccs)
_ = p.Clusters(ccs)
}
o, err := page.QueryListOptions(v1.ListOptions{}, p)
if err != nil {
Expand Down
Loading

0 comments on commit 5e7f6c1

Please sign in to comment.