From f61f49af7376863a57486aacba0e06e2e1e5c01b Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 7 Feb 2024 15:40:27 +0800 Subject: [PATCH] unstable->3.5 (#2397) * fix: codis-dashboard uses 100% cpu(#2332) (#2393) Co-authored-by: liuchengyu * fix: The role displayed on the first Server in the Group area of the codis-fe is incorrect (#2350) (#2387) Co-authored-by: liuchengyu * fix: automatic fix master-slave replication relationship after master or slave service restarted (#2373, #2038, #1950, #1967, #2351)) (#2386) Co-authored-by: liuchengyu * feat:add 3.5.3 changelog (#2395) * add 3.5.3 changelog --------- Co-authored-by: chejinge --------- Co-authored-by: Chengyu Liu Co-authored-by: liuchengyu Co-authored-by: chejinge --- CHANGELOG.MD | 89 +++++++ CHANGELOG_CN.MD | 81 ++++++ codis/config/dashboard.toml | 7 +- codis/pkg/models/action.go | 3 + codis/pkg/models/group.go | 45 +++- codis/pkg/topom/config.go | 8 +- codis/pkg/topom/context.go | 2 +- codis/pkg/topom/topom.go | 18 +- codis/pkg/topom/topom_group.go | 317 ++++++++++++++++-------- codis/pkg/topom/topom_sentinel.go | 179 ++++++------- codis/pkg/topom/topom_stats.go | 20 +- codis/pkg/utils/redis/client.go | 35 ++- codis/pkg/utils/redis/client_test.go | 90 +++---- codis/pkg/utils/redis/codis_sentinel.go | 1 + codis/pkg/utils/redis/sentinel.go | 40 ++- 15 files changed, 658 insertions(+), 277 deletions(-) diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 9170b5bb4..003e369f5 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,3 +1,92 @@ +# v3.5.3 + +## New features + +- Pika supports ACL[#2013](https://github.com/OpenAtomFoundation/pika/pull/2013) @[lqxhub](https://github.com/lqxhub) + +- Automatically resume service when Codis dashboard coroutine panics[#2349](https://github.com/OpenAtomFoundation/pika/pull/2349)@[chengyu-l](https://github.com/chengyu-l) + +- During the full replication process, the slave node of the pika service does not receive read traffic requests.[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[tedli](https://github.com/tedli) + +- Pika cache adds bimap data type.[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- Delete the remaining Slots in Sharing mode. 
Pika now has only DBs, and a single Pika instance can hold multiple DBs.[#2251](https://github.com/OpenAtomFoundation/pika/pull/2251) @[Mixficsol](https://github.com/Mixficsol)
+
+- Pika exporter exposes cache-related metrics.[#2318](https://github.com/OpenAtomFoundation/pika/pull/2318) @[dingxiaoshuai](https://github.com/dingxiaoshuai123)
+
+- Pika supports separation of fast and slow commands.[#2162](https://github.com/OpenAtomFoundation/pika/pull/2162) @[dingxiaoshuai](https://github.com/dingxiaoshuai123)
+
+- After Pika executes bgsave, the Unix timestamp of the bgsave is retained.[#2167](https://github.com/OpenAtomFoundation/pika/pull/2167) @[hero-heng](https://github.com/hero-heng)
+
+- Pika supports dynamic configuration of the disable_auto_compations parameter.[#2257](https://github.com/OpenAtomFoundation/pika/pull/2257) @[hero-heng](https://github.com/hero-heng)
+
+- Pika supports Redis Stream.[#1955](https://github.com/OpenAtomFoundation/pika/pull/1955) @[KKorpse](https://github.com/KKorpse)
+
+- Pika supports large key analysis tools.[#2195](https://github.com/OpenAtomFoundation/pika/pull/2195) @[sjcsjc123](https://github.com/sjcsjc123)
+
+- Pika supports dynamic adjustment of Pika cache parameters.[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge)
+
+- Updated the Pika benchmark tool to support stress testing of more interfaces.[#2222](https://github.com/OpenAtomFoundation/pika/pull/2222) @[wangshao1](https://github.com/wangshao1)
+
+- Pika Operator supports automatic expansion of Pika clusters.[#2121](https://github.com/OpenAtomFoundation/pika/pull/2121) @[machinly](https://github.com/machinly/)
+
+- Added the CompactRange command to support compacting keys within a given range.[#2163](https://github.com/OpenAtomFoundation/pika/pull/2163) @[u6th9d](https://github.com/u6th9d)
+
+- Added a compaction policy that prioritizes compactions with a small time cost.[#2172](https://github.com/OpenAtomFoundation/pika/pull/2172) @[u6th9d](https://github.com/u6th9d)
+
+- Upgraded RocksDB to v8.7.3.[#2157](https://github.com/OpenAtomFoundation/pika/pull/2157) @[JasirVoriya](https://github.com/JasirVoriya)
+
+- The Codis proxy of the Pika distributed cluster adds new observability metrics.[#2199](https://github.com/OpenAtomFoundation/pika/pull/2199) @[dingxiaoshuai](https://github.com/dingxiaoshuai123)
+
+- The Pika distributed cluster supports automatic failover.[#2386](https://github.com/OpenAtomFoundation/pika/pull/2386) @[chengyu-l](https://github.com/chengyu-l)
+
+## bugfix
+
+- Fixed an issue where Pika could mistakenly delete dump files while a slave node was performing full replication.[#2377](https://github.com/OpenAtomFoundation/pika/pull/2377) @[wangshao1](https://github.com/wangshao1)
+
+- Fixed the handling logic used when a slave node receives an abnormal response packet from the master during master-slave replication.[#2319](https://github.com/OpenAtomFoundation/pika/pull/2319) @[wangshao1](https://github.com/wangshao1)
+
+- Disable compaction when Pika executes the shutdown command, to speed up process exit.[#2345](https://github.com/OpenAtomFoundation/pika/pull/2345) @[panlei-coder](https://github.com/panlei-coder)
+
+- Fixed the inaccurate Redis Memory value in the Codis dashboard.[#2337](https://github.com/OpenAtomFoundation/pika/pull/2337) @[Mixficsol](https://github.com/Mixficsol)
+
+- Optimized the time-consuming INFO command by reducing the frequency of disk checks.
[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge)
+
+- Fixed an issue where rsync tried to delete temporary files using a wrong path, so the deletion failed and RocksDB could not be opened.[#2186](https://github.com/OpenAtomFoundation/pika/pull/2186) @[wangshao1](https://github.com/wangshao1)
+
+- Fixed coredumps caused by the compact, bgsave, and info keyspace commands when no db name was specified.[#2194](https://github.com/OpenAtomFoundation/pika/pull/2194) @[u6th9d](https://github.com/u6th9d)
+
+- The Codis dashboard now uses info replication instead of the info command to look up the master IP, reducing the performance impact on Pika.[#2198](https://github.com/OpenAtomFoundation/pika/pull/2198) @[chenbt-hz](https://github.com/chenbt-hz)
+
+- Fixed edge cases in Pika cache usage, resolving cache and DB data inconsistency in some scenarios.[#2225](https://github.com/OpenAtomFoundation/pika/pull/2225) @[chejinge](https://github.com/chejinge)
+
+- Fixed a segmentation fault reported at startup when the dump folder is empty.[#2265](https://github.com/OpenAtomFoundation/pika/pull/2265) @[chenbt-hz](https://github.com/chenbt-hz)
+
+- Fixed an issue where the cache of some commands did not take effect due to flag calculation errors.[#2217](https://github.com/OpenAtomFoundation/pika/pull/2217) @[lqxhub](https://github.com/lqxhub)
+
+- Fixed a deadlock that made a slave instance inaccessible after the master instance executed flushdb in master-slave replication mode.[#2249](https://github.com/OpenAtomFoundation/pika/pull/2249) @[ForestLH](https://github.com/ForestLH)
+
+- Fixed an issue where some commands did not check the return value of RocksDB.[#2187](https://github.com/OpenAtomFoundation/pika/pull/2187) @[callme-taota](https://github.com/callme-taota)
+
+- Fixed info keyspace returning wrong results.[#2369](https://github.com/OpenAtomFoundation/pika/pull/2369) @[Mixficsol](https://github.com/Mixficsol)
+
+- Standardized function return values and initial values.[#2176](https://github.com/OpenAtomFoundation/pika/pull/2176) @[Mixficsol](https://github.com/Mixficsol)
+
+- Fixed inaccurate statistics of network monitoring metrics.[#2234](https://github.com/OpenAtomFoundation/pika/pull/2234) @[chengyu-l](https://github.com/chengyu-l)
+
+- Fixed an issue where some parameters were loaded incorrectly from the configuration file.[#2218](https://github.com/OpenAtomFoundation/pika/pull/2218) @[jettcc](https://github.com/jettcc)
+
+- Fixed the Codis dashboard using 100% CPU.[#2393](https://github.com/OpenAtomFoundation/pika/pull/2393) @[chengyu-l](https://github.com/chengyu-l)
+
+- Fixed the incorrect display of Pika master and slave roles in Codis fe.[#2387](https://github.com/OpenAtomFoundation/pika/pull/2387) @[chengyu-l](https://github.com/chengyu-l)
+
+
 # v3.5.2
## New features diff --git a/CHANGELOG_CN.MD b/CHANGELOG_CN.MD index 9c43cf2db..95a76d0b9 100644 --- a/CHANGELOG_CN.MD +++ b/CHANGELOG_CN.MD @@ -1,3 +1,84 @@ +# v3.5.3 + +## 新特性 + +- Pika 支持 ACL[#2013](https://github.com/OpenAtomFoundation/pika/pull/2013) @[lqxhub](https://github.com/lqxhub) + +- 在 Codis dashboard 协程 panic 时自动恢复服务[#2349](https://github.com/OpenAtomFoundation/pika/pull/2349)@[chengyu-l](https://github.com/chengyu-l) + +- 在全量复制的过程中,pika 服务的从节点不接收读流量请求 [#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[tedli](https://github.com/tedli) + +- Pika cache 新增 bimap数据类型[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 删除 Sharing 模式残留的 Slot,Pika 下只有 DB,一个 Pika 下有多个 DB[#2251](https://github.com/OpenAtomFoundation/pika/pull/2251) @[Mixficsol](https://github.com/Mixficsol) + +- Pika exporter 暴露 cache 相关的数据采集指标[#2318](https://github.com/OpenAtomFoundation/pika/pull/2318) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika 支持快慢命令分离[#2162](https://github.com/OpenAtomFoundation/pika/pull/2162) @[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- pika 执行完成 Bgsave后, 保留 unix timepoint[#2167](https://github.com/OpenAtomFoundation/pika/pull/2167) @[hero-heng](https://github.com/hero-heng) + +- Pika 支持动态配置 disable_auto_compations 参数[#2257](https://github.com/OpenAtomFoundation/pika/pull/2257) @[hero-heng](https://github.com/hero-heng) + +- Pika 支持 Redis Stream[#1955](https://github.com/OpenAtomFoundation/pika/pull/1955) @[KKorpse](https://github.com/KKorpse) + +- Pika 支持大 key 分析工具[#2195](https://github.com/OpenAtomFoundation/pika/pull/2195) @[sjcsjc123](https://github.com/sjcsjc123) + +- Pika 支持动态调整 Pika cache 参数[#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 更新 Pika benchmark 工具支持更多的接口压测[#2222](https://github.com/OpenAtomFoundation/pika/pull/2222)@[wangshao1](https://github.com/wangshao1) + +- Pika Operator 支持 pika 集群自动扩容[#2121](https://github.com/OpenAtomFoundation/pika/pull/2121)@[machinly](https://github.com/machinly/) + +- 添加 CompactRange 命令支持对一定范围内的 key 进行 compact[#2163](https://github.com/OpenAtomFoundation/pika/pull/2163)@[u6th9d](https://github.com/u6th9d) + +- 提升 Compaction 速度减少 Compaction 耗时[#2172](https://github.com/OpenAtomFoundation/pika/pull/2172)@[u6th9d](https://github.com/u6th9d) + +- 升级 RocksDB 版本到 v8.7.3[#2157](https://github.com/OpenAtomFoundation/pika/pull/2157)@[JasirVoriya](https://github.com/JasirVoriya) + +- Pika 分布式集群 Codis proxy 新增可观测指标[#2199](https://github.com/OpenAtomFoundation/pika/pull/2199)@[dingxiaoshuai](https://github.com/dingxiaoshuai123) + +- Pika 分布式集群支持自动 failover[#2386](https://github.com/OpenAtomFoundation/pika/pull/2386)@[chengyu-l](https://github.com/chengyu-l) + +## bugfix + +- 修复 Pika 有从节点进行全量复制期间会误删除 dump 文件的问题[#2377](https://github.com/OpenAtomFoundation/pika/pull/2377)@[wangshao1](https://github.com/wangshao1) + +- 修复主从复制过程中, slave 节点收到 master 异常回包后的处理逻辑[#2319](https://github.com/OpenAtomFoundation/pika/pull/2319)@[wangshao1](https://github.com/wangshao1) + +- 在 Pika 执行 shutdown 命令时调用 disable compaction, 提升进程退出速度 [#2345](https://github.com/OpenAtomFoundation/pika/pull/2345) @[panlei-coder](https://github.com/panlei-coder) + +- 修复 Codis-dashboard Redis Memory 值不准确的问题[#2337](https://github.com/OpenAtomFoundation/pika/pull/2337) @[Mixficsol](https://github.com/Mixficsol) + +- INFO 命令耗时优化,降低查磁盘频率 [#2197](https://github.com/OpenAtomFoundation/pika/pull/2197) @[chejinge](https://github.com/chejinge) + +- 修复 
Rsync 删除临时文件路径不对,删除失败,导致rocksdb打开失败的问题[#2186](https://github.com/OpenAtomFoundation/pika/pull/2186)@[wangshao1](https://github.com/wangshao1) + +- 修复 Compact ,Bgsave ,Info keyspace 命令未指定db名称,导致部分命令 coredump 的问题[#2194](https://github.com/OpenAtomFoundation/pika/pull/2194)@[u6th9d](https://github.com/u6th9d) + +- Codis dashboard 用 info replication 替代 info 命令查寻 master ip 降低对 Pika 的性能影响 [#2198](https://github.com/OpenAtomFoundation/pika/pull/2198) @[chenbt-hz](https://github.com/chenbt-hz) + +- 修复 Pika cache 使用边缘case,解决部分场景下 cache 和 DB 数据不一致的问题[#2225](https://github.com/OpenAtomFoundation/pika/pull/2225) @[chejinge](https://github.com/chejinge) + +- 修复当 dump 文件夹为空时,会启动报错 Segmentation fault 的问题[#2265](https://github.com/OpenAtomFoundation/pika/pull/2265) @[chenbt-hz](https://github.com/chenbt-hz) + +- 修复因为flag计算错误,导致的部分命令缓存没有生效问题[#2217](https://github.com/OpenAtomFoundation/pika/pull/2217) @[lqxhub](https://github.com/lqxhub) + +- 修复主从复制模式下,主实例 flushdb 后,从实例因为死锁导致的不能访问的问题[#2249](https://github.com/OpenAtomFoundation/pika/pull/2249)@[ForestLH](https://github.com/ForestLH) + +- 修复部分命令未对 RocksDB 的返回值进行判断的问题[#2187](https://github.com/OpenAtomFoundation/pika/pull/2187)@[callme-taota](https://github.com/callme-taota) + +- 规范函数的返回值及初始值[#2176](https://github.com/OpenAtomFoundation/pika/pull/2176)@[Mixficsol](https://github.com/Mixficsol) + +- 修复网络监控指标统计不准确的问题[#2234](https://github.com/OpenAtomFoundation/pika/pull/2234)@[chengyu-l](https://github.com/chengyu-l) + +- 修复配置文件加载部分参数异常的问题[#2218](https://github.com/OpenAtomFoundation/pika/pull/2218)@[jettcc](https://github.com/jettcc) + +- 修复 Codis dashboard cpu 100% 的问题[#2393](https://github.com/OpenAtomFoundation/pika/pull/2393)@[chengyu-l](https://github.com/chengyu-l) + +- 修复 Codis fe pika 主从角色显示异常的问题[#2387](https://github.com/OpenAtomFoundation/pika/pull/2387)@[chengyu-l](https://github.com/chengyu-l) + + # v3.5.2 ## 新特性 diff --git a/codis/config/dashboard.toml b/codis/config/dashboard.toml index ebd910ec6..44ef06213 100644 --- a/codis/config/dashboard.toml +++ b/codis/config/dashboard.toml @@ -33,9 +33,10 @@ migration_async_numkeys = 500 migration_timeout = "30s" # Set configs for redis sentinel. 
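+# (How the dashboard uses these intervals, per pkg/topom in this patch: the
+#  server-state interval drives the regular health probe of servers in normal
+#  state, the master-failover interval re-probes servers marked subjectively
+#  offline, master_dead_check_times is how many failed re-probes mark a server
+#  objectively offline, and the new offline-server interval re-probes offline
+#  servers so their master-slave replication can be repaired once they recover.)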
-sentinel_check_server_state_interval = "5s" -sentinel_check_master_failover_interval = "1s" -sentinel_master_dead_check_times = 5 +sentinel_check_server_state_interval = "10s" +sentinel_check_master_failover_interval = "2s" +sentinel_master_dead_check_times = 10 +sentinel_check_offline_server_interval = "2s" sentinel_client_timeout = "10s" sentinel_quorum = 2 sentinel_parallel_syncs = 1 diff --git a/codis/pkg/models/action.go b/codis/pkg/models/action.go index 80dbe4b55..1138df7b4 100644 --- a/codis/pkg/models/action.go +++ b/codis/pkg/models/action.go @@ -11,4 +11,7 @@ const ( ActionMigrating = "migrating" ActionFinished = "finished" ActionSyncing = "syncing" + ActionSynced = "synced" + + ActionSyncedFailed = "synced_failed" ) diff --git a/codis/pkg/models/group.go b/codis/pkg/models/group.go index 11d6e7bf5..092ec2f11 100644 --- a/codis/pkg/models/group.go +++ b/codis/pkg/models/group.go @@ -25,6 +25,38 @@ func (g *Group) GetServersMap() map[string]*GroupServer { return results } +// SelectNewMaster choose a new master node in the group +func (g *Group) SelectNewMaster() (string, int) { + var newMasterServer *GroupServer + var newMasterIndex = -1 + + for index, server := range g.Servers { + if index == 0 || server.State != GroupServerStateNormal { + continue + } + + if newMasterServer == nil { + newMasterServer = server + newMasterIndex = index + } else if server.DbBinlogFileNum > newMasterServer.DbBinlogFileNum { + // Select the slave node with the latest offset as the master node + newMasterServer = server + newMasterIndex = index + } else if server.DbBinlogFileNum == newMasterServer.DbBinlogFileNum { + if server.DbBinlogOffset > newMasterServer.DbBinlogOffset { + newMasterServer = server + newMasterIndex = index + } + } + } + + if newMasterServer == nil { + return "", newMasterIndex + } + + return newMasterServer.Addr, newMasterIndex +} + type GroupServerState int8 const ( @@ -33,6 +65,13 @@ const ( GroupServerStateOffline ) +type GroupServerRole string + +const ( + RoleMaster GroupServerRole = "master" + RoleSlave GroupServerRole = "slave" +) + type GroupServer struct { Addr string `json:"server"` DataCenter string `json:"datacenter"` @@ -43,9 +82,11 @@ type GroupServer struct { } `json:"action"` // master or slave - Role string `json:"role"` + Role GroupServerRole `json:"role"` // If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field - ReplyOffset int `json:"reply_offset"` + DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0 + DbBinlogOffset uint64 `json:"binlog_offset"` // db0 + // Monitoring status, 0 normal, 1 subjective offline, 2 actual offline // If marked as 2 , no service is provided State GroupServerState `json:"state"` diff --git a/codis/pkg/topom/config.go b/codis/pkg/topom/config.go index 4d7234b66..d1e0d44e5 100644 --- a/codis/pkg/topom/config.go +++ b/codis/pkg/topom/config.go @@ -50,9 +50,10 @@ migration_async_numkeys = 500 migration_timeout = "30s" # Set configs for redis sentinel. 
-sentinel_check_server_state_interval = "5s" -sentinel_check_master_failover_interval = "1s" -sentinel_master_dead_check_times = 5 +sentinel_check_server_state_interval = "10s" +sentinel_check_master_failover_interval = "2s" +sentinel_master_dead_check_times = 10 +sentinel_check_offline_server_interval = "2s" sentinel_client_timeout = "10s" sentinel_quorum = 2 sentinel_parallel_syncs = 1 @@ -86,6 +87,7 @@ type Config struct { SentinelCheckServerStateInterval timesize.Duration `toml:"sentinel_check_server_state_interval" json:"sentinel_client_timeout"` SentinelCheckMasterFailoverInterval timesize.Duration `toml:"sentinel_check_master_failover_interval" json:"sentinel_check_master_failover_interval"` SentinelMasterDeadCheckTimes int8 `toml:"sentinel_master_dead_check_times" json:"sentinel_master_dead_check_times"` + SentinelCheckOfflineServerInterval timesize.Duration `toml:"sentinel_check_offline_server_interval" json:"sentinel_check_offline_server_interval"` SentinelClientTimeout timesize.Duration `toml:"sentinel_client_timeout" json:"sentinel_client_timeout"` SentinelQuorum int `toml:"sentinel_quorum" json:"sentinel_quorum"` SentinelParallelSyncs int `toml:"sentinel_parallel_syncs" json:"sentinel_parallel_syncs"` diff --git a/codis/pkg/topom/context.go b/codis/pkg/topom/context.go index b765154e7..fcec2157e 100644 --- a/codis/pkg/topom/context.go +++ b/codis/pkg/topom/context.go @@ -40,7 +40,7 @@ func (ctx *context) getSlotMapping(sid int) (*models.SlotMapping, error) { } func (ctx *context) getSlotMappingsByGroupId(gid int) []*models.SlotMapping { - var slots = []*models.SlotMapping{} + var slots []*models.SlotMapping for _, m := range ctx.slots { if m.GroupId == gid || m.Action.TargetId == gid { slots = append(slots, m) diff --git a/codis/pkg/topom/topom.go b/codis/pkg/topom/topom.go index 67bcd7dac..f2c34f6b5 100644 --- a/codis/pkg/topom/topom.go +++ b/codis/pkg/topom/topom.go @@ -210,12 +210,12 @@ func (s *Topom) Start(routines bool) error { } }, nil, true, 0) - // Check the status of the pre-offline master every 1 second + // Check the status of the pre-offline master every 2 second // to determine whether to automatically switch master and slave gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { - w, _ := s.CheckPreOffineMastersState(5 * time.Second) + w, _ := s.CheckPreOfflineMastersState(5 * time.Second) if w != nil { w.Wait() } @@ -224,6 +224,20 @@ func (s *Topom) Start(routines bool) error { } }, nil, true, 0) + // Check the status of the offline master and slave every 30 second + // to determine whether to automatically recover to right master-slave replication relationship + gxruntime.GoUnterminated(func() { + for !s.IsClosed() { + if s.IsOnline() { + w, _ := s.CheckOfflineMastersAndSlavesState(5 * time.Second) + if w != nil { + w.Wait() + } + } + time.Sleep(s.Config().SentinelCheckOfflineServerInterval.Duration()) + } + }, nil, true, 0) + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go index 46a53c417..517fb2da4 100644 --- a/codis/pkg/topom/topom_group.go +++ b/codis/pkg/topom/topom_group.go @@ -302,18 +302,7 @@ func (s *Topom) GroupPromoteServer(gid int, addr string) error { if err := s.storeUpdateGroup(g); err != nil { return err } - - var ( - master = slice[0].Addr - client *redis.Client - ) - if client, err = redis.NewClient(master, s.config.ProductAuth, time.Second); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - } - 
defer client.Close() - if err = client.SetMaster("NO:ONE"); err != nil { - log.WarnErrorf(err, "redis %s set master to NO:ONE failed", master) - } + _ = promoteServerToNewMaster(slice[0].Addr, s.config.ProductAuth) fallthrough case models.ActionFinished: @@ -341,129 +330,243 @@ func (s *Topom) GroupPromoteServer(gid int, addr string) error { } } -func (s *Topom) trySwitchGroupMaster(gid int, cache *redis.InfoCache) error { - ctx, err := s.newContext() - if err != nil { - return err - } - g, err := ctx.getGroup(gid) - if err != nil { - return err +func (s *Topom) tryFixReplicationRelationships(ctx *context, recoveredGroupServers []*redis.ReplicationState) { + for _, state := range recoveredGroupServers { + log.Infof("group-[%d] try to fix server[%v-%v] replication relationship", state.GroupID, state.Index, state.Addr) + group, err := ctx.getGroup(state.GroupID) + if err != nil { + log.Error(err) + continue + } + + group.OutOfSync = true + err = s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue + } + + err = s.tryFixReplicationRelationship(group, state.Server, state) + if err != nil { + log.Warnf("group-[%d] fix server[%v] replication relationship failed, err: %v", group.Id, state.Addr, err) + continue + } + + // Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err = s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) + continue + } else { + group.OutOfSync = false + _ = s.storeUpdateGroup(group) + s.dirtyGroupCache(group.Id) + } } +} - master := s.selectNextMaster(g.Servers) +// tryFixReplicationRelationship +// +// master or slave have already recovered service, fix its master-slave replication relationship. +// only fix which the old state of GroupServer is GroupServerStateOffline. +// It will only update the state of GroupServer to GroupServerStateNormal, If the GroupServer have right +// master-slave replication relationship. 
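+//
+// For example (hypothetical addresses), if group.Servers[0].Addr is "10.0.0.1:9221":
+//   - "10.0.0.1:9221" recovers and still reports role master             -> nothing to do;
+//   - "10.0.0.1:9221" recovers but reports role slave                    -> it is promoted again with `slaveof no one`;
+//   - "10.0.0.2:9221" recovers already replicating from "10.0.0.1:9221"  -> nothing to do;
+//   - "10.0.0.2:9221" recovers pointing at any other master              -> it is re-pointed with `slaveof 10.0.0.1 9221`.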
+func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *models.GroupServer, state *redis.ReplicationState) (err error) { + curMasterAddr := group.Servers[0].Addr + if isGroupMaster(state, group) { + // current server is master, + if models.GroupServerRole(state.Replication.Role) == models.RoleMaster { + return nil + } + + // execute the command `slaveof no one` + if err = promoteServerToNewMaster(state.Addr, s.config.ProductAuth); err != nil { + return err + } + } else { + // skip if it has right replication relationship + if state.Replication.GetMasterAddr() == curMasterAddr { + return nil + } - if master == "" { - servers, _ := json.Marshal(g) - log.Errorf("group %d donn't has any slaves to switch master, %s", gid, servers) - return errors.Errorf("cann't switch slave to master") + // current server is slave, execute the command `slaveof [new master ip] [new master port]` + if err = updateMasterToNewOne(groupServer.Addr, curMasterAddr, s.config.ProductAuth); err != nil { + return err + } } - return s.doSwitchGroupMaster(gid, master, cache) + groupServer.State = models.GroupServerStateNormal + groupServer.ReCallTimes = 0 + groupServer.ReplicaGroup = true + groupServer.Role = models.GroupServerRole(state.Replication.Role) + groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum + groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset + groupServer.Action.State = models.ActionSynced + err = s.storeUpdateGroup(group) + // clean cache whether err is nil or not + s.dirtyGroupCache(group.Id) + return err } -// Choose to change to the next master node in the group -func (s *Topom) selectNextMaster(servers []*models.GroupServer) string { - if len(servers) == 0 { - return "" - } +func isGroupMaster(state *redis.ReplicationState, g *models.Group) bool { + return state.Index == 0 && g.Servers[0].Addr == state.Addr +} - var masterServer *models.GroupServer +func (s *Topom) updateSlaveOfflineGroups(ctx *context, offlineGroups []*models.Group) { + for _, group := range offlineGroups { + log.Infof("group-[%d] update slave offline state", group.Id) + group.OutOfSync = true + err := s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue + } - for _, server := range servers { - if server.State != models.GroupServerStateNormal { + // Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err := s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) continue } + } +} - // If there is already a master node in the group working normally, return directly - if server.Role == "master" { - return server.Addr +// trySwitchGroupsToNewMaster +// +// the master have already been offline, and it will select and switch to a new master from the Group +func (s *Topom) trySwitchGroupsToNewMaster(ctx *context, masterOfflineGroups []*models.Group) { + for _, group := range masterOfflineGroups { + log.Infof("group-[%d] try to switch new master", group.Id) + group.OutOfSync = true + err := s.storeUpdateGroup(group) + if err != nil { + s.dirtyGroupCache(group.Id) + continue } - if masterServer == nil { - masterServer = server - } else if server.ReplyOffset > masterServer.ReplyOffset { - // Select the slave node with the latest offset as the master node - masterServer = server + // try to switch to new master + if err := s.trySwitchGroupMaster(group); err != nil { + log.Errorf("group-[%d] switch master failed, %v", group.Id, err) + continue + } + + // 
Notify all servers to update slot information + slots := ctx.getSlotMappingsByGroupId(group.Id) + if err := s.resyncSlotMappings(ctx, slots...); err != nil { + log.Warnf("group-[%d] notify all proxy failed, %v", group.Id, err) + continue + } else { + group.OutOfSync = false + _ = s.storeUpdateGroup(group) + s.dirtyGroupCache(group.Id) } } +} - if masterServer == nil { - return "" +func (s *Topom) trySwitchGroupMaster(group *models.Group) error { + newMasterAddr, newMasterIndex := group.SelectNewMaster() + if newMasterAddr == "" { + servers, _ := json.Marshal(group) + log.Errorf("group %d don't has any slaves to switch master, %s", group.Id, servers) + return errors.Errorf("can't switch slave to master") } - return masterServer.Addr + // TODO liuchengyu check new master is available + //available := isAvailableAsNewMaster(masterServer, s.Config()) + //if !available { + // return "" + //} + + return s.doSwitchGroupMaster(group, newMasterAddr, newMasterIndex) } -func (s *Topom) doSwitchGroupMaster(gid int, master string, cache *redis.InfoCache) error { - ctx, err := s.newContext() +func isAvailableAsNewMaster(groupServer *models.GroupServer, conf *Config) bool { + rc, err := redis.NewClient(groupServer.Addr, conf.ProductAuth, 500*time.Millisecond) if err != nil { - return err + log.Warnf("connect GroupServer[%v] failed!, error:%v", groupServer.Addr, err) + return false } - g, err := ctx.getGroup(gid) + defer rc.Close() + + info, err := rc.InfoReplication() if err != nil { - return err + log.Warnf("get InfoReplication from GroupServer[%v] failed!, error:%v", groupServer.Addr, err) + return false } - var index = func() int { - for i, x := range g.Servers { - if x.Addr == master { - return i - } - } - for i, x := range g.Servers { - rid1 := cache.GetRunId(master) - rid2 := cache.GetRunId(x.Addr) - if rid1 != "" && rid1 == rid2 { - return i - } - } - return -1 - }() - if index == -1 { - return errors.Errorf("group-[%d] doesn't have server %s with runid = '%s'", g.Id, master, cache.GetRunId(master)) + if info.MasterLinkStatus == "down" { + // down state means the slave does not finished full sync from master + log.Warnf("the master_link_status of GroupServer[%v] is down state. 
it cannot be selected as master", groupServer.Addr) + return false } - if index == 0 { + + return true +} + +func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMasterIndex int) (err error) { + if newMasterIndex <= 0 || newMasterAddr == "" { return nil } - defer s.dirtyGroupCache(g.Id) - - log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, index, g.Servers[index].Addr) + log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, newMasterIndex, newMasterAddr) // Set the slave node as the new master node - var client *redis.Client - if client, err = redis.NewClient(master, s.config.ProductAuth, 100*time.Millisecond); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - return err + if err = promoteServerToNewMaster(newMasterAddr, s.config.ProductAuth); err != nil { + return errors.Errorf("promote server[%v] to new master failed, err:%v", newMasterAddr, err) } - defer client.Close() - if err = client.SetMaster("NO:ONE"); err != nil { - log.WarnErrorf(err, "redis %s set master to NO:ONE failed", master) - return err - } + g.Servers[newMasterIndex].Role = models.RoleMaster + g.Servers[newMasterIndex].Action.State = models.ActionSynced + g.Servers[0], g.Servers[newMasterIndex] = g.Servers[newMasterIndex], g.Servers[0] + defer func() { + err = s.storeUpdateGroup(g) + // clean cache whether err is nil or not + s.dirtyGroupCache(g.Id) + }() // Set other nodes in the group as slave nodes of the new master node for _, server := range g.Servers { - if server.State != models.GroupServerStateNormal || server.Addr == master { + if server.State != models.GroupServerStateNormal || server.Addr == newMasterAddr { continue } - var client2 *redis.Client - if client2, err = redis.NewClient(server.Addr, s.config.ProductAuth, 100*time.Millisecond); err != nil { - log.WarnErrorf(err, "create redis client to %s failed", master) - return err - } - defer client2.Close() - if err = client2.SetMaster(master); err != nil { - log.WarnErrorf(err, "redis %s set master to %s failed", server.Addr, master) - return err + + err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth) + if err != nil { + // skip err, and retry to update master-slave replication relationship through next heartbeat check + err = nil + server.Action.State = models.ActionSyncedFailed + server.State = models.GroupServerStateOffline + log.Warnf("group-[%d] update server[%d] replication relationship failed, new master: %s", g.Id, newMasterIndex, newMasterAddr) + } else { + server.Action.State = models.ActionSynced + server.Role = models.RoleSlave } } - g.Servers[0], g.Servers[index] = g.Servers[index], g.Servers[0] - g.Servers[0].Role = "master" - g.OutOfSync = true - return s.storeUpdateGroup(g) + return err +} + +func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) { + return setNewRedisMaster(serverAddr, masterAddr, auth, false) +} + +func promoteServerToNewMaster(serverAddr, auth string) (err error) { + return setNewRedisMaster(serverAddr, "NO:ONE", auth, false) +} + +func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) { + return setNewRedisMaster(serverAddr, masterAddr, auth, true) +} + +func setNewRedisMaster(serverAddr, masterAddr string, auth string, force bool) (err error) { + var rc *redis.Client + if rc, err = redis.NewClient(serverAddr, auth, 500*time.Millisecond); err != nil { + return errors.Errorf("create redis client to %s failed, err:%v", serverAddr, err) + } + defer 
rc.Close() + if err = rc.SetMaster(masterAddr, force); err != nil { + return errors.Errorf("server[%s] set master to %s failed, force:%v err:%v", serverAddr, masterAddr, force, err) + } + return err } func (s *Topom) EnableReplicaGroups(gid int, addr string, value bool) error { @@ -640,11 +743,14 @@ func (s *Topom) SyncActionComplete(addr string, failed bool) error { var state string if !failed { - state = "synced" + state = models.ActionSynced } else { - state = "synced_failed" + state = models.ActionSyncedFailed } g.Servers[index].Action.State = state + // check whether the master is offline through heartbeat, if so, select a new master + g.Servers[index].State = models.GroupServerStateOffline + return s.storeUpdateGroup(g) } @@ -665,21 +771,16 @@ func (s *Topom) newSyncActionExecutor(addr string) (func() error, error) { return nil, nil } - var master = "NO:ONE" + var masterAddr string if index != 0 { - master = g.Servers[0].Addr + masterAddr = g.Servers[0].Addr } + return func() error { - c, err := redis.NewClient(addr, s.config.ProductAuth, time.Minute*30) - if err != nil { - log.WarnErrorf(err, "create redis client to %s failed", addr) - return err + if index != 0 { + return updateMasterToNewOne(addr, masterAddr, s.config.ProductAuth) + } else { + return promoteServerToNewMaster(addr, s.config.ProductAuth) } - defer c.Close() - if err := c.SetMaster(master); err != nil { - log.WarnErrorf(err, "redis %s set master to %s failed", addr, master) - return err - } - return nil }, nil } diff --git a/codis/pkg/topom/topom_sentinel.go b/codis/pkg/topom/topom_sentinel.go index 88a20403c..3ea8b3cd9 100644 --- a/codis/pkg/topom/topom_sentinel.go +++ b/codis/pkg/topom/topom_sentinel.go @@ -4,14 +4,11 @@ package topom import ( - "time" - "pika/codis/v2/pkg/models" - "pika/codis/v2/pkg/utils/log" "pika/codis/v2/pkg/utils/redis" ) -func (s *Topom) CheckAndSwitchSlavesAndMasters(filter func(index int, g *models.GroupServer) bool) error { +func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *models.GroupServer) bool) error { s.mu.Lock() defer s.mu.Unlock() ctx, err := s.newContext() @@ -19,110 +16,114 @@ func (s *Topom) CheckAndSwitchSlavesAndMasters(filter func(index int, g *models. 
return err } - config := &redis.MonitorConfig{ - Quorum: s.config.SentinelQuorum, - ParallelSyncs: s.config.SentinelParallelSyncs, - DownAfter: s.config.SentinelDownAfter.Duration(), - FailoverTimeout: s.config.SentinelFailoverTimeout.Duration(), - NotificationScript: s.config.SentinelNotificationScript, - ClientReconfigScript: s.config.SentinelClientReconfigScript, - } - - sentinel := redis.NewCodisSentinel(s.config.ProductName, s.config.ProductAuth) - gs := make(map[int][]*models.GroupServer) - for gid, servers := range ctx.getGroupServers() { - for i, server := range servers { - if filter(i, server) { - if val, ok := gs[gid]; ok { - gs[gid] = append(val, server) - } else { - gs[gid] = []*models.GroupServer{server} - } - } - } - } - if len(gs) == 0 { + groupServers := filterGroupServer(ctx.getGroupServers(), filter) + if len(groupServers) == 0 { return nil } - states := sentinel.RefreshMastersAndSlavesClient(config.ParallelSyncs, gs) - - var pending []*models.Group - + states := checkGroupServersReplicationState(s.Config(), groupServers) + var slaveOfflineGroups []*models.Group + var masterOfflineGroups []*models.Group + var recoveredGroupServersState []*redis.ReplicationState + var group *models.Group for _, state := range states { - var g *models.Group - if g, err = ctx.getGroup(state.GroupID); err != nil { + group, err = ctx.getGroup(state.GroupID) + if err != nil { return err } - serversMap := g.GetServersMap() - if len(serversMap) == 0 { - continue - } + s.checkAndUpdateGroupServerState(s.Config(), group, state.Server, state, &slaveOfflineGroups, + &masterOfflineGroups, &recoveredGroupServersState) + } - // It was the master node before, the master node hangs up, and it is currently the master node - if state.Index == 0 && state.Err != nil && g.Servers[0].Addr == state.Addr { - if g.Servers[0].State == models.GroupServerStateNormal { - g.Servers[0].State = models.GroupServerStateSubjectiveOffline - } else { - // update retries - g.Servers[0].ReCallTimes++ - - // Retry more than config times, start election - if g.Servers[0].ReCallTimes >= s.Config().SentinelMasterDeadCheckTimes { - // Mark enters objective offline state - g.Servers[0].State = models.GroupServerStateOffline - g.Servers[0].ReplicaGroup = false - } - // Start the election master node - if g.Servers[0].State == models.GroupServerStateOffline { - pending = append(pending, g) - } - } - } + if len(slaveOfflineGroups) > 0 { + // slave has been offline, and update state + s.updateSlaveOfflineGroups(ctx, slaveOfflineGroups) + } - // Update the offset information of the state and role nodes - if val, ok := serversMap[state.Addr]; ok { - if state.Err != nil { - if val.State == models.GroupServerStateNormal { - val.State = models.GroupServerStateSubjectiveOffline - } - continue + if len(masterOfflineGroups) > 0 { + // old master offline already, auto switch to new master + s.trySwitchGroupsToNewMaster(ctx, masterOfflineGroups) + } + + if len(recoveredGroupServersState) > 0 { + // offline GroupServer's service has recovered, check and fix it's master-slave replication relationship + s.tryFixReplicationRelationships(ctx, recoveredGroupServersState) + } + + return nil +} + +func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group, groupServer *models.GroupServer, + state *redis.ReplicationState, slaveOfflineGroups *[]*models.Group, masterOfflineGroups *[]*models.Group, + recoveredGroupServers *[]*redis.ReplicationState) { + if state.Err != nil { + if groupServer.State == models.GroupServerStateNormal { + // 
pre offline + groupServer.State = models.GroupServerStateSubjectiveOffline + } else { + // update retries + groupServer.ReCallTimes++ + + // Retry more than config times, start election + if groupServer.ReCallTimes >= conf.SentinelMasterDeadCheckTimes { + // Mark enters objective offline state + groupServer.State = models.GroupServerStateOffline + groupServer.Action.State = models.ActionNothing + groupServer.ReplicaGroup = false } - val.State = models.GroupServerStateNormal - val.ReCallTimes = 0 - val.Role = state.Replication.Role - if val.Role == "master" { - val.ReplyOffset = state.Replication.MasterReplOffset + // Start the election master node + if groupServer.State == models.GroupServerStateOffline && isGroupMaster(state, group) { + *masterOfflineGroups = append(*masterOfflineGroups, group) } else { - val.ReplyOffset = state.Replication.SlaveReplOffset + *slaveOfflineGroups = append(*slaveOfflineGroups, group) } } + } else { + if groupServer.State == models.GroupServerStateOffline { + *recoveredGroupServers = append(*recoveredGroupServers, state) + // update GroupServer to GroupServerStateNormal state later + } else { + // Update the offset information of the state and role nodes + groupServer.State = models.GroupServerStateNormal + groupServer.ReCallTimes = 0 + groupServer.ReplicaGroup = true + groupServer.Role = models.GroupServerRole(state.Replication.Role) + groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum + groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset + groupServer.Action.State = models.ActionSynced + } } +} - if len(pending) == 0 { - return nil +func checkGroupServersReplicationState(conf *Config, gs map[int][]*models.GroupServer) []*redis.ReplicationState { + config := &redis.MonitorConfig{ + Quorum: conf.SentinelQuorum, + ParallelSyncs: conf.SentinelParallelSyncs, + DownAfter: conf.SentinelDownAfter.Duration(), + FailoverTimeout: conf.SentinelFailoverTimeout.Duration(), + NotificationScript: conf.SentinelNotificationScript, + ClientReconfigScript: conf.SentinelClientReconfigScript, } - cache := &redis.InfoCache{ - Auth: s.config.ProductAuth, Timeout: time.Millisecond * 100, - } - // Try to switch master slave - for _, g := range pending { - if err = s.trySwitchGroupMaster(g.Id, cache); err != nil { - log.Errorf("gid-[%d] switch master failed, %v", g.Id, err) - continue - } + sentinel := redis.NewCodisSentinel(conf.ProductName, conf.ProductAuth) + return sentinel.RefreshMastersAndSlavesClient(config.ParallelSyncs, gs) +} - slots := ctx.getSlotMappingsByGroupId(g.Id) - // Notify all servers to update slot information - if err = s.resyncSlotMappings(ctx, slots...); err != nil { - log.Warnf("group-[%d] resync-rollback to preparing", g.Id) - continue +func filterGroupServer(groupServers map[int][]*models.GroupServer, + filter func(index int, gs *models.GroupServer) bool) map[int][]*models.GroupServer { + filteredGroupServers := make(map[int][]*models.GroupServer) + for gid, servers := range groupServers { + for i, server := range servers { + if filter(i, server) { + if val, ok := filteredGroupServers[gid]; ok { + filteredGroupServers[gid] = append(val, server) + } else { + filteredGroupServers[gid] = []*models.GroupServer{server} + } + } } - s.dirtyGroupCache(g.Id) } - - return nil + return filteredGroupServers } diff --git a/codis/pkg/topom/topom_stats.go b/codis/pkg/topom/topom_stats.go index 9186e05a1..d9538cc7a 100644 --- a/codis/pkg/topom/topom_stats.go +++ b/codis/pkg/topom/topom_stats.go @@ -167,7 +167,7 @@ func (s *Topom) 
newMastersAndSlavesStats(timeout time.Duration, filter func(inde go func() { defer close(ch) - err := s.CheckAndSwitchSlavesAndMasters(filter) + err := s.CheckStateAndSwitchSlavesAndMasters(filter) if err != nil { log.Errorf("refresh masters and slaves failed, %v", err) stats.Error = err @@ -189,19 +189,31 @@ func (s *Topom) CheckMastersAndSlavesState(timeout time.Duration) (*sync.WaitGro wg := &sync.WaitGroup{} wg.Add(1) go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { - return index != 0 || g.State == models.GroupServerStateNormal + return g.State == models.GroupServerStateNormal }, wg) return wg, nil } -func (s *Topom) CheckPreOffineMastersState(timeout time.Duration) (*sync.WaitGroup, error) { +func (s *Topom) CheckPreOfflineMastersState(timeout time.Duration) (*sync.WaitGroup, error) { s.mu.Lock() defer s.mu.Unlock() wg := &sync.WaitGroup{} wg.Add(1) go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { - return index == 0 && g.State != models.GroupServerStateNormal + return g.State == models.GroupServerStateSubjectiveOffline + }, wg) + return wg, nil +} + +func (s *Topom) CheckOfflineMastersAndSlavesState(timeout time.Duration) (*sync.WaitGroup, error) { + s.mu.Lock() + defer s.mu.Unlock() + + wg := &sync.WaitGroup{} + wg.Add(1) + go s.newMastersAndSlavesStats(timeout, func(index int, g *models.GroupServer) bool { + return g.State == models.GroupServerStateOffline }, wg) return wg, nil } diff --git a/codis/pkg/utils/redis/client.go b/codis/pkg/utils/redis/client.go index 21ae9e83b..5f751321d 100644 --- a/codis/pkg/utils/redis/client.go +++ b/codis/pkg/utils/redis/client.go @@ -46,7 +46,7 @@ func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout), }...) 
if err != nil { - return nil, errors.Trace(err) + return nil, err } return &Client{ conn: c, Addr: addr, Auth: auth, @@ -203,11 +203,16 @@ func (c *Client) InfoReplication() (*InfoReplication, error) { return nil, errors.Trace(err) } + return parseInfoReplication(text) +} + +func parseInfoReplication(text string) (*InfoReplication, error) { var ( info = make(map[string]string) slaveMap = make([]map[string]string, 0) infoReplication InfoReplication slaves []InfoSlave + err error ) for _, line := range strings.Split(text, "\n") { @@ -231,6 +236,21 @@ func (c *Client) InfoReplication() (*InfoReplication, error) { } slaveMap = append(slaveMap, slave) + } else if strings.HasPrefix(key, "db0") { + // consider only the case of having one DB (db0) + kvArray := strings.Split(kv[1], ",") + for _, kvStr := range kvArray { + subKvArray := strings.Split(kvStr, "=") + if len(subKvArray) != 2 { + continue + } + + if subKvArray[0] == "binlog_offset" { + fileNumAndOffset := strings.Split(subKvArray[1], " ") + info["binlog_file_num"] = strings.TrimSpace(fileNumAndOffset[0]) + info["binlog_offset"] = strings.TrimSpace(fileNumAndOffset[1]) + } + } } else { info[key] = strings.TrimSpace(kv[1]) } @@ -306,7 +326,7 @@ func (c *Client) InfoFullv2() (map[string]string, error) { } } -func (c *Client) SetMaster(master string) error { +func (c *Client) SetMaster(master string, force bool) error { if master == "" || strings.ToUpper(master) == "NO:ONE" { if _, err := c.Do("SLAVEOF", "NO", "ONE"); err != nil { return err @@ -319,8 +339,15 @@ func (c *Client) SetMaster(master string) error { if _, err := c.Do("CONFIG", "set", "masterauth", c.Auth); err != nil { return err } - if _, err := c.Do("SLAVEOF", host, port); err != nil { - return err + + if force { + if _, err := c.Do("SLAVEOF", host, port, "-f"); err != nil { + return err + } + } else { + if _, err := c.Do("SLAVEOF", host, port); err != nil { + return err + } } } if _, err := c.Do("CONFIG", "REWRITE"); err != nil { diff --git a/codis/pkg/utils/redis/client_test.go b/codis/pkg/utils/redis/client_test.go index db726f2b4..f867ee917 100644 --- a/codis/pkg/utils/redis/client_test.go +++ b/codis/pkg/utils/redis/client_test.go @@ -1,63 +1,53 @@ package redis import ( - "encoding/json" "fmt" - "regexp" - "strings" "testing" + + "github.com/stretchr/testify/assert" ) -func TestKk(t *testing.T) { - ok, err := regexp.Match("slave[0-9]+", []byte("slave_01")) +func TestMasterInfoReplication(t *testing.T) { + text := ` +# Replication(MASTER) +role:master +ReplicationID:94e8feeaf9036a77c59ad2f091f1c0b0858047f06fa1e09afa +connected_slaves:1 +slave0:ip=10.224.129.104,port=9971,conn_fd=104,lag=(db0:0) +db0:binlog_offset=2 384,safety_purge=none +` + res, err := parseInfoReplication(text) + if err != nil { + fmt.Println(err) + return + } - fmt.Sprintln(ok, err) + assert.Equal(t, res.DbBinlogFileNum, uint64(2), "db0 binlog file_num not right") + assert.Equal(t, res.DbBinlogOffset, uint64(384), "db0 binlog offset not right") + assert.Equal(t, len(res.Slaves), 1, "slaves numbers not right") + assert.Equal(t, res.Slaves[0].IP, "10.224.129.104", "slave0 IP not right") + assert.Equal(t, res.Slaves[0].Port, "9971", "slave0 Port not right") } -func TestParseInfo(t *testing.T) { - text := "# Replication\nrole:master\nconnected_slaves:1\nslave0:ip=10.174.22.228,port=9225,state=online,offset=2175592,lag=0\nmaster_repl_offset:2175592\nrepl_backlog_active:1\nrepl_backlog_size:1048576\nrepl_backlog_first_byte_offset:1127017\nrepl_backlog_histlen:1048576\n" - info := make(map[string]string) - 
slaveMap := make([]map[string]string, 0) - var slaves []InfoSlave - var infoReplication InfoReplication - - for _, line := range strings.Split(text, "\n") { - kv := strings.SplitN(line, ":", 2) - if len(kv) != 2 { - continue - } - - if key := strings.TrimSpace(kv[0]); key != "" { - if ok, _ := regexp.Match("slave[0-9]+", []byte(key)); ok { - slaveKvs := strings.Split(kv[1], ",") - - slave := make(map[string]string) - for _, slaveKvStr := range slaveKvs { - slaveKv := strings.Split(slaveKvStr, "=") - if len(slaveKv) != 2 { - continue - } - slave[slaveKv[0]] = slaveKv[1] - } - - slaveMap = append(slaveMap, slave) - } else { - info[key] = strings.TrimSpace(kv[1]) - } - } - } - if len(slaveMap) > 0 { - slavesStr, _ := json.Marshal(slaveMap) - err := json.Unmarshal(slavesStr, &slaves) - - _ = err - info["slaveMap"] = string(slavesStr) +func TestSlaveInfoReplication(t *testing.T) { + text := ` +# Replication(SLAVE) +role:slave +ReplicationID:94e8feeaf9036a77c59ad2f091f1c0b0858047f06fa1e09afa +master_host:10.224.129.40 +master_port:9971 +master_link_status:up +slave_priority:100 +slave_read_only:1 +db0:binlog_offset=1 284,safety_purge=none +` + res, err := parseInfoReplication(text) + if err != nil { + fmt.Println(err) + return } - str, _ := json.Marshal(info) - err := json.Unmarshal(str, &infoReplication) - infoReplication.Slaves = slaves - - _ = err - fmt.Println(err) + assert.Equal(t, res.DbBinlogFileNum, uint64(1), "db0 binlog file_num not right") + assert.Equal(t, res.DbBinlogOffset, uint64(284), "db0 binlog offset not right") + assert.Equal(t, len(res.Slaves), 0) } diff --git a/codis/pkg/utils/redis/codis_sentinel.go b/codis/pkg/utils/redis/codis_sentinel.go index 0b8b150eb..4d1ce73be 100644 --- a/codis/pkg/utils/redis/codis_sentinel.go +++ b/codis/pkg/utils/redis/codis_sentinel.go @@ -108,6 +108,7 @@ func (s *CodisSentinel) RefreshMastersAndSlavesClient(parallel int, groupServers Index: index, GroupID: gid, Addr: server.Addr, + Server: server, Replication: info, Err: err, } diff --git a/codis/pkg/utils/redis/sentinel.go b/codis/pkg/utils/redis/sentinel.go index e71155c06..c76c4d7f6 100644 --- a/codis/pkg/utils/redis/sentinel.go +++ b/codis/pkg/utils/redis/sentinel.go @@ -5,8 +5,11 @@ package redis import ( "encoding/json" + "net" "strconv" "time" + + "pika/codis/v2/pkg/models" ) type SentinelMaster struct { @@ -66,8 +69,9 @@ type InfoReplication struct { ConnectedSlaves int `json:"connected_slaves"` MasterHost string `json:"master_host"` MasterPort string `json:"master_port"` - SlaveReplOffset int `json:"slave_repl_offset"` - MasterReplOffset int `json:"master_repl_offset"` + MasterLinkStatus string `json:"master_link_status"` // down; up + DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0 + DbBinlogOffset uint64 `json:"binlog_offset"` // db0 Slaves []InfoSlave `json:"-"` } @@ -75,10 +79,19 @@ type ReplicationState struct { GroupID int Index int Addr string + Server *models.GroupServer Replication *InfoReplication Err error } +func (i *InfoReplication) GetMasterAddr() string { + if len(i.MasterHost) == 0 { + return "" + } + + return net.JoinHostPort(i.MasterHost, i.MasterPort) +} + func (i *InfoReplication) UnmarshalJSON(b []byte) error { var kvmap map[string]string if err := json.Unmarshal(b, &kvmap); err != nil { @@ -90,18 +103,23 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error { i.ConnectedSlaves = intval } } - if val, ok := kvmap["slave_repl_offset"]; ok { - if intval, err := strconv.Atoi(val); err == nil { - i.SlaveReplOffset = intval + + i.Role = kvmap["role"] + 
i.MasterHost = kvmap["master_host"]
+	i.MasterPort = kvmap["master_port"]
+	i.MasterLinkStatus = kvmap["master_link_status"]
+
+	if val, ok := kvmap["binlog_file_num"]; ok {
+		if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
+			i.DbBinlogFileNum = intval
 		}
 	}
-	if val, ok := kvmap["master_repl_offset"]; ok {
-		if intval, err := strconv.Atoi(val); err == nil {
-			i.MasterReplOffset = intval
+
+	if val, ok := kvmap["binlog_offset"]; ok {
+		if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
+			i.DbBinlogOffset = intval
 		}
 	}
-	i.Role = kvmap["role"]
-	i.MasterPort = kvmap["master_host"]
-	i.MasterHost = kvmap["master_port"]
+
 	return nil
 }
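
The changes in client.go, client_test.go and sentinel.go above all revolve around one pair of numbers: the db0 binlog position (binlog_file_num, binlog_offset) that the dashboard now parses from pika's `info replication` output and that Group.SelectNewMaster compares to pick the most up-to-date slave during failover. The standalone sketch below mirrors that parsing and ordering outside the codis packages; the helper names (parseDB0BinlogOffset, pickNewMaster), the candidate struct, and the second sample address are invented for illustration, and only the input format follows the test fixtures above.

package main

import (
	"fmt"
	"strings"
)

// candidate is an illustrative stand-in for models.GroupServer: just an address
// plus the db0 binlog position reported by `info replication`.
type candidate struct {
	addr    string
	fileNum uint64
	offset  uint64
}

// parseDB0BinlogOffset mirrors the "db0:binlog_offset=<file> <offset>,..." handling
// added to parseInfoReplication in this patch (illustrative helper, not the real one).
func parseDB0BinlogOffset(infoLine string) (fileNum, offset uint64, err error) {
	kv := strings.SplitN(infoLine, ":", 2)
	if len(kv) != 2 || strings.TrimSpace(kv[0]) != "db0" {
		return 0, 0, fmt.Errorf("not a db0 line: %q", infoLine)
	}
	for _, field := range strings.Split(kv[1], ",") {
		sub := strings.SplitN(field, "=", 2)
		if len(sub) != 2 || sub[0] != "binlog_offset" {
			continue
		}
		// The value looks like "2 384": binlog file number, then offset inside that file.
		if _, err = fmt.Sscanf(strings.TrimSpace(sub[1]), "%d %d", &fileNum, &offset); err != nil {
			return 0, 0, err
		}
		return fileNum, offset, nil
	}
	return 0, 0, fmt.Errorf("no binlog_offset field in %q", infoLine)
}

// pickNewMaster mirrors Group.SelectNewMaster's ordering: prefer the larger binlog
// file number, then the larger offset within the same file.
func pickNewMaster(slaves []candidate) (best candidate, ok bool) {
	for _, c := range slaves {
		if !ok ||
			c.fileNum > best.fileNum ||
			(c.fileNum == best.fileNum && c.offset > best.offset) {
			best, ok = c, true
		}
	}
	return best, ok
}

func main() {
	// Sample db0 lines in the same shape as the fixtures in client_test.go
	// (the second address is made up).
	lines := map[string]string{
		"10.224.129.104:9971": "db0:binlog_offset=2 384,safety_purge=none",
		"10.224.129.105:9971": "db0:binlog_offset=1 284,safety_purge=none",
	}

	var slaves []candidate
	for addr, line := range lines {
		fileNum, offset, err := parseDB0BinlogOffset(line)
		if err != nil {
			fmt.Println("skip", addr, "error:", err)
			continue
		}
		slaves = append(slaves, candidate{addr: addr, fileNum: fileNum, offset: offset})
	}

	if best, ok := pickNewMaster(slaves); ok {
		fmt.Printf("would promote %s (binlog %d/%d)\n", best.addr, best.fileNum, best.offset)
	}
}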