Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add metric is_eligible_for_master_election to support reelection decision in codis-sentinel #2766

Merged
Merged
7 changes: 4 additions & 3 deletions codis/pkg/models/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (g *Group) SelectNewMaster() (string, int) {
var newMasterIndex = -1

for index, server := range g.Servers {
if index == 0 || server.State != GroupServerStateNormal {
if index == 0 || server.State != GroupServerStateNormal || !server.IsEligibleForMasterElection {
continue
}

Expand Down Expand Up @@ -84,8 +84,9 @@ type GroupServer struct {
// master or slave
Role GroupServerRole `json:"role"`
// If it is a master node, take the master_repl_offset field, otherwise take the slave_repl_offset field
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
DbBinlogFileNum uint64 `json:"binlog_file_num"` // db0
DbBinlogOffset uint64 `json:"binlog_offset"` // db0
IsEligibleForMasterElection bool `json:"is_eligible_for_master_election"`

// Monitoring status, 0 normal, 1 subjective offline, 2 actual offline
// If marked as 2 , no service is provided
Expand Down
11 changes: 10 additions & 1 deletion codis/pkg/topom/topom_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (s *Topom) tryFixReplicationRelationship(group *models.Group, groupServer *
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
err = s.storeUpdateGroup(group)
// clean cache whether err is nil or not
Expand Down Expand Up @@ -531,7 +532,12 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
continue
}

err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
if server.IsEligibleForMasterElection {
err = updateMasterToNewOne(server.Addr, newMasterAddr, s.config.ProductAuth)
} else {
err = updateMasterToNewOneForcefully(server.Addr, newMasterAddr, s.config.ProductAuth)
}

if err != nil {
// skip err, and retry to update master-slave replication relationship through next heartbeat check
err = nil
Expand All @@ -548,14 +554,17 @@ func (s *Topom) doSwitchGroupMaster(g *models.Group, newMasterAddr string, newMa
}

// updateMasterToNewOne logs the switch and asks setNewRedisMaster to point
// serverAddr at masterAddr, passing false as the last argument
// (presumably the "force" flag — confirm against setNewRedisMaster).
func updateMasterToNewOne(serverAddr, masterAddr string, auth string) (err error) {
	log.Infof("[%s] switch master to server [%s]", serverAddr, masterAddr)
	err = setNewRedisMaster(serverAddr, masterAddr, auth, false)
	return err
}

// promoteServerToNewMaster logs the promotion and calls setNewRedisMaster with
// the literal master address "NO:ONE" and false as the last argument.
func promoteServerToNewMaster(serverAddr, auth string) (err error) {
	log.Infof("[%s] switch master to NO:ONE", serverAddr)
	err = setNewRedisMaster(serverAddr, "NO:ONE", auth, false)
	return err
}

// updateMasterToNewOneForcefully is the counterpart of updateMasterToNewOne
// that passes true as the last argument of setNewRedisMaster. The caller uses
// it for servers that are not eligible for master election.
func updateMasterToNewOneForcefully(serverAddr, masterAddr string, auth string) (err error) {
	log.Infof("[%s] switch master to server [%s] forcefully", serverAddr, masterAddr)
	err = setNewRedisMaster(serverAddr, masterAddr, auth, true)
	return err
}

Expand Down
3 changes: 2 additions & 1 deletion codis/pkg/topom/topom_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Topom) CheckStateAndSwitchSlavesAndMasters(filter func(index int, g *mo

if len(recoveredGroupServersState) > 0 {
// offline GroupServer's service has recovered, check and fix it's master-slave replication relationship
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState,len(masterOfflineGroups))
s.tryFixReplicationRelationships(ctx, recoveredGroupServersState, len(masterOfflineGroups))
}

return nil
Expand Down Expand Up @@ -92,6 +92,7 @@ func (s *Topom) checkAndUpdateGroupServerState(conf *Config, group *models.Group
groupServer.Role = models.GroupServerRole(state.Replication.Role)
groupServer.DbBinlogFileNum = state.Replication.DbBinlogFileNum
groupServer.DbBinlogOffset = state.Replication.DbBinlogOffset
groupServer.IsEligibleForMasterElection = state.Replication.IsEligibleForMasterElection
groupServer.Action.State = models.ActionSynced
}
}
Expand Down
18 changes: 10 additions & 8 deletions codis/pkg/utils/redis/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func (i *InfoSlave) UnmarshalJSON(b []byte) error {
}

// InfoReplication holds the replication-related fields reported by a server.
// Each field appears exactly once: the previous text listed the pre-change and
// post-change field sets back to back, which declares every name twice and is
// rejected by the Go compiler.
type InfoReplication struct {
	Role             string `json:"role"`
	ConnectedSlaves  int    `json:"connected_slaves"`
	MasterHost       string `json:"master_host"`
	MasterPort       string `json:"master_port"`
	MasterLinkStatus string `json:"master_link_status"` // down; up
	DbBinlogFileNum  uint64 `json:"binlog_file_num"`    // db0
	DbBinlogOffset   uint64 `json:"binlog_offset"`      // db0
	// IsEligibleForMasterElection mirrors the server's
	// "is_eligible_for_master_election" metric; it is true only when the
	// reported value is the string "true".
	IsEligibleForMasterElection bool        `json:"is_eligible_for_master_election"`
	Slaves                      []InfoSlave `json:"-"`
}

type ReplicationState struct {
Expand Down Expand Up @@ -108,6 +109,7 @@ func (i *InfoReplication) UnmarshalJSON(b []byte) error {
i.MasterPort = kvmap["master_host"]
i.MasterHost = kvmap["master_port"]
i.MasterLinkStatus = kvmap["master_link_status"]
i.IsEligibleForMasterElection = kvmap["is_eligible_for_master_election"] == "true"

if val, ok := kvmap["binlog_file_num"]; ok {
if intval, err := strconv.ParseUint(val, 10, 64); err == nil {
Expand Down
7 changes: 7 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,10 @@ cache-lfu-decay-time: 1
#
# Example:
# rename-command : FLUSHDB 360flushdb

# [You can ignore this item]
# This is NOT a regular conf item; it is an internally used metric that relies on pika.conf for persistent storage.
# 'internal-used-unfinished-full-sync' is used to generate the metric 'is_eligible_for_master_election',
# which serves the re-election scenario of a codis-pika cluster.
# You'd better [NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
39 changes: 39 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ class PikaConf : public pstd::BaseConf {
// Returns the current rsync timeout in milliseconds using a relaxed atomic load.
int64_t rsync_timeout_ms() {
  // Use the namespace-scope constant: `std::memory_order::memory_order_relaxed`
  // only compiles while memory_order is an unscoped enum (pre-C++20), whereas
  // `std::memory_order_relaxed` is valid in every standard since C++11.
  return rsync_timeout_ms_.load(std::memory_order_relaxed);
}

// Slow Commands configuration
const std::string GetSlowCmd() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -825,6 +826,7 @@ class PikaConf : public pstd::BaseConf {
}

// Returns the stored cache_maxmemory_ value as-is; no lock is taken here.
int64_t cache_maxmemory() { return cache_maxmemory_; }

void SetSlowCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
Expand All @@ -841,6 +843,40 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

// Loads the "internal-used-unfinished-full-sync" item from a comma-separated string.
// Under rwlock_: lower-cases a copy of `value`, records it as a pending diff for the
// conf item, then splits it on ',' into internal_used_unfinished_full_sync_.
// NOTE(review): whether pstd::StringSplit2Set empties the target set before inserting
// is not visible here — if it only inserts, passing "" will not clear existing
// entries; confirm against its implementation.
void SetInternalUsedUnFinishedFullSync(const std::string& value) {
  std::lock_guard guard(rwlock_);
  std::string normalized(value);
  pstd::StringToLower(normalized);
  TryPushDiffCommands("internal-used-unfinished-full-sync", normalized);
  pstd::StringSplit2Set(normalized, ',', internal_used_unfinished_full_sync_);
}

// Records `db_name` as having an unfinished full sync and persists the change.
// The set update and the diff-command push happen under rwlock_; ConfigRewrite()
// is invoked only after that lock has been released (presumably because
// ConfigRewrite() acquires rwlock_ itself — confirm).
void AddInternalUsedUnfinishedFullSync(const std::string& db_name) {
  {
    std::lock_guard guard(rwlock_);
    internal_used_unfinished_full_sync_.insert(db_name);
    // Only the string pushed as a diff is lower-cased; the set keeps db_name as given.
    std::string joined_names = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
    pstd::StringToLower(joined_names);
    TryPushDiffCommands("internal-used-unfinished-full-sync", joined_names);
  }
  ConfigRewrite();
}
Comment on lines +854 to +863
Copy link

@coderabbitai coderabbitai bot Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dynamic management of sync configurations.

The AddInternalUsedUnfinishedFullSync method dynamically updates the internal set and rewrites the configuration. This approach is flexible but could lead to performance issues if called frequently due to the repeated calls to ConfigRewrite.

- ConfigRewrite();
+ // Consider deferring the rewrite operation or batching updates to improve performance.

Evaluate the impact of these operations on system performance, especially under high load, and consider optimizing by batching or deferring updates.

Committable suggestion was skipped due to low confidence.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigRewrite() is not that expensive even though it writes files, and more importantly, AddInternalUsedUnfinishedFullSync(const std::string& db_name) will not be called frequently, which is the main reason there is no need to worry about performance.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cheniujh, thank you for the clarification. It's good to know that ConfigRewrite() is not expensive and AddInternalUsedUnfinishedFullSync will not be called frequently. This alleviates concerns about potential performance issues.


Learnings added
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: include/pika_conf.h:854-863
Timestamp: 2024-07-03T02:45:10.624Z
Learning: The `ConfigRewrite()` function is not expensive, even though it writes files, and the `AddInternalUsedUnfinishedFullSync(const std::string& db_name)` method will not be called frequently, thus minimizing performance concerns.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


Learnings used
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: src/pika_conf.cc:841-841
Timestamp: 2024-07-03T02:23:46.695Z
Learning: The `internal-used-unfinished-full-sync` configuration is related to `replicationID`, which is why it is handled together in the `ConfigRewriteReplicationID` method.

Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: src/pika_conf.cc:188-193
Timestamp: 2024-07-03T02:24:14.456Z
Learning: The `internal-used-unfinished-full-sync` configuration is handled without conditional clearing based on `replication_id_`.


// Drops `db_name` from the unfinished-full-sync record and persists the change.
// The erase and the diff-command push happen under rwlock_; ConfigRewrite() is
// invoked only after that lock has been released (presumably because
// ConfigRewrite() acquires rwlock_ itself — confirm).
void RemoveInternalUsedUnfinishedFullSync(const std::string& db_name) {
  {
    std::lock_guard guard(rwlock_);
    internal_used_unfinished_full_sync_.erase(db_name);
    // Push the remaining names, comma-joined and lower-cased, as the new conf value.
    std::string joined_names = pstd::Set2String(internal_used_unfinished_full_sync_, ',');
    pstd::StringToLower(joined_names);
    TryPushDiffCommands("internal-used-unfinished-full-sync", joined_names);
  }
  ConfigRewrite();
}

// Returns how many DB names are currently recorded in
// internal_used_unfinished_full_sync_, read under a shared lock on rwlock_.
size_t GetUnfinishedFullSyncCount() {
  std::shared_lock read_guard(rwlock_);
  const size_t pending = internal_used_unfinished_full_sync_.size();
  return pending;
}
// Only declared here; the definition is not part of this section.
void SetCacheType(const std::string &value);
// Sets tmp_cache_disable_flag_ to true.
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
// Returns zset_cache_start_direction_ unchanged.
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand Down Expand Up @@ -1014,6 +1050,9 @@ class PikaConf : public pstd::BaseConf {
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

// Internally used metric persisted through pika.conf under the key
// "internal-used-unfinished-full-sync": names of DBs whose full sync has
// started but has not been marked finished yet.
std::unordered_set<std::string> internal_used_unfinished_full_sync_;
};

#endif
10 changes: 10 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ void InfoCmd::InfoReplication(std::string& info) {
std::stringstream tmp_stream;
std::stringstream out_of_sync;
std::stringstream repl_connect_status;
int32_t syncing_full_count = 0;
bool all_db_sync = true;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
for (const auto& db_item : g_pika_server->GetDB()) {
Expand All @@ -1077,6 +1078,7 @@ void InfoCmd::InfoReplication(std::string& info) {
} else if (slave_db->State() == ReplState::kWaitDBSync) {
out_of_sync << "WaitDBSync)";
repl_connect_status << "syncing_full";
++syncing_full_count;
} else if (slave_db->State() == ReplState::kError) {
out_of_sync << "Error)";
repl_connect_status << "error";
Expand Down Expand Up @@ -1151,6 +1153,13 @@ void InfoCmd::InfoReplication(std::string& info) {
<< slaves_list_str;
}

//if current instance is syncing full or has full sync corrupted, it's not qualified to be a new master
if (syncing_full_count == 0 && g_pika_conf->GetUnfinishedFullSyncCount() == 0) {
tmp_stream << "is_eligible_for_master_election:true" << "\r\n";
} else {
tmp_stream << "is_eligible_for_master_election:false" << "\r\n";
}

Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
Expand Down Expand Up @@ -3321,6 +3330,7 @@ void ClearReplicationIDCmd::DoInitial() {

// Clears the replication ID together with the unfinished-full-sync record,
// persists both to the conf file, and replies OK.
void ClearReplicationIDCmd::Do() {
  g_pika_conf->SetReplicationID("");
  // The unfinished-full-sync record is tied to the replication ID, so it is reset alongside it.
  // NOTE(review): this relies on SetInternalUsedUnFinishedFullSync("") actually emptying the
  // underlying set — confirm that the split helper it uses does not merely append.
  g_pika_conf->SetInternalUsedUnFinishedFullSync("");
  // ConfigRewriteReplicationID writes both "replication-id" and "internal-used-unfinished-full-sync".
  g_pika_conf->ConfigRewriteReplicationID();
  res_.SetRes(CmdRes::kOk, "ReplicationID is cleared");
}
Expand Down
11 changes: 11 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ int PikaConf::Load() {
SetAdminCmd(admin_cmd_list);
}

std::string unfinished_full_sync;
GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync);
if (replication_id_.empty()) {
unfinished_full_sync.clear();
}
SetInternalUsedUnFinishedFullSync(unfinished_full_sync);
Comment on lines +188 to +193
Copy link

@coderabbitai coderabbitai bot Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure robust handling of configuration parameters.

The implementation clears the unfinished_full_sync string if replication_id_ is empty. This conditional clearing might be intended to reset the configuration under certain conditions, but it's unclear why this is tied specifically to replication_id_. This could potentially lead to unexpected behavior if replication_id_ is not directly related to the unfinished full syncs.

- if (replication_id_.empty()) {
-   unfinished_full_sync.clear();
- }

Consider documenting the rationale behind this conditional or ensuring that it aligns with the intended logic.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::string unfinished_full_sync;
GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync);
if (replication_id_.empty()) {
unfinished_full_sync.clear();
}
SetInternalUsedUnFinishedFullSync(unfinished_full_sync);
std::string unfinished_full_sync;
GetConfStr("internal-used-unfinished-full-sync", &unfinished_full_sync);
SetInternalUsedUnFinishedFullSync(unfinished_full_sync);

Copy link
Collaborator Author

@cheniujh cheniujh Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1. unfinished_full_sync is related to replication_id_
2. I will add more docs/comments

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cheniujh, glad you agree! I'll make sure to remember this for future reviews.


Learnings added
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: src/pika_conf.cc:188-193
Timestamp: 2024-07-03T02:24:14.326Z
Learning: The `internal-used-unfinished-full-sync` configuration is handled without conditional clearing based on `replication_id_`.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


Learnings used
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: src/pika_conf.cc:841-841
Timestamp: 2024-07-03T02:23:46.695Z
Learning: The `internal-used-unfinished-full-sync` configuration is related to `replicationID`, which is why it is handled together in the `ConfigRewriteReplicationID` method.



GetConfInt("sync-thread-num", &sync_thread_num_);
if (sync_thread_num_ <= 0) {
sync_thread_num_ = 3;
Expand Down Expand Up @@ -701,6 +709,7 @@ int PikaConf::Load() {
} else {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
}

return ret;
}

Expand Down Expand Up @@ -774,6 +783,7 @@ int PikaConf::ConfigRewrite() {
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
SetConfInt("slave-priority", slave_priority_);
SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_);
SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_);
SetConfInt("sync-window-size", sync_window_size_.load());
SetConfInt("consensus-level", consensus_level_.load());
Expand Down Expand Up @@ -828,6 +838,7 @@ int PikaConf::ConfigRewrite() {
int PikaConf::ConfigRewriteReplicationID() {
std::lock_guard l(rwlock_);
SetConfStr("replication-id", replication_id_);
SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));
Copy link

@coderabbitai coderabbitai bot Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplication in handling configuration rewriting.

The method ConfigRewriteReplicationID also handles the internal-used-unfinished-full-sync, which is already handled in ConfigRewrite. This duplication could lead to maintenance challenges and bugs if the synchronization logic changes in one place but not the other.

- SetConfStr("internal-used-unfinished-full-sync", pstd::Set2String(internal_used_unfinished_full_sync_, ','));

Consider centralizing the handling of this configuration to ensure consistency and reduce code duplication.

Committable suggestion was skipped due to low confidence.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internal-used-unfinished-full-sync is related to replicationID; that's why I put them together there.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cheniujh, thank you for the clarification. I understand now that internal-used-unfinished-full-sync is logically related to replicationID, which justifies handling them together in ConfigRewriteReplicationID. To ensure consistency and avoid potential bugs, it would be helpful to document this relationship clearly. This way, any future changes to this logic can be applied consistently in both methods.


Learnings added
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2766
File: src/pika_conf.cc:841-841
Timestamp: 2024-07-03T02:23:46.544Z
Learning: The `internal-used-unfinished-full-sync` configuration is related to `replicationID`, which is why it is handled together in the `ConfigRewriteReplicationID` method.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

if (!diff_commands_.empty()) {
std::vector<pstd::BaseConf::Rep::ConfItem> filtered_items;
for (const auto& diff_command : diff_commands_) {
Expand Down
4 changes: 4 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ bool DB::TryUpdateMasterOffset() {
}
master_db->Logger()->SetProducerStatus(filenum, offset);
slave_db->SetReplState(ReplState::kTryConnect);

//now full sync is finished, remove unfinished full sync count
g_pika_conf->RemoveInternalUsedUnfinishedFullSync(slave_db->DBName());

return true;
}

Expand Down
3 changes: 3 additions & 0 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
slave_db->StopRsync();
slave_db->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "DB: " << db_name << " Need Wait To Sync";

//now full sync is starting, add an unfinished full sync count
g_pika_conf->AddInternalUsedUnfinishedFullSync(slave_db->DBName());
}

void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
Expand Down
Loading