Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: viper sup more namespace #83

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ go 1.13

require (
github.com/bketelsen/crypt v0.0.4
github.com/hashicorp/hcl v1.0.0
github.com/magiconair/properties v1.8.5
github.com/pelletier/go-toml v1.9.3
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0
github.com/subosito/gotenv v1.2.0
gopkg.in/go-playground/assert.v1 v1.2.1
gopkg.in/yaml.v2 v2.4.0
)
153 changes: 140 additions & 13 deletions viper-remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package remote

import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"

crypt "github.com/bketelsen/crypt/config"
"github.com/shima-park/agollo"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
)

var (
Expand Down Expand Up @@ -49,7 +54,9 @@ func SetAgolloOptions(opts ...agollo.Option) {

type viperConfigManager interface {
Get(key string) ([]byte, error)
GetMultipleNamespaces(keys []string) ([]byte, error)
Watch(key string, stop chan bool) <-chan *viper.RemoteResponse
WatchMultipleNamespaces(keys []string, stop chan bool) <-chan *viper.RemoteResponse
}

type apolloConfigManager struct {
Expand Down Expand Up @@ -99,6 +106,68 @@ func (cm apolloConfigManager) Get(namespace string) ([]byte, error) {
return marshalConfigs(getConfigType(namespace), configs)
}

func (cm apolloConfigManager) GetMultipleNamespaces(namespaces []string) ([]byte, error) {
newConfigs := make(map[string]interface{})
var configType string
var b []byte
for _, namespace := range namespaces {
configs := cm.agollo.GetNameSpace(namespace)
configType = getConfigType(namespace)
// 根据类型进行合并
switch configType {
case "json", "yml", "yaml", "xml":
content := configs["content"]
if content != nil {
var tempConfig map[string]interface{}
err := unmarshalConfig(configType, []byte(content.(string)), &tempConfig)
if err != nil {
return nil, err
}
// 执行合并
viper.MergeConfigMap(tempConfig)
mergedConfigs := viper.AllSettings()
b, err = marshalConfig(configType, mergedConfigs)
if err != nil {
return nil, err
}
newConfigs["content"] = string(b)
}
case "properties":
for k, v := range configs {
newConfigs[k] = v
}
}
}

return marshalConfigs(configType, newConfigs)
}

func unmarshalConfig(configType string, data []byte, config *map[string]interface{}) error {
switch configType {
case "json":
return json.Unmarshal(data, config)
case "yml", "yaml":
return yaml.Unmarshal(data, config)
case "xml":
return xml.Unmarshal(data, config)
default:
return fmt.Errorf("unsupported config type: %s", configType)
}
}

func marshalConfig(configType string, config map[string]interface{}) ([]byte, error) {
switch configType {
case "json":
return json.Marshal(config)
case "yml", "yaml":
return yaml.Marshal(config)
case "xml":
return xml.Marshal(config)
default:
return nil, fmt.Errorf("unsupported config type: %s", configType)
}
}

func marshalConfigs(configType string, configs map[string]interface{}) ([]byte, error) {
var bts []byte
var err error
Expand Down Expand Up @@ -140,6 +209,39 @@ func (cm apolloConfigManager) Watch(namespace string, stop chan bool) <-chan *vi
}()
return resp
}
func (cm apolloConfigManager) WatchMultipleNamespaces(namespaces []string, stop chan bool) <-chan *viper.RemoteResponse {
combinedResp := make(chan *viper.RemoteResponse)

// 创建一个监听每个namespace变化的goroutine
for _, namespace := range namespaces {
go func(ns string) {
backendResp := cm.agollo.WatchNamespace(ns, stop)
for {
select {
case <-stop:
return
case r := <-backendResp:
if r.Error != nil {
combinedResp <- &viper.RemoteResponse{
Value: nil,
Error: r.Error,
}
continue
}
// 重载所有配置以确保多配置的优先级
allConfigs, err := cm.GetMultipleNamespaces(namespaces)
if err != nil {
combinedResp <- &viper.RemoteResponse{Value: nil, Error: err}
continue
}
combinedResp <- &viper.RemoteResponse{Value: allConfigs, Error: err}
}
}
}(namespace)
}

return combinedResp
}

type configProvider struct {
}
Expand All @@ -149,13 +251,23 @@ func (rc configProvider) Get(rp viper.RemoteProvider) (io.Reader, error) {
if err != nil {
return nil, err
}

namespaces := strings.Split(rp.Path(), ",")
var b []byte
switch cm := cmt.(type) {
case viperConfigManager:
b, err = cm.Get(rp.Path())
case crypt.ConfigManager:
b, err = cm.Get(rp.Path())
if len(namespaces) == 1 {
switch cm := cmt.(type) {
case viperConfigManager:
b, err = cm.Get(namespaces[0])
case crypt.ConfigManager:
b, err = cm.Get(namespaces[0])
}
} else if len(namespaces) > 1 {
switch cm := cmt.(type) {
case viperConfigManager:
b, err = cm.GetMultipleNamespaces(namespaces)
if err != nil {
return nil, err
}
}
}

if err != nil {
Expand All @@ -169,13 +281,20 @@ func (rc configProvider) Watch(rp viper.RemoteProvider) (io.Reader, error) {
if err != nil {
return nil, err
}

namespaces := strings.Split(rp.Path(), ",")
var resp []byte
switch cm := cmt.(type) {
case viperConfigManager:
resp, err = cm.Get(rp.Path())
case crypt.ConfigManager:
resp, err = cm.Get(rp.Path())
if len(namespaces) == 1 {
switch cm := cmt.(type) {
case viperConfigManager:
resp, err = cm.Get(namespaces[0])
case crypt.ConfigManager:
resp, err = cm.Get(namespaces[0])
}
} else if len(namespaces) > 1 {
switch cm := cmt.(type) {
case viperConfigManager:
resp, err = cm.GetMultipleNamespaces(namespaces)
}
}

if err != nil {
Expand All @@ -190,7 +309,15 @@ func (rc configProvider) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.Re
if err != nil {
return nil, nil
}

namespaces := strings.Split(rp.Path(), ",")
if len(namespaces) > 1 {
switch cm := cmt.(type) {
case viperConfigManager:
quitwc := make(chan bool)
viperResponsCh := cm.WatchMultipleNamespaces(namespaces, quitwc)
return viperResponsCh, quitwc
}
}
switch cm := cmt.(type) {
case viperConfigManager:
quitwc := make(chan bool)
Expand Down