Skip to content

Commit

Permalink
[feat] 1. 优化namespace获取对应notificationID的逻辑,以及内部逻辑使其更可读
Browse files Browse the repository at this point in the history
  • Loading branch information
liuxingwang authored and liuxingwang committed Jun 15, 2020
1 parent 5f2a2fe commit 68d3e84
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 88 deletions.
213 changes: 131 additions & 82 deletions agollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,45 +109,68 @@ func New(configServerURL, appID string, opts ...Option) (Agollo, error) {
}
a.opts = options

return a.preload()
return a, a.initNamespace(a.opts.PreloadNamespaces...)
}

func (a *agollo) preload() (Agollo, error) {
for _, namespace := range a.opts.PreloadNamespaces {
err := a.initNamespace(namespace)
func (a *agollo) initNamespace(namespaces ...string) error {
var uninitializedNamespaces []string
for _, namespace := range namespaces {
_, found := a.initialized.LoadOrStore(namespace, true)
if !found {
uninitializedNamespaces = append(uninitializedNamespaces, namespace)
}
}

if len(uninitializedNamespaces) == 0 {
return nil
}

var existsNamespaces []Notification
for _, namespace := range uninitializedNamespaces {
// (1)读取配置 (2)设置初始化notificationMap
status, _, err := a.reloadNamespace(namespace, defaultNotificationID)
if err != nil {
return nil, err
return err
}

// 这里没法光凭靠error==nil来判断,即使http请求失败,如果开启 容错,会导致error丢失
// 从而可能将一个不存在的namespace拿去调用getRemoteNotifications导致被hold
if status == http.StatusOK {
existsNamespaces = append(existsNamespaces,
Notification{
NotificationID: defaultNotificationID,
NamespaceName: namespace,
},
)
}
}
return a, nil
}

func (a *agollo) initNamespace(namespace string) error {
_, found := a.initialized.LoadOrStore(namespace, true)
if !found {
_, err := a.reloadNamespace(namespace, defaultNotificationID)
return err
// 由于apollo去getRemoteNotifications获取一个不存在的namespace的notificationID时会hold请求90秒
// (1) 为防止意外传入一个不存在的namespace而发生上述情况,仅将成功获取配置在apollo存在的namespace,去初始化notificationID
// (2) 此处忽略error返回,在容灾逻辑下配置能正确读取而去获取notificationid可能会返回http请求失败,防止服务不能正常容灾启动
remoteNotifications, _ := a.getRemoteNotifications(existsNamespaces)
// 设置初始化的namespace的notificationID
for _, notification := range remoteNotifications {
a.updateNotificationIDIfIncreased(notification.NamespaceName, notification.NotificationID)
}

return nil
}

func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Configurations, err error) {
// 判断relaod的通知id是否大于缓存通知id,防止无谓的刷新缓存,
savedNotificationID, ok := a.notificationMap.Load(namespace)
if ok && savedNotificationID.(int) >= notificationID {
func (a *agollo) reloadNamespace(namespace string, notificationID int) (status int, conf Configurations, err error) {
if !a.updateNotificationIDIfIncreased(namespace, notificationID) {
conf = a.getNameSpace(namespace)
return
}
a.notificationMap.Store(namespace, notificationID)

var configServerURL string
configServerURL, err = a.opts.Balancer.Select()
if err != nil {
return nil, err
a.log("Action", "BalancerSelect", "Error", err)
return
}

var (
status int
config *Config
cachedReleaseKey, _ = a.releaseKeyMap.LoadOrStore(namespace, "")
)
Expand All @@ -167,24 +190,32 @@ func (a *agollo) reloadNamespace(namespace string, notificationID int) (conf Con

// 备份配置
if err = a.backup(); err != nil {
a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace,
"Action", "Backup", "Error", err)
return
}
case http.StatusNotModified: // 服务端未修改配置情况下返回304
conf = a.getNameSpace(namespace)
default:
a.log("ConfigServerUrl", configServerURL, "Namespace", namespace,
"Action", "ReloadNameSpace", "ServerResponseStatus", status,
"Action", "GetConfigsFromNonCache", "ServerResponseStatus", status,
"Error", err)

conf = Configurations{}

// 异常状况下,如果开启容灾,则读取备份
if a.opts.FailTolerantOnBackupExists {
backupConfig, lerr := a.loadBackup(namespace)
if lerr == nil {
a.cache.Store(namespace, backupConfig)
conf = backupConfig
err = nil
if lerr != nil {
a.log("BackupFile", a.opts.BackupFile, "Namespace", namespace,
"Action", "LoadBackup", "Error", lerr)
return
}

a.cache.Store(namespace, backupConfig)
conf = backupConfig
err = nil
return
}
}

Expand Down Expand Up @@ -213,7 +244,10 @@ func (a *agollo) Get(key string, opts ...GetOption) string {
func (a *agollo) GetNameSpace(namespace string) Configurations {
config, found := a.cache.LoadOrStore(namespace, Configurations{})
if !found && a.opts.AutoFetchOnCacheMiss {
_ = a.initNamespace(namespace)
err := a.initNamespace(namespace)
if err != nil {
a.log("Action", "InitNamespace", "Error", err)
}
return a.getNameSpace(namespace)
}

Expand Down Expand Up @@ -254,6 +288,43 @@ func (a *agollo) Start() <-chan *LongPollerError {
return a.errorsCh
}

func (a *agollo) shouldStop() bool {
select {
case <-a.stopCh:
return true
default:
return false
}
}

func (a *agollo) longPoll() {
localNotifications := a.getLocalNotifications()

// 这里有个问题是非预加载的namespace,如果在Start开启监听后才被initNamespace
// 需要等待90秒后的下一次轮训才能收到事件通知
notifications, err := a.getRemoteNotifications(localNotifications)
if err != nil {
a.sendErrorsCh("", nil, "", err)
return
}

// HTTP Status: 200时,正常返回notifications数据,数组含有需要更新namespace和notificationID
// HTTP Status: 304时,上报的namespace没有更新的修改,返回notifications为空数组,遍历空数组跳过
for _, notification := range notifications {
// 读取旧缓存用来给监听队列
oldValue := a.getNameSpace(notification.NamespaceName)

// 更新namespace
_, newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID)
if err == nil {
// 发送到监听channel
a.sendWatchCh(notification.NamespaceName, oldValue, newValue)
} else {
a.sendErrorsCh("", notifications, notification.NamespaceName, err)
}
}
}

func (a *agollo) Stop() {
a.stopLock.Lock()
defer a.stopLock.Unlock()
Expand All @@ -269,15 +340,6 @@ func (a *agollo) Stop() {
close(a.stopCh)
}

func (a *agollo) shouldStop() bool {
select {
case <-a.stopCh:
return true
default:
return false
}
}

func (a *agollo) Watch() <-chan *ApolloResponse {
if a.watchCh == nil {
a.watchCh = make(chan *ApolloResponse)
Expand All @@ -286,16 +348,6 @@ func (a *agollo) Watch() <-chan *ApolloResponse {
return a.watchCh
}

func fixWatchNamespace(namespace string) string {
// fix: 传给apollo类似test.properties这种namespace
// 通知回来的NamespaceName却没有.properties后缀,追加.properties后缀来修正此问题
ext := path.Ext(namespace)
if ext == "" {
namespace = namespace + "." + defaultConfigType
}
return namespace
}

func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse {
watchNamespace := fixWatchNamespace(namespace)
watchCh, exists := a.watchNamespaceChMap.LoadOrStore(watchNamespace, make(chan *ApolloResponse))
Expand All @@ -320,6 +372,16 @@ func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *Apollo
return watchCh.(chan *ApolloResponse)
}

func fixWatchNamespace(namespace string) string {
// fix: 传给apollo类似test.properties这种namespace
// 通知回来的NamespaceName却没有.properties后缀,追加.properties后缀来修正此问题
ext := path.Ext(namespace)
if ext == "" {
namespace = namespace + "." + defaultConfigType
}
return namespace
}

func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) {
changes := oldVal.Different(newVal)
if len(changes) == 0 {
Expand Down Expand Up @@ -358,6 +420,8 @@ func (a *agollo) getWatchChs(namespace string) []chan *ApolloResponse {
return chs
}

// sendErrorsCh 发送轮训时发生的错误信息channel,如果使用者不监听消费channel,错误会被丢弃
// 改成负载均衡机制后,不太好获取每个api使用的configServerURL有点蛋疼
func (a *agollo) sendErrorsCh(configServerURL string, notifications []Notification, namespace string, err error) {
longPollerError := &LongPollerError{
ConfigServerURL: configServerURL,
Expand Down Expand Up @@ -434,60 +498,36 @@ func (a *agollo) loadBackup(specifyNamespace string) (Configurations, error) {
return nil, nil
}

// longPoll 长轮训获取配置修改通知
// 已知的apollo在配置未修改情况下返回304状态码,修改返回200状态码并带上notificationID
// (1) 先通过/configs接口获取配置后,而接口中无法获取notificationID只能获取releaseKey
// 通过releaseKey去重复请求接口得到304,从而知道配置已更新到最新
// (2) 在初始化不知道namespace的notificationID情况下,通知接口返回200状态码时
// * 无法判断是否需要更新配置(只能通过再次请求接口,在304状态下不修改,在200状态下更新,并存储notificationID)
// * 在初始化不知道namespace的notificationID情况下,无法判断是否需要发送修改事件(只能通过比较 新/旧 配置是否有差异部分,来判断是否发送修改事件)
// [Apollo BUG] https://github.com/ctripcorp/apollo/issues/3123
// 在我实测下来,无法在初始化时通过调用/notifications接口获得notificationID,有些特殊情况会被hold死且无notificationID返回
func (a *agollo) longPoll() {
localNotifications := a.notifications()
// getRemoteNotifications
// 立即返回的情况:
// 1. 请求中的namespace任意一个在apollo服务器中有更新的ID会立即返回结果
// 请求被hold 90秒的情况:
// 1. 请求的notificationID和apollo服务器中的ID相等
// 2. 请求的namespace都是在apollo中不存在的
func (a *agollo) getRemoteNotifications(req []Notification) ([]Notification, error) {
configServerURL, err := a.opts.Balancer.Select()
if err != nil {
a.log("ConfigServerUrl", configServerURL,
"Notifications", Notifications(localNotifications).String(),
"Error", err, "Action", "Balancer.Select")
a.sendErrorsCh("", nil, "", err)
return
a.log("ConfigServerUrl", configServerURL, "Error", err, "Action", "Balancer.Select")
return nil, err
}

status, notifications, err := a.opts.ApolloClient.Notifications(
configServerURL,
a.opts.AppID,
a.opts.Cluster,
localNotifications,
req,
)
if err != nil {
a.log("ConfigServerUrl", configServerURL,
"Notifications", Notifications(localNotifications).String(),
"Notifications", req, "ServerResponseStatus", status,
"Error", err, "Action", "LongPoll")
a.sendErrorsCh(configServerURL, notifications, "", err)
return
return nil, err
}

if status == http.StatusOK {
// 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过
for _, notification := range notifications {
// 读取旧缓存用来给监听队列
oldValue := a.getNameSpace(notification.NamespaceName)

// 更新namespace
newValue, err := a.reloadNamespace(notification.NamespaceName, notification.NotificationID)

if err == nil {
// 发送到监听channel
a.sendWatchCh(notification.NamespaceName, oldValue, newValue)
} else {
a.sendErrorsCh(configServerURL, notifications, notification.NamespaceName, err)
}
}
}
return notifications, nil
}

func (a *agollo) notifications() []Notification {
func (a *agollo) getLocalNotifications() []Notification {
var notifications []Notification

a.notificationMap.Range(func(key, val interface{}) bool {
Expand All @@ -503,6 +543,15 @@ func (a *agollo) notifications() []Notification {
return notifications
}

func (a *agollo) updateNotificationIDIfIncreased(namespace string, notificationID int) bool {
savedNotificationID, ok := a.notificationMap.Load(namespace)
if ok && savedNotificationID.(int) >= notificationID {
return false
}
a.notificationMap.Store(namespace, notificationID)
return true
}

func Init(configServerURL, appID string, opts ...Option) (err error) {
defaultAgollo, err = New(configServerURL, appID, opts...)
return
Expand Down
Loading

0 comments on commit 68d3e84

Please sign in to comment.