diff --git a/agollo.go b/agollo.go index 4297369..92ef449 100644 --- a/agollo.go +++ b/agollo.go @@ -128,7 +128,7 @@ func (a *agollo) initNamespace(namespaces ...string) error { var existsNamespaces []Notification for _, namespace := range uninitializedNamespaces { // (1)读取配置 (2)设置初始化notificationMap - status, _, err := a.reloadNamespace(namespace, defaultNotificationID) + status, _, err := a.reloadNamespace(namespace) if err != nil { return err } @@ -142,27 +142,29 @@ func (a *agollo) initNamespace(namespaces ...string) error { NamespaceName: namespace, }, ) + } else { + // 不能正常获取notificationID的设置为默认notificationID + a.notificationMap.Store(namespace, defaultNotificationID) } } + if len(existsNamespaces) == 0 { + return nil + } + // 由于apollo去getRemoteNotifications获取一个不存在的namespace的notificationID时会hold请求90秒 // (1) 为防止意外传入一个不存在的namespace而发生上述情况,仅将成功获取配置在apollo存在的namespace,去初始化notificationID // (2) 此处忽略error返回,在容灾逻辑下配置能正确读取而去获取notificationid可能会返回http请求失败,防止服务不能正常容灾启动 remoteNotifications, _ := a.getRemoteNotifications(existsNamespaces) - // 设置初始化的namespace的notificationID for _, notification := range remoteNotifications { - a.updateNotificationIDIfIncreased(notification.NamespaceName, notification.NotificationID) + // 设置namespace初始化的notificationID + a.notificationMap.Store(notification.NamespaceName, notification.NotificationID) } return nil } -func (a *agollo) reloadNamespace(namespace string, notificationID int) (status int, conf Configurations, err error) { - if !a.updateNotificationIDIfIncreased(namespace, notificationID) { - conf = a.getNameSpace(namespace) - return - } - +func (a *agollo) reloadNamespace(namespace string) (status int, conf Configurations, err error) { var configServerURL string configServerURL, err = a.opts.Balancer.Select() if err != nil { @@ -311,11 +313,14 @@ func (a *agollo) longPoll() { // HTTP Status: 200时,正常返回notifications数据,数组含有需要更新namespace和notificationID // HTTP Status: 304时,上报的namespace没有更新的修改,返回notifications为空数组,遍历空数组跳过 for _, notification := range notifications { + // 更新NotificationID + a.notificationMap.Store(notification.NamespaceName, notification.NotificationID) + // 读取旧缓存用来给监听队列 oldValue := a.getNameSpace(notification.NamespaceName) // 更新namespace - _, newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID) + _, newValue, err := a.reloadNamespace(notification.NamespaceName) if err == nil { // 发送到监听channel a.sendWatchCh(notification.NamespaceName, oldValue, newValue) @@ -543,15 +548,6 @@ func (a *agollo) getLocalNotifications() []Notification { return notifications } -func (a *agollo) updateNotificationIDIfIncreased(namespace string, notificationID int) bool { - savedNotificationID, ok := a.notificationMap.Load(namespace) - if ok && savedNotificationID.(int) >= notificationID { - return false - } - a.notificationMap.Store(namespace, notificationID) - return true -} - func Init(configServerURL, appID string, opts ...Option) (err error) { defaultAgollo, err = New(configServerURL, appID, opts...) return diff --git a/agollo_test.go b/agollo_test.go index 14b7202..156c7db 100644 --- a/agollo_test.go +++ b/agollo_test.go @@ -1,7 +1,10 @@ package agollo import ( + "encoding/json" "fmt" + "io/ioutil" + "log" "os" "strconv" "sync" @@ -51,9 +54,8 @@ func (c *mockApolloClient) GetConfigServers(metaServerURL, appID string) (int, [ } type testCase struct { - Name string - NewAgollo func(configs map[string]*Config) Agollo - Test func(a Agollo, configs map[string]*Config) + Name string + Test func(configs map[string]*Config) } func TestAgollo(t *testing.T) { @@ -84,22 +86,13 @@ func TestAgollo(t *testing.T) { } newClient := func(configs map[string]*Config) ApolloClient { - var lock sync.RWMutex - var once sync.Once return &mockApolloClient{ notifications: func(configServerURL, appID, clusterName string, notifications []Notification) (int, []Notification, error) { - lock.RLock() - rk, _ := strconv.Atoi(configs["application"].ReleaseKey) - lock.RUnlock() - - once.Do(func() { - rk++ - lock.Lock() - configs["application"].ReleaseKey = fmt.Sprint(rk) - lock.Unlock() - }) - return 200, []Notification{ + rk, _ := strconv.Atoi(configs["application"].ReleaseKey) + rk++ + configs["application"].ReleaseKey = fmt.Sprint(rk) + return 304, []Notification{ Notification{ NamespaceName: "application", NotificationID: rk, @@ -115,9 +108,8 @@ func TestAgollo(t *testing.T) { opt(&options) } - lock.RLock() config, ok := configs[namespace] - lock.RUnlock() + if !ok { return 404, nil, nil } @@ -152,17 +144,25 @@ func TestAgollo(t *testing.T) { }, } } - + _ = newBadClient var tests = []testCase{ { Name: "测试:预加载的namespace应该正常可获取,非预加载的namespace无法获取配置", - NewAgollo: func(configs map[string]*Config) Agollo { - a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), PreloadNamespaces("test.json")) + Test: func(configs map[string]*Config) { + backupfile, err := ioutil.TempFile("", "backup") + if err != nil { + log.Fatal(err) + } + defer os.Remove(backupfile.Name()) + + a, err := New(configServerURL, appid, + WithApolloClient(newClient(configs)), + PreloadNamespaces("test.json"), + BackupFile(backupfile.Name()), + ) assert.Nil(t, err) assert.NotNil(t, a) - return a - }, - Test: func(a Agollo, configs map[string]*Config) { + for namespace, config := range configs { for key, expected := range config.Configurations { if namespace == "test.json" { @@ -178,21 +178,28 @@ func TestAgollo(t *testing.T) { }, { Name: "测试:自动获取非预加载namespace时,正常读取配置配置项", - NewAgollo: func(configs map[string]*Config) Agollo { + Test: func(configs map[string]*Config) { + backupfile, err := ioutil.TempFile("", "backup") + if err != nil { + log.Fatal(err) + } + defer os.Remove(backupfile.Name()) + a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss(), WithLogger(NewLogger(LoggerWriter(os.Stdout))), + BackupFile(backupfile.Name()), ) assert.Nil(t, err) assert.NotNil(t, a) - return a - }, - Test: func(a Agollo, configs map[string]*Config) { + for namespace, config := range configs { for key, expected := range config.Configurations { actual := a.Get(key, WithNamespace(namespace)) - assert.Equal(t, expected, actual, "Namespace: %s, Key: %s", namespace, key) + assert.Equal(t, expected, actual, + "configs: %v, agollo: %v, Namespace: %s, Key: %s", + configs, a.GetNameSpace(namespace), namespace, key) } } @@ -205,13 +212,21 @@ func TestAgollo(t *testing.T) { }, { Name: "测试:初始化后 start 监听配置的情况", - NewAgollo: func(configs map[string]*Config) Agollo { - a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss()) + Test: func(configs map[string]*Config) { + backupfile, err := ioutil.TempFile("", "backup") + if err != nil { + log.Fatal(err) + } + defer os.Remove(backupfile.Name()) + + a, err := New(configServerURL, appid, + WithApolloClient(newClient(configs)), + AutoFetchOnCacheMiss(), + BackupFile(backupfile.Name()), + ) assert.Nil(t, err) assert.NotNil(t, a) - return a - }, - Test: func(a Agollo, configs map[string]*Config) { + a.Start() defer a.Stop() @@ -236,16 +251,34 @@ func TestAgollo(t *testing.T) { }, { Name: "测试:容灾配置项", - NewAgollo: func(configs map[string]*Config) Agollo { + Test: func(configs map[string]*Config) { + backupfile, err := ioutil.TempFile("", "backup") + if err != nil { + log.Fatal(err) + } + defer os.Remove(backupfile.Name()) + + enc := json.NewEncoder(backupfile) + + backup := map[string]Configurations{} + for _, config := range configs { + backup[config.NamespaceName] = config.Configurations + } + + err = enc.Encode(backup) + if err != nil { + log.Fatal(err) + } + a, err := New(configServerURL, appid, WithApolloClient(newBadClient(configs)), AutoFetchOnCacheMiss(), - FailTolerantOnBackupExists()) + FailTolerantOnBackupExists(), + BackupFile(backupfile.Name()), + ) assert.Nil(t, err) assert.NotNil(t, a) - return a - }, - Test: func(a Agollo, configs map[string]*Config) { + a.Start() defer a.Stop() @@ -254,11 +287,11 @@ func TestAgollo(t *testing.T) { go func() { defer wg.Done() - for i := 0; i < 5; i++ { + for i := 0; i < 3; i++ { for namespace, config := range configs { for key, expected := range config.Configurations { actual := a.Get(key, WithNamespace(namespace)) - assert.Equal(t, expected, actual) + assert.Equal(t, expected, actual, "%v %s", a.GetNameSpace(namespace), namespace) } } time.Sleep(time.Second) @@ -275,10 +308,8 @@ func TestAgollo(t *testing.T) { for _, test := range tests { go func(test testCase) { defer wg.Done() - configs := newConfigs() - a := test.NewAgollo(configs) - test.Test(a, configs) + test.Test(configs) }(test) }