Skip to content

Commit

Permalink
[feature] 1. 增加了事件通知中详细的数据修改部分 2. 修正了preload的namespace也会有事件通知的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
liuxingwang committed Jul 12, 2019
1 parent 0a72555 commit f37d9be
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 12 deletions.
27 changes: 15 additions & 12 deletions agollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ type Agollo interface {
Options() Options
}

type Configurations map[string]interface{}

type ApolloResponse struct {
Namespace string
OldValue Configurations
NewValue Configurations
Changes Changes
Error error
}

Expand Down Expand Up @@ -117,7 +116,8 @@ func NewWithConfigFile(configFilePath string, opts ...Option) (Agollo, error) {

func (a *agollo) preload() (Agollo, error) {
for _, namespace := range a.opts.PreloadNamespaces {
_, err := a.loadConfigFromNonCache(namespace)
// The action do not need to notify
_, err := a.loadConfigFromNonCache(namespace, false)
if err != nil {
if a.opts.FailTolerantOnBackupExists {
_, err = a.loadBackup(namespace)
Expand Down Expand Up @@ -167,7 +167,7 @@ func (a *agollo) loadNameSpace(namespace string) Configurations {
a.notificationMap.LoadOrStore(namespace, defaultNotificationID)

if a.opts.AutoFetchOnCacheMiss {
configs, err := a.loadConfigFromCache(namespace)
configs, err := a.loadConfigFromCache(namespace, true)
if err == nil {
a.log("Namesapce", namespace, "From", "cache-api")
return configs
Expand Down Expand Up @@ -215,6 +215,7 @@ func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) {
Namespace: namespace,
OldValue: oldVal,
NewValue: newVal,
Changes: oldVal.Different(newVal),
}

timer := time.NewTimer(defaultWatchTimeout)
Expand Down Expand Up @@ -268,7 +269,7 @@ func (a *agollo) log(kvs ...interface{}) {
)
}

func (a *agollo) loadConfigFromCache(namespace string) (configurations Configurations, err error) {
func (a *agollo) loadConfigFromCache(namespace string, isNeedNotify bool) (configurations Configurations, err error) {
configurations, err = a.opts.ApolloClient.GetConfigsFromCache(
a.opts.ConfigServerURL,
a.opts.AppID,
Expand All @@ -279,12 +280,12 @@ func (a *agollo) loadConfigFromCache(namespace string) (configurations Configura
return
}

err = a.handleConfig(namespace, configurations)
err = a.handleConfig(namespace, configurations, isNeedNotify)

return
}

func (a *agollo) loadConfigFromNonCache(namespace string) (configurations Configurations, err error) {
func (a *agollo) loadConfigFromNonCache(namespace string, isNeedNotify bool) (configurations Configurations, err error) {

var (
status int
Expand All @@ -306,21 +307,23 @@ func (a *agollo) loadConfigFromNonCache(namespace string) (configurations Config
if status == http.StatusOK {
configurations = config.Configurations
a.namespaceMap.Store(namespace, config.ReleaseKey)
err = a.handleConfig(namespace, config.Configurations)
err = a.handleConfig(namespace, config.Configurations, isNeedNotify)
return
}

return
}

func (a *agollo) handleConfig(namespace string, configurations Configurations) error {
func (a *agollo) handleConfig(namespace string, configurations Configurations, isNeedNotify bool) error {
// 读取旧缓存用来给监听队列
oldValue := a.GetNameSpace(namespace)
// 覆盖旧缓存
a.cache.Store(namespace, configurations)

// 发送到监听channel
a.sendWatchCh(namespace, oldValue, configurations)
if isNeedNotify {
// 发送到监听channel
a.sendWatchCh(namespace, oldValue, configurations)
}
// 备份配置
return a.backup()
}
Expand Down Expand Up @@ -385,7 +388,7 @@ func (a *agollo) longPoll() {
if status == http.StatusOK {
// 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过
for _, notification := range notifications {
_, err = a.loadConfigFromNonCache(notification.NamespaceName)
_, err = a.loadConfigFromNonCache(notification.NamespaceName, true)
if err == nil {
a.notificationMap.Store(notification.NamespaceName, notification.NotificationID)
continue
Expand Down
32 changes: 32 additions & 0 deletions change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package agollo

type ChangeType string

const (
ChangeTypeAdd ChangeType = "add"
ChangeTypeUpdate ChangeType = "update"
ChangeTypeDelete ChangeType = "delete"
)

type Change struct {
Type ChangeType
Key string
Value interface{}
}

type Changes []Change

// Len is part of sort.Interface.
func (cs Changes) Len() int {
return len(cs)
}

// Swap is part of sort.Interface.
func (cs Changes) Swap(i, j int) {
cs[i], cs[j] = cs[j], cs[i]
}

// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter.
func (cs Changes) Less(i, j int) bool {
return cs[i].Key < cs[j].Key
}
42 changes: 42 additions & 0 deletions configurations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package agollo

import "sort"

type Configurations map[string]interface{}

func (old Configurations) Different(new Configurations) Changes {
var changes []Change
for k, newValue := range new {
oldValue, found := old[k]
if found {
if oldValue != newValue {
changes = append(changes, Change{
Type: ChangeTypeUpdate,
Key: k,
Value: newValue,
})
}
} else {
changes = append(changes, Change{
Type: ChangeTypeAdd,
Key: k,
Value: newValue,
})
}
}

for k, oldValue := range old {
_, found := new[k]
if !found {
changes = append(changes, Change{
Type: ChangeTypeDelete,
Key: k,
Value: oldValue,
})
}
}

sort.Sort(Changes(changes))

return changes
}
46 changes: 46 additions & 0 deletions configurations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package agollo

import "testing"

func TestConfigurationsDifferent(t *testing.T) {
tests := []struct {
old Configurations
new Configurations
changes []Change
}{
{
Configurations{
"name": "foo",
"age": 18,
"balance": 101.2,
},

Configurations{
"name": "foo",
"age": 19,
"height": 1.82,
},
[]Change{
Change{ChangeTypeUpdate, "age", 19},
Change{ChangeTypeDelete, "balance", 101.2},
Change{ChangeTypeAdd, "height", 1.82},
},
},
}

for _, test := range tests {
changes := test.old.Different(test.new)

if len(changes) != len(test.changes) {
t.Errorf("should be equal (expected=%v, actual=%v)", test.changes, len(changes))
}
for i, actual := range changes {
expected := test.changes[i]
if actual.Type != expected.Type &&
actual.Key != expected.Key &&
actual.Value != expected.Value {
t.Errorf("should be equal (expected=%v, actual=%v)", expected, actual)
}
}
}
}

0 comments on commit f37d9be

Please sign in to comment.