diff --git a/codis/pkg/models/group.go b/codis/pkg/models/group.go
index 092ec2f117..3c2009452c 100644
--- a/codis/pkg/models/group.go
+++ b/codis/pkg/models/group.go
@@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
 	var newMasterIndex = -1
 
 	for index, server := range g.Servers {
-		if index == 0 || server.State != GroupServerStateNormal {
+		if index == 0 || server.State != GroupServerStateNormal || !server.IsEligibleForMasterElection {
 			continue
 		}
 
@@ -84,8 +84,9 @@ type GroupServer struct {
 	// master or slave
 	Role GroupServerRole `json:"role"`
 	// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
-	DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
-	DbBinlogOffset  uint64 `json:"binlog_offset"`   // db0
+	DbBinlogFileNum             uint64 `json:"binlog_file_num"` // db0
+	DbBinlogOffset              uint64 `json:"binlog_offset"`   // db0
+	IsEligibleForMasterElection bool   `json:"is_eligible_for_master_election"`
 
 	// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
 	// If marked as 2, no service is provided
diff --git a/codis/pkg/topom/topom_group.go b/codis/pkg/topom/topom_group.go
index 341e494c90..98f2d3c537 100644
--- a/codis/pkg/topom/topom_group.go
+++ b/codis/pkg/topom/topom_group.go
@@ -403,6 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
 	groupServer.Role = models.GroupServerRole(state.Replication.Role)
 	groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 	groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
+	groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
 	groupServer.Action.State = models.ActionSynced
 	err = s.storeUpdateGroup(group)
 	// clean cache whether err is nil or not
@@ -531,7 +532,12 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
 			continue
 		}
 
-		err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
+		if server.IsEligibleForMasterElection {
+			err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
+		} else {
+			err = updateMasterToNewOneForcefully(server.Addr, newMasterAddr, s.config.ProductAuth)
+		}
+
 		if err != nil {
 			// skip err, and retry to update master-slave replication relationship through next heartbeat check
 			err = nil
@@ -548,14 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
 }
 
 func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
+	log.Infof("[%s] switch master to server [%s]", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, false)
 }
 
 func promoteServerToNewMaster(serverAddr, auth string) (err error) {
+	log.Infof("[%s] switch master to NO:ONE", serverAddr)
 	return setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
 }
 
 func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
+	log.Infof("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
 	return setNewRedisMaster(serverAddr, masterAddr, auth, true)
 }
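The new `!server.IsEligibleForMasterElection` guard is the heart of the codis-side change: a replica that is mid full-sync (or restarted with an unfinished full sync) can never win the election. Below is a minimal, self-contained Go sketch of the selection loop. The types are simplified stand-ins for the `models` package, and the binlog-progress ranking is an assumption; the diff only shows the guard, not the comparison logic.

package main

import "fmt"

// Simplified stand-ins for the models package types; field names follow the
// diff, everything else is illustrative.
type GroupServer struct {
	Addr                        string
	State                       int // 0 = GroupServerStateNormal
	DbBinlogFileNum             uint64
	DbBinlogOffset              uint64
	IsEligibleForMasterElection bool
}

type Group struct{ Servers []*GroupServer }

// selectNewMaster mirrors the selection loop: index 0 is the current master,
// abnormal servers are skipped, and, with this patch, so are servers that
// reported is_eligible_for_master_election:false.
func (g *Group) selectNewMaster() (string, int) {
	var newMaster *GroupServer
	newMasterIndex := -1
	for index, server := range g.Servers {
		if index == 0 || server.State != 0 || !server.IsEligibleForMasterElection {
			continue
		}
		// Assumed ranking: prefer the replica with the most replication progress.
		if newMaster == nil ||
			server.DbBinlogFileNum > newMaster.DbBinlogFileNum ||
			(server.DbBinlogFileNum == newMaster.DbBinlogFileNum && server.DbBinlogOffset > newMaster.DbBinlogOffset) {
			newMaster = server
			newMasterIndex = index
		}
	}
	if newMaster == nil {
		return "", -1
	}
	return newMaster.Addr, newMasterIndex
}

func main() {
	g := &Group{Servers: []*GroupServer{
		{Addr: "10.0.0.1:9221"}, // old master, skipped by index
		{Addr: "10.0.0.2:9221", DbBinlogFileNum: 7, IsEligibleForMasterElection: false}, // ahead, but mid full-sync
		{Addr: "10.0.0.3:9221", DbBinlogFileNum: 5, IsEligibleForMasterElection: true},
	}}
	addr, idx := g.selectNewMaster()
	fmt.Println(addr, idx) // 10.0.0.3:9221 2
}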
diff --git a/codis/pkg/topom/topom_sentinel.go b/codis/pkg/topom/topom_sentinel.go
index 9d4299583e..807b9019a1 100644
--- a/codis/pkg/topom/topom_sentinel.go
+++ b/codis/pkg/topom/topom_sentinel.go
@@ -48,7 +48,7 @@ func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *mo
 
 	if len(recoveredGroupServersState) > 0 {
 		// offline GroupServer's service has recovered, check and fix its master-slave replication relationship
-		s.tryFixReplicationRelationships(ctx, recoveredGroupServersState,len(masterOfflineGroups))
+		s.tryFixReplicationRelationships(ctx, recoveredGroupServersState, len(masterOfflineGroups))
 	}
 
 	return nil
@@ -92,6 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
 			groupServer.Role = models.GroupServerRole(state.Replication.Role)
 			groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
 			groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
+			groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
 			groupServer.Action.State = models.ActionSynced
 		}
 	}
diff --git a/codis/pkg/utils/redis/sentinel.go b/codis/pkg/utils/redis/sentinel.go
index c76c4d7f68..0415ede6f1 100644
--- a/codis/pkg/utils/redis/sentinel.go
+++ b/codis/pkg/utils/redis/sentinel.go
@@ -65,14 +65,15 @@ func (i *InfoSlave) UnmarshalJSON(b []byte) error {
 }
 
 type InfoReplication struct {
-	Role             string      `json:"role"`
-	ConnectedSlaves  int         `json:"connected_slaves"`
-	MasterHost       string      `json:"master_host"`
-	MasterPort       string      `json:"master_port"`
-	MasterLinkStatus string      `json:"master_link_status"` // down; up
-	DbBinlogFileNum  uint64      `json:"binlog_file_num"`    // db0
-	DbBinlogOffset   uint64      `json:"binlog_offset"`      // db0
-	Slaves           []InfoSlave `json:"-"`
+	Role                        string      `json:"role"`
+	ConnectedSlaves             int         `json:"connected_slaves"`
+	MasterHost                  string      `json:"master_host"`
+	MasterPort                  string      `json:"master_port"`
+	MasterLinkStatus            string      `json:"master_link_status"` // down; up
+	DbBinlogFileNum             uint64      `json:"binlog_file_num"`    // db0
+	DbBinlogOffset              uint64      `json:"binlog_offset"`      // db0
+	IsEligibleForMasterElection bool        `json:"is_eligible_for_master_election"`
+	Slaves                      []InfoSlave `json:"-"`
 }
 
 type ReplicationState struct {
@@ -108,6 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
 	i.MasterPort = kvmap["master_host"]
 	i.MasterHost = kvmap["master_port"]
 	i.MasterLinkStatus = kvmap["master_link_status"]
+	i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"
 
 	if val, ok := kvmap["binlog_file_num"]; ok {
 		if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
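For reference, `is_eligible_for_master_election` travels as one more `key:value` line in the `INFO replication` section before it reaches the kvmap that `InfoReplication.UnmarshalJSON` reads, and the boolean is significant only when the value is exactly "true". The sketch below is an illustrative reduction of that parsing, not the actual unmarshal path (the real code works on a kvmap decoded from JSON); the function name and sample payload are hypothetical.

package main

import (
	"fmt"
	"strings"
)

// parseInfoReplication reduces an INFO replication payload to a key:value map,
// the same shape as the kvmap consumed in InfoReplication.UnmarshalJSON.
func parseInfoReplication(raw string) map[string]string {
	kv := make(map[string]string)
	for _, line := range strings.Split(raw, "\r\n") {
		if k, v, ok := strings.Cut(line, ":"); ok {
			kv[k] = v
		}
	}
	return kv
}

func main() {
	raw := "role:slave\r\nmaster_link_status:up\r\n" +
		"binlog_file_num:12\r\nbinlog_offset:3456\r\n" +
		"is_eligible_for_master_election:false\r\n"
	kv := parseInfoReplication(raw)
	// Same conversion rule as the diff: anything other than "true" is false.
	eligible := kv["is_eligible_for_master_election"] == "true"
	fmt.Println(eligible) // false: this replica must not be elected
}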
diff --git a/conf/pika.conf b/conf/pika.conf
index 090f719964..496d974174 100644
--- a/conf/pika.conf
+++ b/conf/pika.conf
@@ -630,3 +630,10 @@ cache-lfu-decay-time: 1
 #
 # Example:
 # rename-command : FLUSHDB 360flushdb
+
+# [You can ignore this item]
+# This is NOT a regular conf item: it is an internal metric that relies on pika.conf for persistent storage.
+# 'internal-used-unfinished-full-sync' is used to generate the metric 'is_eligible_for_master_election',
+# which serves the master re-election scenario in codis-pika clusters.
+# [DO NOT MODIFY THIS UNLESS YOU KNOW WHAT YOU ARE DOING]
+internal-used-unfinished-full-sync :
\ No newline at end of file
diff --git a/include/pika_conf.h b/include/pika_conf.h
index 2c0cb17d09..bec1ae14ca 100644
--- a/include/pika_conf.h
+++ b/include/pika_conf.h
@@ -439,6 +439,7 @@ class PikaConf : public pstd::BaseConf {
   int64_t rsync_timeout_ms() {
     return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
   }
+
   // Slow Commands configuration
   const std::string GetSlowCmd() {
     std::shared_lock l(rwlock_);
@@ -825,6 +826,7 @@ class PikaConf : public pstd::BaseConf {
   }
 
   int64_t cache_maxmemory() { return cache_maxmemory_; }
+
   void SetSlowCmd(const std::string& value) {
     std::lock_guard l(rwlock_);
     std::string lower_value = value;
@@ -841,6 +843,40 @@ class PikaConf : public pstd::BaseConf {
     pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
   }
 
+  void SetInternalUsedUnFinishedFullSync(const std::string& value) {
+    std::lock_guard l(rwlock_);
+    std::string lower_value = value;
+    pstd::StringToLower(lower_value);
+    TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    pstd::StringSplit2Set(lower_value, ',', internal_used_unfinished_full_sync_);
+  }
+
+  void AddInternalUsedUnfinishedFullSync(const std::string& db_name) {
+    {
+      std::lock_guard l(rwlock_);
+      internal_used_unfinished_full_sync_.insert(db_name);
+      std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
+      pstd::StringToLower(lower_value);
+      TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    }
+    ConfigRewrite();
+  }
+
+  void RemoveInternalUsedUnfinishedFullSync(const std::string& db_name) {
+    {
+      std::lock_guard l(rwlock_);
+      internal_used_unfinished_full_sync_.erase(db_name);
+      std::string lower_value = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
+      pstd::StringToLower(lower_value);
+      TryPushDiffCommands("internal-used-unfinished-full-sync", lower_value);
+    }
+    ConfigRewrite();
+  }
+
+  size_t GetUnfinishedFullSyncCount() {
+    std::shared_lock l(rwlock_);
+    return internal_used_unfinished_full_sync_.size();
+  }
   void SetCacheType(const std::string &value);
   void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
   int zset_cache_start_direction() { return zset_cache_start_direction_; }
@@ -1014,6 +1050,9 @@ class PikaConf : public pstd::BaseConf {
   int throttle_bytes_per_second_ = 200 << 20;  // 200MB/s
   int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
   std::atomic_int64_t rsync_timeout_ms_ = 1000;
+
+  // Internal metric persisted via pika.conf
+  std::unordered_set<std::string> internal_used_unfinished_full_sync_;
 };
 
 #endif
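The `PikaConf` methods above implement one piece of bookkeeping: a set of DB names with an unfinished full sync, guarded by the config lock and rewritten to pika.conf on every change, so that a crash in the middle of a full sync leaves the instance marked ineligible after restart. Here is a hedged Go model of that logic; the names and structure are illustrative, not the C++ API.

package main

import (
	"fmt"
	"sync"
)

// fullSyncTracker models the unfinished-full-sync set the patch adds to
// PikaConf. This is an illustrative Go model, not the C++ implementation.
type fullSyncTracker struct {
	mu         sync.RWMutex
	unfinished map[string]struct{}
}

func newFullSyncTracker() *fullSyncTracker {
	return &fullSyncTracker{unfinished: make(map[string]struct{})}
}

// add is called when a full sync starts (cf. HandleDBSyncResponse).
func (t *fullSyncTracker) add(dbName string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.unfinished[dbName] = struct{}{}
	// the real code rewrites pika.conf here (ConfigRewrite), which is what
	// makes the flag survive a restart
}

// remove is called when the full sync completes (cf. TryUpdateMasterOffset).
func (t *fullSyncTracker) remove(dbName string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.unfinished, dbName)
}

func (t *fullSyncTracker) count() int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.unfinished)
}

func main() {
	t := newFullSyncTracker()
	t.add("db0") // full sync starts; a crash before remove() leaves db0 persisted
	syncingFullCount := 1
	// Eligibility rule from InfoCmd::InfoReplication: no in-flight full sync
	// AND an empty persisted set.
	fmt.Println(syncingFullCount == 0 && t.count() == 0) // false

	t.remove("db0") // full sync finished
	syncingFullCount = 0
	fmt.Println(syncingFullCount == 0 && t.count() == 0) // true
}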
out_of_sync << "Error)"; repl_connect_status << "error"; @@ -1151,6 +1153,13 @@ void InfoCmd::InfoReplication(std::string& info) { << slaves_list_str; } + //if current instance is syncing full or has full sync corrupted, it's not qualified to be a new master + if (syncing_full_count == 0 && g_pika_conf->GetUnfinishedFullSyncCount() == 0) { + tmp_stream << "is_eligible_for_master_election:true" << "\r\n"; + } else { + tmp_stream << "is_eligible_for_master_election:false" << "\r\n"; + } + Status s; uint32_t filenum = 0; uint64_t offset = 0; @@ -3321,6 +3330,7 @@ void ClearReplicationIDCmd::DoInitial() { void ClearReplicationIDCmd::Do() { g_pika_conf->SetReplicationID(""); + g_pika_conf->SetInternalUsedUnFinishedFullSync(""); g_pika_conf->ConfigRewriteReplicationID(); res_.SetRes(CmdRes::kOk, "ReplicationID is cleared"); } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 95ef53391b..913f669880 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -185,6 +185,14 @@ int PikaConf::Load() { SetAdminCmd(admin_cmd_list); } + std::string unfinished_full_sync; + GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync); + if (replication_id_.empty()) { + unfinished_full_sync.clear(); + } + SetInternalUsedUnFinishedFullSync(unfinished_full_sync); + + GetConfInt("sync-thread-num", &sync_thread_num_); if (sync_thread_num_ <= 0) { sync_thread_num_ = 3; @@ -701,6 +709,7 @@ int PikaConf::Load() { } else { rsync_timeout_ms_.store(tmp_rsync_timeout_ms); } + return ret; } @@ -774,6 +783,7 @@ int PikaConf::ConfigRewrite() { SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_); SetConfInt("slave-priority", slave_priority_); SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_); + SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ',')); SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_); SetConfInt("sync-window-size", sync_window_size_.load()); SetConfInt("consensus-level", consensus_level_.load()); @@ -828,6 +838,7 @@ int PikaConf::ConfigRewrite() { int PikaConf::ConfigRewriteReplicationID() { std::lock_guard l(rwlock_); SetConfStr("replication-id", replication_id_); + SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ',')); if (!diff_commands_.empty()) { std::vector filtered_items; for (const auto& diff_command : diff_commands_) { diff --git a/src/pika_db.cc b/src/pika_db.cc index 0905206fcd..add26c66c1 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -483,6 +483,10 @@ bool DB::TryUpdateMasterOffset() { } master_db->Logger()->SetProducerStatus(filenum, offset); slave_db->SetReplState(ReplState::kTryConnect); + + //now full sync is finished, remove unfinished full sync count + g_pika_conf->RemoveInternalUsedUnfinishedFullSync(slave_db->DBName()); + return true; } diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 6c8e001bf1..8fb30d9306 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -184,6 +184,9 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { slave_db->StopRsync(); slave_db->SetReplState(ReplState::kWaitDBSync); LOG(INFO) << "DB: " << db_name << " Need Wait To Sync"; + + //now full sync is starting, add an unfinished full sync count + g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName()); } void PikaReplClientConn::HandleTrySyncResponse(void* arg) {