diff --git a/agollo.go b/agollo.go index 331dbe2..4297369 100644 --- a/agollo.go +++ b/agollo.go @@ -109,45 +109,68 @@ func New(configServerURL, appID string, opts ...Option) (Agollo, error) { } a.opts = options - return a.preload() + return a, a.initNamespace(a.opts.PreloadNamespaces...) } -func (a *agollo) preload() (Agollo, error) { - for _, namespace := range a.opts.PreloadNamespaces { - err := a.initNamespace(namespace) +func (a *agollo) initNamespace(namespaces ...string) error { + var uninitializedNamespaces []string + for _, namespace := range namespaces { + _, found := a.initialized.LoadOrStore(namespace, true) + if !found { + uninitializedNamespaces = append(uninitializedNamespaces, namespace) + } + } + + if len(uninitializedNamespaces) == 0 { + return nil + } + + var existsNamespaces []Notification + for _, namespace := range uninitializedNamespaces { + // (1)读取配置 (2)设置初始化notificationMap + status, _, err := a.reloadNamespace(namespace, defaultNotificationID) if err != nil { - return nil, err + return err + } + + // 这里没法光凭靠error==nil来判断,即使http请求失败,如果开启 容错,会导致error丢失 + // 从而可能将一个不存在的namespace拿去调用getRemoteNotifications导致被hold + if status == http.StatusOK { + existsNamespaces = append(existsNamespaces, + Notification{ + NotificationID: defaultNotificationID, + NamespaceName: namespace, + }, + ) } } - return a, nil -} -func (a *agollo) initNamespace(namespace string) error { - _, found := a.initialized.LoadOrStore(namespace, true) - if !found { - _, err := a.reloadNamespace(namespace, defaultNotificationID) - return err + // 由于apollo去getRemoteNotifications获取一个不存在的namespace的notificationID时会hold请求90秒 + // (1) 为防止意外传入一个不存在的namespace而发生上述情况,仅将成功获取配置在apollo存在的namespace,去初始化notificationID + // (2) 此处忽略error返回,在容灾逻辑下配置能正确读取而去获取notificationid可能会返回http请求失败,防止服务不能正常容灾启动 + remoteNotifications, _ := a.getRemoteNotifications(existsNamespaces) + // 设置初始化的namespace的notificationID + for _, notification := range remoteNotifications { + a.updateNotificationIDIfIncreased(notification.NamespaceName, notification.NotificationID) } + return nil } -func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Configurations, err error) { - // 判断relaod的通知id是否大于缓存通知id,防止无谓的刷新缓存, - savedNotificationID, ok := a.notificationMap.Load(namespace) - if ok && savedNotificationID.(int) >= notificationID { +func (a *agollo) reloadNamespace(namespace string, notificationID int) (status int, conf Configurations, err error) { + if !a.updateNotificationIDIfIncreased(namespace, notificationID) { conf = a.getNameSpace(namespace) return } - a.notificationMap.Store(namespace, notificationID) var configServerURL string configServerURL, err = a.opts.Balancer.Select() if err != nil { - return nil, err + a.log("Action", "BalancerSelect", "Error", err) + return } var ( - status int config *Config cachedReleaseKey, _ = a.releaseKeyMap.LoadOrStore(namespace, "") ) @@ -167,24 +190,32 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con // 备份配置 if err = a.backup(); err != nil { + a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace, + "Action", "Backup", "Error", err) return } case http.StatusNotModified: // 服务端未修改配置情况下返回304 conf = a.getNameSpace(namespace) default: a.log("ConfigServerUrl", configServerURL, "Namespace", namespace, - "Action", "ReloadNameSpace", "ServerResponseStatus", status, + "Action", "GetConfigsFromNonCache", "ServerResponseStatus", status, "Error", err) conf = Configurations{} + // 异常状况下,如果开启容灾,则读取备份 if a.opts.FailTolerantOnBackupExists { backupConfig, lerr := a.loadBackup(namespace) - if lerr == nil { - a.cache.Store(namespace, backupConfig) - conf = backupConfig - err = nil + if lerr != nil { + a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace, + "Action", "LoadBackup", "Error", lerr) + return } + + a.cache.Store(namespace, backupConfig) + conf = backupConfig + err = nil + return } } @@ -213,7 +244,10 @@ func (a *agollo) Get(key string, opts ...GetOption) string { func (a *agollo) GetNameSpace(namespace string) Configurations { config, found := a.cache.LoadOrStore(namespace, Configurations{}) if !found && a.opts.AutoFetchOnCacheMiss { - _ = a.initNamespace(namespace) + err := a.initNamespace(namespace) + if err != nil { + a.log("Action", "InitNamespace", "Error", err) + } return a.getNameSpace(namespace) } @@ -254,6 +288,43 @@ func (a *agollo) Start() <-chan *LongPollerError { return a.errorsCh } +func (a *agollo) shouldStop() bool { + select { + case <-a.stopCh: + return true + default: + return false + } +} + +func (a *agollo) longPoll() { + localNotifications := a.getLocalNotifications() + + // 这里有个问题是非预加载的namespace,如果在Start开启监听后才被initNamespace + // 需要等待90秒后的下一次轮训才能收到事件通知 + notifications, err := a.getRemoteNotifications(localNotifications) + if err != nil { + a.sendErrorsCh("", nil, "", err) + return + } + + // HTTP Status: 200时,正常返回notifications数据,数组含有需要更新namespace和notificationID + // HTTP Status: 304时,上报的namespace没有更新的修改,返回notifications为空数组,遍历空数组跳过 + for _, notification := range notifications { + // 读取旧缓存用来给监听队列 + oldValue := a.getNameSpace(notification.NamespaceName) + + // 更新namespace + _, newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID) + if err == nil { + // 发送到监听channel + a.sendWatchCh(notification.NamespaceName, oldValue, newValue) + } else { + a.sendErrorsCh("", notifications, notification.NamespaceName, err) + } + } +} + func (a *agollo) Stop() { a.stopLock.Lock() defer a.stopLock.Unlock() @@ -269,15 +340,6 @@ func (a *agollo) Stop() { close(a.stopCh) } -func (a *agollo) shouldStop() bool { - select { - case <-a.stopCh: - return true - default: - return false - } -} - func (a *agollo) Watch() <-chan *ApolloResponse { if a.watchCh == nil { a.watchCh = make(chan *ApolloResponse) @@ -286,16 +348,6 @@ func (a *agollo) Watch() <-chan *ApolloResponse { return a.watchCh } -func fixWatchNamespace(namespace string) string { - // fix: 传给apollo类似test.properties这种namespace - // 通知回来的NamespaceName却没有.properties后缀,追加.properties后缀来修正此问题 - ext := path.Ext(namespace) - if ext == "" { - namespace = namespace + "." + defaultConfigType - } - return namespace -} - func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse { watchNamespace := fixWatchNamespace(namespace) watchCh, exists := a.watchNamespaceChMap.LoadOrStore(watchNamespace, make(chan *ApolloResponse)) @@ -320,6 +372,16 @@ func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *Apollo return watchCh.(chan *ApolloResponse) } +func fixWatchNamespace(namespace string) string { + // fix: 传给apollo类似test.properties这种namespace + // 通知回来的NamespaceName却没有.properties后缀,追加.properties后缀来修正此问题 + ext := path.Ext(namespace) + if ext == "" { + namespace = namespace + "." + defaultConfigType + } + return namespace +} + func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) { changes := oldVal.Different(newVal) if len(changes) == 0 { @@ -358,6 +420,8 @@ func (a *agollo) getWatchChs(namespace string) []chan *ApolloResponse { return chs } +// sendErrorsCh 发送轮训时发生的错误信息channel,如果使用者不监听消费channel,错误会被丢弃 +// 改成负载均衡机制后,不太好获取每个api使用的configServerURL有点蛋疼 func (a *agollo) sendErrorsCh(configServerURL string, notifications []Notification, namespace string, err error) { longPollerError := &LongPollerError{ ConfigServerURL: configServerURL, @@ -434,60 +498,36 @@ func (a *agollo) loadBackup(specifyNamespace string) (Configurations, error) { return nil, nil } -// longPoll 长轮训获取配置修改通知 -// 已知的apollo在配置未修改情况下返回304状态码,修改返回200状态码并带上notificationID -// (1) 先通过/configs接口获取配置后,而接口中无法获取notificationID只能获取releaseKey -// 通过releaseKey去重复请求接口得到304,从而知道配置已更新到最新 -// (2) 在初始化不知道namespace的notificationID情况下,通知接口返回200状态码时 -// * 无法判断是否需要更新配置(只能通过再次请求接口,在304状态下不修改,在200状态下更新,并存储notificationID) -// * 在初始化不知道namespace的notificationID情况下,无法判断是否需要发送修改事件(只能通过比较 新/旧 配置是否有差异部分,来判断是否发送修改事件) -// [Apollo BUG] https://github.com/ctripcorp/apollo/issues/3123 -// 在我实测下来,无法在初始化时通过调用/notifications接口获得notificationID,有些特殊情况会被hold死且无notificationID返回 -func (a *agollo) longPoll() { - localNotifications := a.notifications() +// getRemoteNotifications +// 立即返回的情况: +// 1. 请求中的namespace任意一个在apollo服务器中有更新的ID会立即返回结果 +// 请求被hold 90秒的情况: +// 1. 请求的notificationID和apollo服务器中的ID相等 +// 2. 请求的namespace都是在apollo中不存在的 +func (a *agollo) getRemoteNotifications(req []Notification) ([]Notification, error) { configServerURL, err := a.opts.Balancer.Select() if err != nil { - a.log("ConfigServerUrl", configServerURL, - "Notifications", Notifications(localNotifications).String(), - "Error", err, "Action", "Balancer.Select") - a.sendErrorsCh("", nil, "", err) - return + a.log("ConfigServerUrl", configServerURL, "Error", err, "Action", "Balancer.Select") + return nil, err } status, notifications, err := a.opts.ApolloClient.Notifications( configServerURL, a.opts.AppID, a.opts.Cluster, - localNotifications, + req, ) if err != nil { a.log("ConfigServerUrl", configServerURL, - "Notifications", Notifications(localNotifications).String(), + "Notifications", req, "ServerResponseStatus", status, "Error", err, "Action", "LongPoll") - a.sendErrorsCh(configServerURL, notifications, "", err) - return + return nil, err } - if status == http.StatusOK { - // 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过 - for _, notification := range notifications { - // 读取旧缓存用来给监听队列 - oldValue := a.getNameSpace(notification.NamespaceName) - - // 更新namespace - newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID) - - if err == nil { - // 发送到监听channel - a.sendWatchCh(notification.NamespaceName, oldValue, newValue) - } else { - a.sendErrorsCh(configServerURL, notifications, notification.NamespaceName, err) - } - } - } + return notifications, nil } -func (a *agollo) notifications() []Notification { +func (a *agollo) getLocalNotifications() []Notification { var notifications []Notification a.notificationMap.Range(func(key, val interface{}) bool { @@ -503,6 +543,15 @@ func (a *agollo) notifications() []Notification { return notifications } +func (a *agollo) updateNotificationIDIfIncreased(namespace string, notificationID int) bool { + savedNotificationID, ok := a.notificationMap.Load(namespace) + if ok && savedNotificationID.(int) >= notificationID { + return false + } + a.notificationMap.Store(namespace, notificationID) + return true +} + func Init(configServerURL, appID string, opts ...Option) (err error) { defaultAgollo, err = New(configServerURL, appID, opts...) return diff --git a/agollo_test.go b/agollo_test.go index 6bfe488..c3eeaae 100644 --- a/agollo_test.go +++ b/agollo_test.go @@ -2,6 +2,7 @@ package agollo import ( "fmt" + "os" "strconv" "sync" "testing" @@ -176,7 +177,11 @@ func TestAgollo(t *testing.T) { { Name: "测试:自动获取非预加载namespace时,正常读取配置配置项", NewAgollo: func(configs map[string]*Config) Agollo { - a, err := New(configServerURL, appid, WithApolloClient(newClient(configs)), AutoFetchOnCacheMiss()) + a, err := New(configServerURL, appid, + WithApolloClient(newClient(configs)), + AutoFetchOnCacheMiss(), + WithLogger(NewLogger(LoggerWriter(os.Stdout))), + ) assert.Nil(t, err) assert.NotNil(t, a) return a @@ -185,7 +190,7 @@ func TestAgollo(t *testing.T) { for namespace, config := range configs { for key, expected := range config.Configurations { actual := a.Get(key, WithNamespace(namespace)) - assert.Equal(t, expected, actual) + assert.Equal(t, expected, actual, "Namespace: %s, Key: %s", namespace, key) } } @@ -213,7 +218,7 @@ func TestAgollo(t *testing.T) { go func() { defer wg.Done() - for i := 0; i < 5; i++ { + for i := 0; i < 3; i++ { for namespace, config := range configs { for key, expected := range config.Configurations { actual := a.Get(key, WithNamespace(namespace)) @@ -230,7 +235,10 @@ func TestAgollo(t *testing.T) { { Name: "测试:容灾配置项", NewAgollo: func(configs map[string]*Config) Agollo { - a, err := New(configServerURL, appid, WithApolloClient(newBadClient(configs)), AutoFetchOnCacheMiss(), FailTolerantOnBackupExists()) + a, err := New(configServerURL, appid, + WithApolloClient(newBadClient(configs)), + AutoFetchOnCacheMiss(), + FailTolerantOnBackupExists()) assert.Nil(t, err) assert.NotNil(t, a) return a @@ -260,8 +268,17 @@ func TestAgollo(t *testing.T) { }, } + var wg sync.WaitGroup + wg.Add(len(tests)) for _, test := range tests { - configs := newConfigs() - test.Test(test.NewAgollo(configs), configs) + go func() { + defer wg.Done() + + configs := newConfigs() + a := test.NewAgollo(configs) + test.Test(a, configs) + }() } + + wg.Wait() }