From 097cbc7b57666e69d0a22f5a907d8c03e24d9e61 Mon Sep 17 00:00:00 2001 From: kingsylin Date: Mon, 23 Dec 2019 11:26:27 +0800 Subject: [PATCH 1/2] support for nacos as backend, Nacos: Dynamic Naming and Configuration Service , it provider by alibaba.inc . see the website http://nacos.io/ for more about nacos --- backends/client.go | 3 + backends/config.go | 50 ++++++----- backends/nacos/client.go | 184 +++++++++++++++++++++++++++++++++++++++ config.go | 8 +- 4 files changed, 222 insertions(+), 23 deletions(-) create mode 100644 backends/nacos/client.go diff --git a/backends/client.go b/backends/client.go index 1e1bff119..e92f549bb 100644 --- a/backends/client.go +++ b/backends/client.go @@ -2,6 +2,7 @@ package backends import ( "errors" + "github.com/kelseyhightower/confd/backends/nacos" "strings" "github.com/kelseyhightower/confd/backends/consul" @@ -85,6 +86,8 @@ func New(config Config) (StoreClient, error) { return dynamodb.NewDynamoDBClient(table) case "ssm": return ssm.New() + case "nacos": + return nacos.NewNacosClient(backendNodes, config.Group) } return nil, errors.New("Invalid backend") } diff --git a/backends/config.go b/backends/config.go index 9f58127bd..a338c2ce6 100644 --- a/backends/config.go +++ b/backends/config.go @@ -5,26 +5,32 @@ import ( ) type Config struct { - AuthToken string `toml:"auth_token"` - AuthType string `toml:"auth_type"` - Backend string `toml:"backend"` - BasicAuth bool `toml:"basic_auth"` - ClientCaKeys string `toml:"client_cakeys"` - ClientCert string `toml:"client_cert"` - ClientKey string `toml:"client_key"` - ClientInsecure bool `toml:"client_insecure"` - BackendNodes util.Nodes `toml:"nodes"` - Password string `toml:"password"` - Scheme string `toml:"scheme"` - Table string `toml:"table"` - Separator string `toml:"separator"` - Username string `toml:"username"` - AppID string `toml:"app_id"` - UserID string `toml:"user_id"` - RoleID string `toml:"role_id"` - SecretID string `toml:"secret_id"` - YAMLFile util.Nodes `toml:"file"` - Filter string `toml:"filter"` - Path string `toml:"path"` - Role string + AuthToken string `toml:"auth_token"` + AuthType string `toml:"auth_type"` + Backend string `toml:"backend"` + BasicAuth bool `toml:"basic_auth"` + ClientCaKeys string `toml:"client_cakeys"` + ClientCert string `toml:"client_cert"` + ClientKey string `toml:"client_key"` + ClientInsecure bool `toml:"client_insecure"` + BackendNodes util.Nodes `toml:"nodes"` + Password string `toml:"password"` + Scheme string `toml:"scheme"` + Table string `toml:"table"` + Separator string `toml:"separator"` + Username string `toml:"username"` + AppID string `toml:"app_id"` + UserID string `toml:"user_id"` + RoleID string `toml:"role_id"` + SecretID string `toml:"secret_id"` + YAMLFile util.Nodes `toml:"file"` + Filter string `toml:"filter"` + Path string `toml:"path"` + Role string + + Group string `toml:"group"` + Endpoint string `toml:"endpoint"` + Namespace string `toml:"namespace"` + AccessKey string `toml:"accessKey"` + SecretKey string `toml:"secretKey"` } diff --git a/backends/nacos/client.go b/backends/nacos/client.go new file mode 100644 index 000000000..ae27014d0 --- /dev/null +++ b/backends/nacos/client.go @@ -0,0 +1,184 @@ +package nacos + +import ( + "crypto/md5" + "encoding/json" + "fmt" + "github.com/kelseyhightower/confd/log" + "io/ioutil" + "math/rand" + "net/http" + "strings" + "time" +) + +var replacer = strings.NewReplacer("/", ".") + +type Client struct { + nodes []string + group string + channel chan int +} +type ConfigValue struct { + key string + md5 string + content string +} + +var configCache = map[string]*ConfigValue{} + +func NewNacosClient(nodes []string, group string) (client *Client, err error) { + client = &Client{nodes, group, make(chan int, 10)} + return +} +func (client *Client) GetValues(keys []string) (map[string]string, error) { + vars := make(map[string]string) + for _, key := range keys { + k := strings.TrimPrefix(key, "/") + k = replacer.Replace(k) + // get instance list api when key has prefix naming + if strings.HasPrefix(k, "naming:") { + k = strings.TrimPrefix(k, "naming:") + // if cache exists + oldConfig, exist := configCache[k] + if exist { + vars[key] = oldConfig.content + continue + } + path := fmt.Sprintf("/nacos/v1/ns/instance/list?healthyOnly=false&groupName=%s&serviceName=%s", client.group, k) + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Get(client.buildUrl(path)) + if err != nil { + return vars, err + } + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + var instanceList InstanceList + err = json.Unmarshal(body, &instanceList) + if err != nil { + return vars, err + } + bytes, err := json.Marshal(instanceList.Hosts) + if err != nil { + return vars, err + } + md5Val := fmt.Sprintf("%x", md5.Sum(bytes)) + newConfig := &ConfigValue{k, md5Val, string(bytes)} + configCache[k] = newConfig + vars[key] = newConfig.content + log.Info("service instances updated for %s ", k) + } else { + config, exist := configCache[k] + if exist { + vars[key] = config.content + continue + } + // get config + path := fmt.Sprintf("/nacos/v1/cs/configs?group=%s&dataId=%s", client.group, k) + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Get(client.buildUrl(path)) + if err != nil { + return vars, err + } + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + md5Val := resp.Header.Get("Content-MD5") + newVal := &ConfigValue{k, md5Val, string(body)} + configCache[k] = newVal + log.Info("config value updated for %s %", k, newVal.content) + vars[key] = newVal.content + } + } + return vars, nil +} +func (client *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) { + if waitIndex == 0 { + return 1, nil + } + httpClient := &http.Client{Timeout: 5 * time.Second} + for { + for _, key := range keys { + k := strings.TrimPrefix(key, "/") + k = replacer.Replace(k) + if strings.HasPrefix(k, "naming:") { + k = strings.TrimPrefix(k, "naming:") + path := fmt.Sprintf("/nacos/v1/ns/instance/list?healthyOnly=false&groupName=%s&serviceName=%s", client.group, k) + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Get(client.buildUrl(path)) + if err != nil { + continue + } + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + var instanceList InstanceList + err = json.Unmarshal(body, &instanceList) + if err != nil { + continue + } + bytes, err := json.Marshal(instanceList.Hosts) + if err != nil { + continue + } + md5Val := fmt.Sprintf("%x", md5.Sum(bytes)) + oldConfig, exist := configCache[k] + if exist && strings.Compare(md5Val, oldConfig.md5) != 0 { + log.Info("instances of service [%s] has changed", k) + newConfig := &ConfigValue{k, md5Val, string(bytes)} + configCache[k] = newConfig + return waitIndex, nil + } else { + newConfig := &ConfigValue{k, md5Val, string(bytes)} + configCache[k] = newConfig + } + } else { + body := "Listening-Configs=" + body += k + string(2) + body += client.group + string(2) + body += configCache[k].md5 + string(2) + body += "" + string(1) + // long pulling for listener + path := "/nacos/v1/cs/configs/listener" + req, _ := http.NewRequest("POST", client.buildUrl(path), strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Long-Pulling-Timeout", "100") + resp, err := httpClient.Do(req) + if err != nil { + continue + } + defer resp.Body.Close() + respBody, _ := ioutil.ReadAll(resp.Body) + // get result and it's not empty + result := string(respBody) + if len(result) > 0 { + log.Info("config [%s] has changed", k) + delete(configCache, k) + return waitIndex, nil + } + } + } + time.Sleep(time.Microsecond * 500) + } + return waitIndex, nil +} + +func (client *Client) buildUrl(path string) string { + n := rand.Intn(len(client.nodes)) + node := client.nodes[n] + return node + path +} + +type InstanceList struct { + Name string `json:"name"` + Hosts []InstanceHost `json:"hosts"` +} +type InstanceHost struct { + InstanceId string `json:"instanceId"` + ServiceName string `json:"serviceName"` + Ip string `json:"ip"` + Port int `json:"port"` + Weight float32 `json:"weight"` + Enabled bool `json:"enabled"` + Healthy bool `json:"healthy"` + Valid bool `json:"valid"` + Metadata map[string]string `json:"metadata"` +} diff --git a/config.go b/config.go index dcd36d5c1..dd7a9c79a 100644 --- a/config.go +++ b/config.go @@ -33,6 +33,7 @@ type Config struct { PrintVersion bool ConfigFile string OneTime bool + GRou bool } var config Config @@ -44,7 +45,7 @@ func init() { flag.StringVar(&config.ClientCaKeys, "client-ca-keys", "", "client ca keys") flag.StringVar(&config.ClientCert, "client-cert", "", "the client cert") flag.StringVar(&config.ClientKey, "client-key", "", "the client key") - flag.BoolVar(&config.ClientInsecure, "client-insecure", false, "Allow connections to SSL sites without certs (only used with -backend=etcd)") + flag.BoolVar(&config.ClientInsecure, "client-insecure", false, "Allow connections to SSL sites without certs (only used with -backend=etcd)") flag.StringVar(&config.ConfDir, "confdir", "/etc/confd", "confd conf directory") flag.StringVar(&config.ConfigFile, "config-file", "/etc/confd/confd.toml", "the confd config file") flag.Var(&config.YAMLFile, "file", "the YAML file to watch for changes (only used with -backend=file)") @@ -73,6 +74,11 @@ func init() { flag.StringVar(&config.Username, "username", "", "the username to authenticate as (only used with vault and etcd backends)") flag.StringVar(&config.Password, "password", "", "the password to authenticate with (only used with vault and etcd backends)") flag.BoolVar(&config.Watch, "watch", false, "enable watch support") + + flag.StringVar(&config.Group, "group", "DEFAULT_GROUP", "the group in nacos (only used with nacos backends)") + flag.StringVar(&config.Namespace, "namespace", "", "the namespace in nacos (only used with nacos backends)") + flag.StringVar(&config.AccessKey, "accessKey", "", "the accessKey to authenticate in nacos (only used with nacos backends)") + flag.StringVar(&config.SecretKey, "secretKey", "", "the secretKey to authenticate in nacos (only used with nacos backends)") } // initConfig initializes the confd configuration by first setting defaults, From f633e6daa3ba8c3bbbc08641ab50bf8a01340e9a Mon Sep 17 00:00:00 2001 From: kingsylin Date: Mon, 23 Dec 2019 21:15:01 +0800 Subject: [PATCH 2/2] add function for parse yaml --- backends/nacos/client.go | 2 +- resource/template/template_funcs.go | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/backends/nacos/client.go b/backends/nacos/client.go index ae27014d0..9de99fad9 100644 --- a/backends/nacos/client.go +++ b/backends/nacos/client.go @@ -85,7 +85,7 @@ func (client *Client) GetValues(keys []string) (map[string]string, error) { md5Val := resp.Header.Get("Content-MD5") newVal := &ConfigValue{k, md5Val, string(body)} configCache[k] = newVal - log.Info("config value updated for %s %", k, newVal.content) + log.Info("config value updated for %s", k) vars[key] = newVal.content } } diff --git a/resource/template/template_funcs.go b/resource/template/template_funcs.go index 9a5d94f29..107d13448 100644 --- a/resource/template/template_funcs.go +++ b/resource/template/template_funcs.go @@ -13,8 +13,9 @@ import ( "strings" "time" - util "github.com/kelseyhightower/confd/util" + "github.com/kelseyhightower/confd/util" "github.com/kelseyhightower/memkv" + "gopkg.in/yaml.v2" ) func newFuncMap() map[string]interface{} { @@ -23,6 +24,7 @@ func newFuncMap() map[string]interface{} { m["split"] = strings.Split m["json"] = UnmarshalJsonObject m["jsonArray"] = UnmarshalJsonArray + m["yaml"] = UnmarshalYaml m["dir"] = path.Dir m["map"] = CreateMap m["getenv"] = Getenv @@ -169,6 +171,12 @@ func UnmarshalJsonArray(data string) ([]interface{}, error) { return ret, err } +func UnmarshalYaml(data string) (map[string]interface{}, error) { + var ret map[string]interface{} + err := yaml.Unmarshal([]byte(strings.ReplaceAll(data, "---", "")), &ret) + return ret, err +} + func LookupIP(data string) []string { ips, err := net.LookupIP(data) if err != nil {