From fbbc4a370a6eee3d59205cbfba444c63741a2985 Mon Sep 17 00:00:00 2001 From: kernelai Date: Wed, 6 Apr 2022 18:12:56 +0800 Subject: [PATCH] feature: support quit command (#1155) --- include/pika_admin.h | 15 + include/pika_command.h | 1 + src/pika_admin.cc | 708 +++++++++++++++++++++++++---------------- src/pika_command.cc | 2 + third/pink | 2 +- 5 files changed, 454 insertions(+), 274 deletions(-) diff --git a/include/pika_admin.h b/include/pika_admin.h index 55860e4339..305fc34fa7 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -521,4 +521,19 @@ class DummyCmd : public Cmd { virtual void DoInitial() override; }; +class QuitCmd : public Cmd { + public: + QuitCmd(const std::string& name, int arity, uint16_t flag) + : Cmd(name, arity, flag) {} + virtual void Do(std::shared_ptr partition = nullptr); + virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys) {}; + virtual void Merge() {}; + virtual Cmd* Clone() override { + return new QuitCmd(*this); + } + + private: + virtual void DoInitial() override; +}; + #endif // PIKA_ADMIN_H_ diff --git a/include/pika_command.h b/include/pika_command.h index 698d079549..f3cb3ef589 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -46,6 +46,7 @@ const std::string kCmdNameTcmalloc = "tcmalloc"; #endif const std::string kCmdNamePKPatternMatchDel = "pkpatternmatchdel"; const std::string kCmdDummy = "dummy"; +const std::string kCmdNameQuit = "quit"; //Kv const std::string kCmdNameSet = "set"; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 1e85709bd6..ed1eecd554 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -5,27 +5,27 @@ #include "include/pika_admin.h" -#include #include #include -#include "slash/include/rsync.h" +#include +#include "include/build_version.h" #include "include/pika_conf.h" -#include "include/pika_server.h" #include "include/pika_rm.h" +#include "include/pika_server.h" #include "include/pika_version.h" -#include "include/build_version.h" +#include "slash/include/rsync.h" #ifdef TCMALLOC_EXTENSION #include #endif -extern PikaServer *g_pika_server; -extern PikaConf *g_pika_conf; -extern PikaReplicaManager *g_pika_rm; +extern PikaServer* g_pika_server; +extern PikaConf* g_pika_conf; +extern PikaReplicaManager* g_pika_rm; -static std::string ConstructPinginPubSubResp(const PikaCmdArgsType &argv) { +static std::string ConstructPinginPubSubResp(const PikaCmdArgsType& argv) { if (argv.size() > 2) { return "-ERR wrong number of arguments for " + kCmdNamePing + " command\r\n"; @@ -59,9 +59,8 @@ void SlaveofCmd::DoInitial() { return; } - if (argv_.size() == 3 - && !strcasecmp(argv_[1].data(), "no") - && !strcasecmp(argv_[2].data(), "one")) { + if (argv_.size() == 3 && !strcasecmp(argv_[1].data(), "no") && + !strcasecmp(argv_[2].data(), "one")) { is_noone_ = true; return; } @@ -74,13 +73,15 @@ void SlaveofCmd::DoInitial() { master_ip_ = argv_[1]; std::string str_master_port = argv_[2]; - if (!slash::string2l(str_master_port.data(), str_master_port.size(), &master_port_) || master_port_ <= 0) { + if (!slash::string2l(str_master_port.data(), str_master_port.size(), + &master_port_) || + master_port_ <= 0) { res_.SetRes(CmdRes::kInvalidInt); return; } - if ((master_ip_ == "127.0.0.1" || master_ip_ == g_pika_server->host()) - && master_port_ == g_pika_server->port()) { + if ((master_ip_ == "127.0.0.1" || master_ip_ == g_pika_server->host()) && + master_port_ == g_pika_server->port()) { res_.SetRes(CmdRes::kErrOther, "you fucked up"); return; } @@ -96,8 +97,8 @@ void SlaveofCmd::DoInitial() { void SlaveofCmd::Do(std::shared_ptr partition) { // Check if we are already connected to the specified master - if ((master_ip_ == "127.0.0.1" || g_pika_server->master_ip() == master_ip_) - && g_pika_server->master_port() == master_port_) { + if ((master_ip_ == "127.0.0.1" || g_pika_server->master_ip() == master_ip_) && + g_pika_server->master_port() == master_port_) { res_.SetRes(CmdRes::kOk); return; } @@ -117,7 +118,8 @@ void SlaveofCmd::Do(std::shared_ptr partition) { g_pika_conf->SetSlaveof(master_ip_ + ":" + std::to_string(master_port_)); g_pika_server->SetFirstMetaSync(true); } else { - res_.SetRes(CmdRes::kErrOther, "Server is not in correct state for slaveof"); + res_.SetRes(CmdRes::kErrOther, + "Server is not in correct state for slaveof"); } } @@ -136,8 +138,8 @@ void DbSlaveofCmd::DoInitial() { res_.SetRes(CmdRes::kErrOther, "DbSlaveof only support on classic mode"); return; } - if (g_pika_server->role() ^ PIKA_ROLE_SLAVE - || !g_pika_server->MetaSyncDone()) { + if (g_pika_server->role() ^ PIKA_ROLE_SLAVE || + !g_pika_server->MetaSyncDone()) { res_.SetRes(CmdRes::kErrOther, "Not currently a slave"); return; } @@ -153,24 +155,25 @@ void DbSlaveofCmd::DoInitial() { return; } - if (argv_.size() == 3 - && !strcasecmp(argv_[2].data(), "force")) { + if (argv_.size() == 3 && !strcasecmp(argv_[2].data(), "force")) { force_sync_ = true; return; } if (argv_.size() == 4) { - if (!strcasecmp(argv_[2].data(), "no") - && !strcasecmp(argv_[3].data(), "one")) { + if (!strcasecmp(argv_[2].data(), "no") && + !strcasecmp(argv_[3].data(), "one")) { is_noone_ = true; return; } - if (!slash::string2l(argv_[2].data(), argv_[2].size(), &filenum_) || filenum_ < 0) { + if (!slash::string2l(argv_[2].data(), argv_[2].size(), &filenum_) || + filenum_ < 0) { res_.SetRes(CmdRes::kInvalidInt); return; } - if (!slash::string2l(argv_[3].data(), argv_[3].size(), &offset_) || offset_ < 0) { + if (!slash::string2l(argv_[3].data(), argv_[3].size(), &offset_) || + offset_ < 0) { res_.SetRes(CmdRes::kInvalidInt); return; } @@ -191,19 +194,20 @@ void DbSlaveofCmd::Do(std::shared_ptr partition) { // In classic mode a table has only one partition s = g_pika_rm->SendRemoveSlaveNodeRequest(db_name_, 0); } else { - if (slave_partition->State() == ReplState::kNoConnect - || slave_partition->State() == ReplState::kError - || slave_partition->State() == ReplState::kDBNoConnect) { + if (slave_partition->State() == ReplState::kNoConnect || + slave_partition->State() == ReplState::kError || + slave_partition->State() == ReplState::kDBNoConnect) { if (have_offset_) { std::shared_ptr db_partition = - g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(db_name_, 0)); + g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(db_name_, 0)); db_partition->Logger()->SetProducerStatus(filenum_, offset_); } - ReplState state = force_sync_ - ? ReplState::kTryDBSync : ReplState::kTryConnect; + ReplState state = + force_sync_ ? ReplState::kTryDBSync : ReplState::kTryConnect; s = g_pika_rm->ActivateSyncSlavePartition( RmNode(g_pika_server->master_ip(), g_pika_server->master_port(), - db_name_, 0), state); + db_name_, 0), + state); } } @@ -244,10 +248,11 @@ void AuthCmd::Do(std::shared_ptr partition) { std::shared_ptr conn = GetConn(); if (!conn) { res_.SetRes(CmdRes::kErrOther, kCmdNamePing); - LOG(WARNING) << name_ << " weak ptr is empty"; + LOG(WARNING) << name_ << " weak ptr is empty"; return; } - std::shared_ptr cli_conn = std::dynamic_pointer_cast(conn); + std::shared_ptr cli_conn = + std::dynamic_pointer_cast(conn); cli_conn->auth_stat().ChecknUpdate(res().raw_message()); } @@ -277,14 +282,14 @@ void BgsaveCmd::Do(std::shared_ptr partition) { } void CompactCmd::DoInitial() { - if (!CheckArg(argv_.size()) - || argv_.size() > 3) { + if (!CheckArg(argv_.size()) || argv_.size() > 3) { res_.SetRes(CmdRes::kWrongNum, kCmdNameCompact); return; } if (g_pika_server->IsKeyScaning()) { - res_.SetRes(CmdRes::kErrOther, "The info keyspace operation is executing, Try again later"); + res_.SetRes(CmdRes::kErrOther, + "The info keyspace operation is executing, Try again later"); return; } @@ -309,17 +314,23 @@ void CompactCmd::DoInitial() { void CompactCmd::Do(std::shared_ptr partition) { if (!strcasecmp(struct_type_.data(), "all")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactAll, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactAll, + compact_tables_); } else if (!strcasecmp(struct_type_.data(), "string")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactStrings, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactStrings, + compact_tables_); } else if (!strcasecmp(struct_type_.data(), "hash")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactHashes, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactHashes, + compact_tables_); } else if (!strcasecmp(struct_type_.data(), "set")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactSets, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactSets, + compact_tables_); } else if (!strcasecmp(struct_type_.data(), "zset")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactZSets, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactZSets, + compact_tables_); } else if (!strcasecmp(struct_type_.data(), "list")) { - g_pika_server->DoSameThingSpecificTable(TaskType::kCompactList, compact_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kCompactList, + compact_tables_); } else { res_.SetRes(CmdRes::kInvalidDbType, struct_type_); return; @@ -329,14 +340,13 @@ void CompactCmd::Do(std::shared_ptr partition) { } void PurgelogstoCmd::DoInitial() { - if (!CheckArg(argv_.size()) - || argv_.size() > 3) { + if (!CheckArg(argv_.size()) || argv_.size() > 3) { res_.SetRes(CmdRes::kWrongNum, kCmdNamePurgelogsto); return; } std::string filename = argv_[1]; if (filename.size() <= kBinlogPrefixLen || - kBinlogPrefix != filename.substr(0, kBinlogPrefixLen)) { + kBinlogPrefix != filename.substr(0, kBinlogPrefixLen)) { res_.SetRes(CmdRes::kInvalidParameter); return; } @@ -348,7 +358,7 @@ void PurgelogstoCmd::DoInitial() { } num_ = num; - table_ = (argv_.size() == 3) ? argv_[2] :g_pika_conf->default_table(); + table_ = (argv_.size() == 3) ? argv_[2] : g_pika_conf->default_table(); if (!g_pika_server->IsTableExist(table_)) { res_.SetRes(CmdRes::kInvalidTable, table_); return; @@ -356,7 +366,8 @@ void PurgelogstoCmd::DoInitial() { } void PurgelogstoCmd::Do(std::shared_ptr partition) { - std::shared_ptr sync_partition = g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_, 0)); + std::shared_ptr sync_partition = + g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_, 0)); if (!sync_partition) { res_.SetRes(CmdRes::kErrOther, "Partition not found"); } else { @@ -376,10 +387,11 @@ void PingCmd::Do(std::shared_ptr partition) { std::shared_ptr conn = GetConn(); if (!conn) { res_.SetRes(CmdRes::kErrOther, kCmdNamePing); - LOG(WARNING) << name_ << " weak ptr is empty"; + LOG(WARNING) << name_ << " weak ptr is empty"; return; } - std::shared_ptr cli_conn = std::dynamic_pointer_cast(conn); + std::shared_ptr cli_conn = + std::dynamic_pointer_cast(conn); if (cli_conn->IsPubSub()) { return res_.SetRes(CmdRes::kNone, ConstructPinginPubSubResp(argv_)); @@ -399,7 +411,8 @@ void SelectCmd::DoInitial() { } if (g_pika_conf->classic_mode()) { if (index < 0 || index >= g_pika_conf->databases()) { - res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range"); + res_.SetRes(CmdRes::kInvalidIndex, + kCmdNameSelect + " DB index is out of range"); return; } } @@ -412,10 +425,10 @@ void SelectCmd::DoInitial() { void SelectCmd::Do(std::shared_ptr partition) { std::shared_ptr conn = - std::dynamic_pointer_cast(GetConn()); + std::dynamic_pointer_cast(GetConn()); if (!conn) { res_.SetRes(CmdRes::kErrOther, kCmdNameSelect); - LOG(WARNING) << name_ << " weak ptr is empty"; + LOG(WARNING) << name_ << " weak ptr is empty"; return; } conn->SetCurrentTable(table_name_); @@ -437,12 +450,9 @@ void FlushallCmd::Do(std::shared_ptr partition) { } // flushall convert flushdb writes to every partition binlog -std::string FlushallCmd::ToBinlog( - uint32_t exec_time, - uint32_t term_id, - uint64_t logic_id, - uint32_t filenum, - uint64_t offset) { +std::string FlushallCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, + uint64_t logic_id, uint32_t filenum, + uint64_t offset) { std::string content; content.reserve(RAW_ARGS_LEN); RedisAppendLen(content, 1, "*"); @@ -451,14 +461,9 @@ std::string FlushallCmd::ToBinlog( std::string flushdb_cmd("flushdb"); RedisAppendLen(content, flushdb_cmd.size(), "$"); RedisAppendContent(content, flushdb_cmd); - return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, - exec_time, - term_id, - logic_id, - filenum, - offset, - content, - {}); + return PikaBinlogTransverter::BinlogEncode(BinlogType::TypeFirst, exec_time, + term_id, logic_id, filenum, offset, + content, {}); } void FlushdbCmd::DoInitial() { @@ -511,13 +516,14 @@ void ClientCmd::DoInitial() { info_ = argv_[4]; } else { res_.SetRes(CmdRes::kErrOther, - "Syntax error, try CLIENT (LIST [order by [addr|idle])"); + "Syntax error, try CLIENT (LIST [order by [addr|idle])"); return; } } else if (!strcasecmp(argv_[1].data(), "kill") && argv_.size() == 3) { info_ = argv_[2]; } else { - res_.SetRes(CmdRes::kErrOther, + res_.SetRes( + CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle]| KILL ip:port)"); return; } @@ -541,14 +547,16 @@ void ClientCmd::Do(std::shared_ptr partition) { } while (iter != clients.end()) { snprintf(buf, sizeof(buf), "addr=%s fd=%d idle=%ld\n", - iter->ip_port.c_str(), iter->fd, - iter->last_interaction == 0 ? 0 : now.tv_sec - iter->last_interaction); + iter->ip_port.c_str(), iter->fd, + iter->last_interaction == 0 + ? 0 + : now.tv_sec - iter->last_interaction); reply.append(buf); iter++; } res_.AppendString(reply); } else if (!strcasecmp(operation_.data(), "kill") && - !strcasecmp(info_.data(), "all")) { + !strcasecmp(info_.data(), "all")) { g_pika_server->ClientKillAll(); res_.SetRes(CmdRes::kOk); } else if (g_pika_server->ClientKill(info_) == 1) { @@ -569,13 +577,15 @@ void ShutdownCmd::DoInitial() { if (is_local()) { std::shared_ptr conn = GetConn(); if (conn) { - if (conn->ip_port().find("127.0.0.1") == std::string::npos - && conn->ip_port().find(g_pika_server->host()) == std::string::npos) { - LOG(WARNING) << "\'shutdown\' should be localhost" << " command from " << conn->ip_port(); - res_.SetRes(CmdRes::kErrOther, kCmdNameShutdown + " should be localhost"); + if (conn->ip_port().find("127.0.0.1") == std::string::npos && + conn->ip_port().find(g_pika_server->host()) == std::string::npos) { + LOG(WARNING) << "\'shutdown\' should be localhost" + << " command from " << conn->ip_port(); + res_.SetRes(CmdRes::kErrOther, + kCmdNameShutdown + " should be localhost"); } } else { - LOG(WARNING) << name_ << " weak ptr is empty"; + LOG(WARNING) << name_ << " weak ptr is empty"; res_.SetRes(CmdRes::kErrOther, kCmdNameShutdown); return; } @@ -609,7 +619,7 @@ void InfoCmd::DoInitial() { if (argc == 1) { info_section_ = kInfo; return; - } // then the agc is 2 or 3 + } // then the agc is 2 or 3 if (!strcasecmp(argv_[1].data(), kAllSection.data())) { info_section_ = kInfoAll; @@ -637,7 +647,8 @@ void InfoCmd::DoInitial() { // info keyspace off db0,db1 if (argv_[2] == "1") { if (g_pika_server->IsCompacting()) { - res_.SetRes(CmdRes::kErrOther, "The compact operation is executing, Try again later"); + res_.SetRes(CmdRes::kErrOther, + "The compact operation is executing, Try again later"); } else { rescan_ = true; } @@ -736,11 +747,10 @@ void InfoCmd::Do(std::shared_ptr partition) { InfoDebug(info); break; default: - //kInfoErr is nothing + // kInfoErr is nothing break; } - res_.AppendStringLen(info.size()); res_.AppendContent(info); return; @@ -757,21 +767,29 @@ void InfoCmd::InfoServer(std::string& info) { time_t current_time_s = time(NULL); std::stringstream tmp_stream; char version[32]; - snprintf(version, sizeof(version), "%d.%d.%d", PIKA_MAJOR, - PIKA_MINOR, PIKA_PATCH); + snprintf(version, sizeof(version), "%d.%d.%d", PIKA_MAJOR, PIKA_MINOR, + PIKA_PATCH); tmp_stream << "# Server\r\n"; tmp_stream << "pika_version:" << version << "\r\n"; tmp_stream << pika_build_git_sha << "\r\n"; - tmp_stream << "pika_build_compile_date: " << - pika_build_compile_date << "\r\n"; - tmp_stream << "os:" << host_info.sysname << " " << host_info.release << " " << host_info.machine << "\r\n"; - tmp_stream << "arch_bits:" << (reinterpret_cast(&host_info.machine) + strlen(host_info.machine) - 2) << "\r\n"; + tmp_stream << "pika_build_compile_date: " << pika_build_compile_date + << "\r\n"; + tmp_stream << "os:" << host_info.sysname << " " << host_info.release << " " + << host_info.machine << "\r\n"; + tmp_stream << "arch_bits:" + << (reinterpret_cast(&host_info.machine) + + strlen(host_info.machine) - 2) + << "\r\n"; tmp_stream << "process_id:" << getpid() << "\r\n"; tmp_stream << "tcp_port:" << g_pika_conf->port() << "\r\n"; tmp_stream << "thread_num:" << g_pika_conf->thread_num() << "\r\n"; tmp_stream << "sync_thread_num:" << g_pika_conf->sync_thread_num() << "\r\n"; - tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pika_server->start_time_s()) << "\r\n"; - tmp_stream << "uptime_in_days:" << (current_time_s / (24*3600) - g_pika_server->start_time_s() / (24*3600) + 1) << "\r\n"; + tmp_stream << "uptime_in_seconds:" + << (current_time_s - g_pika_server->start_time_s()) << "\r\n"; + tmp_stream << "uptime_in_days:" + << (current_time_s / (24 * 3600) - + g_pika_server->start_time_s() / (24 * 3600) + 1) + << "\r\n"; tmp_stream << "config_file:" << g_pika_conf->conf_path() << "\r\n"; tmp_stream << "server_id:" << g_pika_conf->server_id() << "\r\n"; @@ -789,14 +807,21 @@ void InfoCmd::InfoClients(std::string& info) { void InfoCmd::InfoStats(std::string& info) { std::stringstream tmp_stream; tmp_stream << "# Stats\r\n"; - tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n"; - tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n"; - tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n"; - tmp_stream << "is_bgsaving:" << (g_pika_server->IsBgSaving() ? "Yes" : "No") << "\r\n"; - tmp_stream << "is_scaning_keyspace:" << (g_pika_server->IsKeyScaning() ? "Yes" : "No") << "\r\n"; - tmp_stream << "is_compact:" << (g_pika_server->IsCompacting() ? "Yes" : "No") << "\r\n"; + tmp_stream << "total_connections_received:" + << g_pika_server->accumulative_connections() << "\r\n"; + tmp_stream << "instantaneous_ops_per_sec:" + << g_pika_server->ServerCurrentQps() << "\r\n"; + tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() + << "\r\n"; + tmp_stream << "is_bgsaving:" << (g_pika_server->IsBgSaving() ? "Yes" : "No") + << "\r\n"; + tmp_stream << "is_scaning_keyspace:" + << (g_pika_server->IsKeyScaning() ? "Yes" : "No") << "\r\n"; + tmp_stream << "is_compact:" << (g_pika_server->IsCompacting() ? "Yes" : "No") + << "\r\n"; tmp_stream << "compact_cron:" << g_pika_conf->compact_cron() << "\r\n"; - tmp_stream << "compact_interval:" << g_pika_conf->compact_interval() << "\r\n"; + tmp_stream << "compact_interval:" << g_pika_conf->compact_interval() + << "\r\n"; info.append(tmp_stream.str()); } @@ -805,7 +830,8 @@ void InfoCmd::InfoExecCount(std::string& info) { std::stringstream tmp_stream; tmp_stream << "# Command_Exec_Count\r\n"; - std::unordered_map command_exec_count_table = g_pika_server->ServerExecCountTable(); + std::unordered_map command_exec_count_table = + g_pika_server->ServerExecCountTable(); for (const auto& item : command_exec_count_table) { if (item.second == 0) { continue; @@ -821,38 +847,43 @@ void InfoCmd::InfoCPU(std::string& info) { getrusage(RUSAGE_CHILDREN, &c_ru); std::stringstream tmp_stream; tmp_stream << "# CPU\r\n"; - tmp_stream << "used_cpu_sys:" << - setiosflags(std::ios::fixed) << std::setprecision(2) << - (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000 << - "\r\n"; - tmp_stream << "used_cpu_user:" << - setiosflags(std::ios::fixed) << std::setprecision(2) << - (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000 << - "\r\n"; - tmp_stream << "used_cpu_sys_children:" << - setiosflags(std::ios::fixed) << std::setprecision(2) << - (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000 << - "\r\n"; - tmp_stream << "used_cpu_user_children:" << - setiosflags(std::ios::fixed) << std::setprecision(2) << - (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000 << - "\r\n"; + tmp_stream << "used_cpu_sys:" << setiosflags(std::ios::fixed) + << std::setprecision(2) + << (float)self_ru.ru_stime.tv_sec + + (float)self_ru.ru_stime.tv_usec / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_user:" << setiosflags(std::ios::fixed) + << std::setprecision(2) + << (float)self_ru.ru_utime.tv_sec + + (float)self_ru.ru_utime.tv_usec / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_sys_children:" << setiosflags(std::ios::fixed) + << std::setprecision(2) + << (float)c_ru.ru_stime.tv_sec + + (float)c_ru.ru_stime.tv_usec / 1000000 + << "\r\n"; + tmp_stream << "used_cpu_user_children:" << setiosflags(std::ios::fixed) + << std::setprecision(2) + << (float)c_ru.ru_utime.tv_sec + + (float)c_ru.ru_utime.tv_usec / 1000000 + << "\r\n"; info.append(tmp_stream.str()); } void InfoCmd::InfoShardingReplication(std::string& info) { int role = 0; std::string slave_list_string; - uint32_t slave_num = g_pika_server->GetShardingSlaveListString(slave_list_string); + uint32_t slave_num = + g_pika_server->GetShardingSlaveListString(slave_list_string); if (slave_num) { - role |= PIKA_ROLE_MASTER; + role |= PIKA_ROLE_MASTER; } std::string common_master; std::string master_ip; int master_port = 0; g_pika_rm->FindCommonMaster(&common_master); if (!common_master.empty()) { - role |= PIKA_ROLE_SLAVE; + role |= PIKA_ROLE_SLAVE; if (!slash::ParseIpPortString(common_master, master_ip, master_port)) { return; } @@ -861,26 +892,38 @@ void InfoCmd::InfoShardingReplication(std::string& info) { std::stringstream tmp_stream; tmp_stream << "# Replication("; switch (role) { - case PIKA_ROLE_SINGLE : - case PIKA_ROLE_MASTER : tmp_stream << "MASTER)\r\nrole:master\r\n"; break; - case PIKA_ROLE_SLAVE : tmp_stream << "SLAVE)\r\nrole:slave\r\n"; break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : tmp_stream << "Master && SLAVE)\r\nrole:master&&slave\r\n"; break; - default: info.append("ERR: server role is error\r\n"); return; + case PIKA_ROLE_SINGLE: + case PIKA_ROLE_MASTER: + tmp_stream << "MASTER)\r\nrole:master\r\n"; + break; + case PIKA_ROLE_SLAVE: + tmp_stream << "SLAVE)\r\nrole:slave\r\n"; + break; + case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: + tmp_stream << "Master && SLAVE)\r\nrole:master&&slave\r\n"; + break; + default: + info.append("ERR: server role is error\r\n"); + return; } switch (role) { - case PIKA_ROLE_SLAVE : + case PIKA_ROLE_SLAVE: tmp_stream << "master_host:" << master_ip << "\r\n"; tmp_stream << "master_port:" << master_port << "\r\n"; - tmp_stream << "master_link_status:up"<< "\r\n"; - tmp_stream << "slave_priority:" << g_pika_conf->slave_priority() << "\r\n"; + tmp_stream << "master_link_status:up" + << "\r\n"; + tmp_stream << "slave_priority:" << g_pika_conf->slave_priority() + << "\r\n"; break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : + case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: tmp_stream << "master_host:" << master_ip << "\r\n"; tmp_stream << "master_port:" << master_port << "\r\n"; - tmp_stream << "master_link_status:up"<< "\r\n"; - case PIKA_ROLE_SINGLE : - case PIKA_ROLE_MASTER : - tmp_stream << "connected_slaves:" << slave_num << "\r\n" << slave_list_string; + tmp_stream << "master_link_status:up" + << "\r\n"; + case PIKA_ROLE_SINGLE: + case PIKA_ROLE_MASTER: + tmp_stream << "connected_slaves:" << slave_num << "\r\n" + << slave_list_string; } info.append(tmp_stream.str()); } @@ -903,10 +946,11 @@ void InfoCmd::InfoReplication(std::string& info) { for (const auto& partition_item : table_item.second->partitions_) { std::shared_ptr slave_partition = g_pika_rm->GetSyncSlavePartitionByName( - PartitionInfo(table_item.second->GetTableName(), - partition_item.second->GetPartitionId())); + PartitionInfo(table_item.second->GetTableName(), + partition_item.second->GetPartitionId())); if (!slave_partition) { - out_of_sync << "(" << partition_item.second->GetPartitionName() << ": InternalError)"; + out_of_sync << "(" << partition_item.second->GetPartitionName() + << ": InternalError)"; continue; } if (slave_partition->State() != ReplState::kConnected) { @@ -935,41 +979,63 @@ void InfoCmd::InfoReplication(std::string& info) { tmp_stream << "# Replication("; switch (host_role) { - case PIKA_ROLE_SINGLE : - case PIKA_ROLE_MASTER : tmp_stream << "MASTER)\r\nrole:master\r\n"; break; - case PIKA_ROLE_SLAVE : tmp_stream << "SLAVE)\r\nrole:slave\r\n"; break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : tmp_stream << "Master && SLAVE)\r\nrole:master&&slave\r\n"; break; - default: info.append("ERR: server role is error\r\n"); return; + case PIKA_ROLE_SINGLE: + case PIKA_ROLE_MASTER: + tmp_stream << "MASTER)\r\nrole:master\r\n"; + break; + case PIKA_ROLE_SLAVE: + tmp_stream << "SLAVE)\r\nrole:slave\r\n"; + break; + case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: + tmp_stream << "Master && SLAVE)\r\nrole:master&&slave\r\n"; + break; + default: + info.append("ERR: server role is error\r\n"); + return; } std::string slaves_list_str; switch (host_role) { - case PIKA_ROLE_SLAVE : + case PIKA_ROLE_SLAVE: tmp_stream << "master_host:" << g_pika_server->master_ip() << "\r\n"; tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n"; - tmp_stream << "master_link_status:" << (((g_pika_server->repl_state() == PIKA_REPL_META_SYNC_DONE) - && all_partition_sync) ? "up" : "down") << "\r\n"; - tmp_stream << "slave_priority:" << g_pika_conf->slave_priority() << "\r\n"; - tmp_stream << "slave_read_only:" << g_pika_conf->slave_read_only() << "\r\n"; + tmp_stream << "master_link_status:" + << (((g_pika_server->repl_state() == + PIKA_REPL_META_SYNC_DONE) && + all_partition_sync) + ? "up" + : "down") + << "\r\n"; + tmp_stream << "slave_priority:" << g_pika_conf->slave_priority() + << "\r\n"; + tmp_stream << "slave_read_only:" << g_pika_conf->slave_read_only() + << "\r\n"; if (!all_partition_sync) { - tmp_stream <<"db_repl_state:" << out_of_sync.str() << "\r\n"; + tmp_stream << "db_repl_state:" << out_of_sync.str() << "\r\n"; } break; - case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : + case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE: tmp_stream << "master_host:" << g_pika_server->master_ip() << "\r\n"; tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n"; - tmp_stream << "master_link_status:" << (((g_pika_server->repl_state() == PIKA_REPL_META_SYNC_DONE) - && all_partition_sync) ? "up" : "down") << "\r\n"; - tmp_stream << "slave_read_only:" << g_pika_conf->slave_read_only() << "\r\n"; + tmp_stream << "master_link_status:" + << (((g_pika_server->repl_state() == + PIKA_REPL_META_SYNC_DONE) && + all_partition_sync) + ? "up" + : "down") + << "\r\n"; + tmp_stream << "slave_read_only:" << g_pika_conf->slave_read_only() + << "\r\n"; if (!all_partition_sync) { - tmp_stream <<"db_repl_state:" << out_of_sync.str() << "\r\n"; + tmp_stream << "db_repl_state:" << out_of_sync.str() << "\r\n"; } - case PIKA_ROLE_SINGLE : - case PIKA_ROLE_MASTER : - tmp_stream << "connected_slaves:" << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" << slaves_list_str; + case PIKA_ROLE_SINGLE: + case PIKA_ROLE_MASTER: + tmp_stream << "connected_slaves:" + << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" + << slaves_list_str; } - Status s; uint32_t filenum = 0; uint64_t offset = 0; @@ -980,19 +1046,22 @@ void InfoCmd::InfoReplication(std::string& info) { for (const auto& p_item : t_item.second->partitions_) { std::string table_name = p_item.second->GetTableName(); uint32_t partition_id = p_item.second->GetPartitionId(); - master_partition = g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); + master_partition = g_pika_rm->GetSyncMasterPartitionByName( + PartitionInfo(table_name, partition_id)); if (!master_partition) { - LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << partition_id - << ", NotFound"; + LOG(WARNING) << "Sync Master Partition: " << table_name << ":" + << partition_id << ", NotFound"; continue; } master_partition->Logger()->GetProducerStatus(&filenum, &offset); tmp_stream << table_name << " binlog_offset=" << filenum << " " << offset; s = master_partition->GetSafetyPurgeBinlog(&safety_purge); - tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n"; + tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") + << "\r\n"; if (g_pika_conf->consensus_level()) { LogOffset last_log = master_partition->ConsensusLastIndex(); - tmp_stream << table_name << " consensus last_log=" << last_log.ToString() << "\r\n"; + tmp_stream << table_name + << " consensus last_log=" << last_log.ToString() << "\r\n"; } } } @@ -1002,7 +1071,8 @@ void InfoCmd::InfoReplication(std::string& info) { void InfoCmd::InfoKeyspace(std::string& info) { if (off_) { - g_pika_server->DoSameThingSpecificTable(TaskType::kStopKeyScan, keyspace_scan_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kStopKeyScan, + keyspace_scan_tables_); info.append("OK\r\n"); return; } @@ -1015,8 +1085,9 @@ void InfoCmd::InfoKeyspace(std::string& info) { tmp_stream << "# Keyspace\r\n"; slash::RWLock rwl(&g_pika_server->tables_rw_, false); for (const auto& table_item : g_pika_server->tables_) { - if (keyspace_scan_tables_.empty() - || keyspace_scan_tables_.find(table_item.first) != keyspace_scan_tables_.end()) { + if (keyspace_scan_tables_.empty() || + keyspace_scan_tables_.find(table_item.first) != + keyspace_scan_tables_.end()) { table_name = table_item.second->GetTableName(); key_scan_info = table_item.second->GetKeyScanInfo(); key_infos = key_scan_info.key_infos; @@ -1027,24 +1098,39 @@ void InfoCmd::InfoKeyspace(std::string& info) { } tmp_stream << "# Time:" << key_scan_info.s_start_time << "\r\n"; if (duration == -2) { - tmp_stream << "# Duration: " << "In Waiting\r\n"; + tmp_stream << "# Duration: " + << "In Waiting\r\n"; } else if (duration == -1) { - tmp_stream << "# Duration: " << "In Processing\r\n"; + tmp_stream << "# Duration: " + << "In Processing\r\n"; } else if (duration >= 0) { - tmp_stream << "# Duration: " << std::to_string(duration) + "s" << "\r\n"; + tmp_stream << "# Duration: " << std::to_string(duration) + "s" + << "\r\n"; } - tmp_stream << table_name << " Strings_keys=" << key_infos[0].keys << ", expires=" << key_infos[0].expires << ", invalid_keys=" << key_infos[0].invaild_keys << "\r\n"; - tmp_stream << table_name << " Hashes_keys=" << key_infos[1].keys << ", expires=" << key_infos[1].expires << ", invalid_keys=" << key_infos[1].invaild_keys << "\r\n"; - tmp_stream << table_name << " Lists_keys=" << key_infos[2].keys << ", expires=" << key_infos[2].expires << ", invalid_keys=" << key_infos[2].invaild_keys << "\r\n"; - tmp_stream << table_name << " Zsets_keys=" << key_infos[3].keys << ", expires=" << key_infos[3].expires << ", invalid_keys=" << key_infos[3].invaild_keys << "\r\n"; - tmp_stream << table_name << " Sets_keys=" << key_infos[4].keys << ", expires=" << key_infos[4].expires << ", invalid_keys=" << key_infos[4].invaild_keys << "\r\n\r\n"; + tmp_stream << table_name << " Strings_keys=" << key_infos[0].keys + << ", expires=" << key_infos[0].expires + << ", invalid_keys=" << key_infos[0].invaild_keys << "\r\n"; + tmp_stream << table_name << " Hashes_keys=" << key_infos[1].keys + << ", expires=" << key_infos[1].expires + << ", invalid_keys=" << key_infos[1].invaild_keys << "\r\n"; + tmp_stream << table_name << " Lists_keys=" << key_infos[2].keys + << ", expires=" << key_infos[2].expires + << ", invalid_keys=" << key_infos[2].invaild_keys << "\r\n"; + tmp_stream << table_name << " Zsets_keys=" << key_infos[3].keys + << ", expires=" << key_infos[3].expires + << ", invalid_keys=" << key_infos[3].invaild_keys << "\r\n"; + tmp_stream << table_name << " Sets_keys=" << key_infos[4].keys + << ", expires=" << key_infos[4].expires + << ", invalid_keys=" << key_infos[4].invaild_keys + << "\r\n\r\n"; } } info.append(tmp_stream.str()); if (rescan_) { - g_pika_server->DoSameThingSpecificTable(TaskType::kStartKeyScan, keyspace_scan_tables_); + g_pika_server->DoSameThingSpecificTable(TaskType::kStartKeyScan, + keyspace_scan_tables_); } return; } @@ -1054,7 +1140,8 @@ void InfoCmd::InfoData(std::string& info) { std::stringstream db_fatal_msg_stream; int64_t db_size = slash::Du(g_pika_conf->db_path()); - tmp_stream << "# Data" << "\r\n"; + tmp_stream << "# Data" + << "\r\n"; tmp_stream << "db_size:" << db_size << "\r\n"; tmp_stream << "db_size_human:" << (db_size >> 20) << "M\r\n"; int64_t log_size = slash::Du(g_pika_conf->log_path()); @@ -1074,28 +1161,39 @@ void InfoCmd::InfoData(std::string& info) { type_result.clear(); memtable_usage = table_reader_usage = 0; patition_item.second->DbRWLockReader(); - patition_item.second->db()->GetUsage(blackwidow::PROPERTY_TYPE_ROCKSDB_MEMTABLE, &memtable_usage); - patition_item.second->db()->GetUsage(blackwidow::PROPERTY_TYPE_ROCKSDB_TABLE_READER, &table_reader_usage); - patition_item.second->db()->GetUsage(blackwidow::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &type_result); + patition_item.second->db()->GetUsage( + blackwidow::PROPERTY_TYPE_ROCKSDB_MEMTABLE, &memtable_usage); + patition_item.second->db()->GetUsage( + blackwidow::PROPERTY_TYPE_ROCKSDB_TABLE_READER, &table_reader_usage); + patition_item.second->db()->GetUsage( + blackwidow::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &type_result); patition_item.second->DbRWUnLock(); total_memtable_usage += memtable_usage; total_table_reader_usage += table_reader_usage; for (const auto& item : type_result) { if (item.second != 0) { db_fatal_msg_stream << (total_background_errors != 0 ? "," : ""); - db_fatal_msg_stream << patition_item.second->GetPartitionName() << "/" << item.first; + db_fatal_msg_stream << patition_item.second->GetPartitionName() << "/" + << item.first; total_background_errors += item.second; } } } } - tmp_stream << "used_memory:" << (total_memtable_usage + total_table_reader_usage) << "\r\n"; - tmp_stream << "used_memory_human:" << ((total_memtable_usage + total_table_reader_usage) >> 20) << "M\r\n"; + tmp_stream << "used_memory:" + << (total_memtable_usage + total_table_reader_usage) << "\r\n"; + tmp_stream << "used_memory_human:" + << ((total_memtable_usage + total_table_reader_usage) >> 20) + << "M\r\n"; tmp_stream << "db_memtable_usage:" << total_memtable_usage << "\r\n"; tmp_stream << "db_tablereader_usage:" << total_table_reader_usage << "\r\n"; - tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") << "\r\n"; - tmp_stream << "db_fatal_msg:" << (total_background_errors != 0 ? db_fatal_msg_stream.str() : "NULL") << "\r\n"; + tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") + << "\r\n"; + tmp_stream << "db_fatal_msg:" + << (total_background_errors != 0 ? db_fatal_msg_stream.str() + : "NULL") + << "\r\n"; info.append(tmp_stream.str()); return; @@ -1103,12 +1201,14 @@ void InfoCmd::InfoData(std::string& info) { void InfoCmd::InfoDebug(std::string& info) { std::stringstream tmp_stream; - tmp_stream << "# Synchronization Status" << "\r\n"; + tmp_stream << "# Synchronization Status" + << "\r\n"; info.append(tmp_stream.str()); g_pika_rm->RmStatus(&info); tmp_stream.str(std::string()); - tmp_stream << "# Running Status " << "\r\n"; + tmp_stream << "# Running Status " + << "\r\n"; info.append(tmp_stream.str()); g_pika_server->ServerStatus(&info); return; @@ -1122,29 +1222,36 @@ void ConfigCmd::DoInitial() { size_t argc = argv_.size(); if (!strcasecmp(argv_[1].data(), "get")) { if (argc != 3) { - res_.SetRes(CmdRes::kErrOther, "Wrong number of arguments for CONFIG get"); + res_.SetRes(CmdRes::kErrOther, + "Wrong number of arguments for CONFIG get"); return; } } else if (!strcasecmp(argv_[1].data(), "set")) { if (argc == 3 && argv_[2] != "*") { - res_.SetRes(CmdRes::kErrOther, "Wrong number of arguments for CONFIG set"); + res_.SetRes(CmdRes::kErrOther, + "Wrong number of arguments for CONFIG set"); return; } else if (argc != 4 && argc != 3) { - res_.SetRes(CmdRes::kErrOther, "Wrong number of arguments for CONFIG set"); + res_.SetRes(CmdRes::kErrOther, + "Wrong number of arguments for CONFIG set"); return; } } else if (!strcasecmp(argv_[1].data(), "rewrite")) { if (argc != 2) { - res_.SetRes(CmdRes::kErrOther, "Wrong number of arguments for CONFIG rewrite"); + res_.SetRes(CmdRes::kErrOther, + "Wrong number of arguments for CONFIG rewrite"); return; } } else if (!strcasecmp(argv_[1].data(), "resetstat")) { if (argc != 2) { - res_.SetRes(CmdRes::kErrOther, "Wrong number of arguments for CONFIG resetstat"); + res_.SetRes(CmdRes::kErrOther, + "Wrong number of arguments for CONFIG resetstat"); return; } } else { - res_.SetRes(CmdRes::kErrOther, "CONFIG subcommand must be one of GET, SET, RESETSTAT, REWRITE"); + res_.SetRes( + CmdRes::kErrOther, + "CONFIG subcommand must be one of GET, SET, RESETSTAT, REWRITE"); return; } config_args_v_.assign(argv_.begin() + 1, argv_.end()); @@ -1166,7 +1273,7 @@ void ConfigCmd::Do(std::shared_ptr partition) { return; } -static void EncodeString(std::string *dst, const std::string &value) { +static void EncodeString(std::string* dst, const std::string& value) { dst->append("$"); dst->append(std::to_string(value.size())); dst->append("\r\n"); @@ -1174,7 +1281,7 @@ static void EncodeString(std::string *dst, const std::string &value) { dst->append("\r\n"); } -static void EncodeInt32(std::string *dst, const int32_t v) { +static void EncodeInt32(std::string* dst, const int32_t v) { std::string vstr = std::to_string(v); dst->append("$"); dst->append(std::to_string(vstr.length())); @@ -1183,7 +1290,7 @@ static void EncodeInt32(std::string *dst, const int32_t v) { dst->append("\r\n"); } -static void EncodeInt64(std::string *dst, const int64_t v) { +static void EncodeInt64(std::string* dst, const int64_t v) { std::string vstr = std::to_string(v); dst->append("$"); dst->append(std::to_string(vstr.length())); @@ -1192,7 +1299,7 @@ static void EncodeInt64(std::string *dst, const int64_t v) { dst->append("\r\n"); } -void ConfigCmd::ConfigGet(std::string &ret) { +void ConfigCmd::ConfigGet(std::string& ret) { size_t elements = 0; std::string config_body; std::string pattern = config_args_v_[1]; @@ -1290,18 +1397,19 @@ void ConfigCmd::ConfigGet(std::string &ret) { if (slash::stringmatch(pattern.data(), "instance-mode", 1)) { elements += 2; EncodeString(&config_body, "instance-mode"); - EncodeString(&config_body, (g_pika_conf->classic_mode() ? "classic" : "sharding")); + EncodeString(&config_body, + (g_pika_conf->classic_mode() ? "classic" : "sharding")); } - if (g_pika_conf->classic_mode() - && slash::stringmatch(pattern.data(), "databases", 1)) { + if (g_pika_conf->classic_mode() && + slash::stringmatch(pattern.data(), "databases", 1)) { elements += 2; EncodeString(&config_body, "databases"); EncodeInt32(&config_body, g_pika_conf->databases()); } - if (!g_pika_conf->classic_mode() - && slash::stringmatch(pattern.data(), "default-slot-num", 1)) { + if (!g_pika_conf->classic_mode() && + slash::stringmatch(pattern.data(), "default-slot-num", 1)) { elements += 2; EncodeString(&config_body, "default-slot-num"); EncodeInt32(&config_body, g_pika_conf->default_slot_num()); @@ -1406,19 +1514,24 @@ void ConfigCmd::ConfigGet(std::string &ret) { if (slash::stringmatch(pattern.data(), "cache-index-and-filter-blocks", 1)) { elements += 2; EncodeString(&config_body, "cache-index-and-filter-blocks"); - EncodeString(&config_body, g_pika_conf->cache_index_and_filter_blocks() ? "yes" : "no"); + EncodeString(&config_body, + g_pika_conf->cache_index_and_filter_blocks() ? "yes" : "no"); } if (slash::stringmatch(pattern.data(), "optimize-filters-for-hits", 1)) { elements += 2; EncodeString(&config_body, "optimize-filters-for-hits"); - EncodeString(&config_body, g_pika_conf->optimize_filters_for_hits() ? "yes" : "no"); + EncodeString(&config_body, + g_pika_conf->optimize_filters_for_hits() ? "yes" : "no"); } - if (slash::stringmatch(pattern.data(), "level-compaction-dynamic-level-bytes", 1)) { + if (slash::stringmatch(pattern.data(), "level-compaction-dynamic-level-bytes", + 1)) { elements += 2; EncodeString(&config_body, "level-compaction-dynamic-level-bytes"); - EncodeString(&config_body, g_pika_conf->level_compaction_dynamic_level_bytes() ? "yes" : "no"); + EncodeString( + &config_body, + g_pika_conf->level_compaction_dynamic_level_bytes() ? "yes" : "no"); } if (slash::stringmatch(pattern.data(), "expire-logs-days", 1)) { @@ -1442,7 +1555,8 @@ void ConfigCmd::ConfigGet(std::string &ret) { if (slash::stringmatch(pattern.data(), "slowlog-write-errorlog", 1)) { elements += 2; EncodeString(&config_body, "slowlog-write-errorlog"); - EncodeString(&config_body, g_pika_conf->slowlog_write_errorlog() ? "yes" : "no"); + EncodeString(&config_body, + g_pika_conf->slowlog_write_errorlog() ? "yes" : "no"); } if (slash::stringmatch(pattern.data(), "slowlog-log-slower-than", 1)) { @@ -1621,7 +1735,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "maxclients") { if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'maxclients'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'maxclients'\r\n"; return; } g_pika_conf->SetMaxConnection(ival); @@ -1629,35 +1744,40 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "dump-expire") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'dump-expire'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'dump-expire'\r\n"; return; } g_pika_conf->SetExpireDumpDays(ival); ret = "+OK\r\n"; } else if (set_item == "slave-priority") { - if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slave-priority'\r\n"; + if (!slash::string2l(value.data(), value.size(), &ival)) { + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'slave-priority'\r\n"; return; } g_pika_conf->SetSlavePriority(ival); ret = "+OK\r\n"; } else if (set_item == "expire-logs-days") { if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-days'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'expire-logs-days'\r\n"; return; } g_pika_conf->SetExpireLogsDays(ival); ret = "+OK\r\n"; } else if (set_item == "expire-logs-nums") { if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'expire-logs-nums'\r\n"; return; } g_pika_conf->SetExpireLogsNums(ival); ret = "+OK\r\n"; } else if (set_item == "root-connection-num") { if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'root-connection-num'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'root-connection-num'\r\n"; return; } g_pika_conf->SetRootConnectionNum(ival); @@ -1669,21 +1789,24 @@ void ConfigCmd::ConfigSet(std::string& ret) { } else if (value == "no") { is_write_errorlog = false; } else { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-write-errorlog'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'slowlog-write-errorlog'\r\n"; return; } g_pika_conf->SetSlowlogWriteErrorlog(is_write_errorlog); ret = "+OK\r\n"; } else if (set_item == "slowlog-log-slower-than") { if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"; return; } g_pika_conf->SetSlowlogSlowerThan(ival); ret = "+OK\r\n"; } else if (set_item == "slowlog-max-len") { if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-max-len'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'slowlog-max-len'\r\n"; return; } g_pika_conf->SetSlowlogMaxLen(ival); @@ -1691,7 +1814,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "max-cache-statistic-keys") { if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n"; return; } g_pika_conf->SetMaxCacheStatisticKeys(ival); @@ -1699,7 +1823,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "small-compaction-threshold") { if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-threshold'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'small-compaction-threshold'\r\n"; return; } g_pika_conf->SetSmallCompactionThreshold(ival); @@ -1707,7 +1832,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "max-client-response-size") { if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'max-client-response-size'\r\n"; return; } g_pika_conf->SetMaxClientResponseSize(ival); @@ -1726,7 +1852,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { } } else if (set_item == "db-sync-speed") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'db-sync-speed(MB)'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'db-sync-speed(MB)'\r\n"; return; } if (ival < 0 || ival > 1024) { @@ -1753,15 +1880,16 @@ void ConfigCmd::ConfigSet(std::string& ret) { std::string::size_type colon = compact_cron.find("-"); std::string::size_type underline = compact_cron.find("/"); if (colon == std::string::npos || underline == std::string::npos || - colon >= underline || colon + 1 >= len || - colon + 1 == underline || underline + 1 >= len) { - invalid = true; + colon >= underline || colon + 1 >= len || colon + 1 == underline || + underline + 1 >= len) { + invalid = true; } else { int week = std::atoi(week_str.c_str()); int start = std::atoi(compact_cron.substr(0, colon).c_str()); int end = std::atoi(compact_cron.substr(colon + 1, underline).c_str()); int usage = std::atoi(compact_cron.substr(underline + 1).c_str()); - if ((have_week && (week < 1 || week > 7)) || start < 0 || start > 23 || end < 0 || end > 23 || usage < 0 || usage > 100) { + if ((have_week && (week < 1 || week > 7)) || start < 0 || start > 23 || + end < 0 || end > 23 || usage < 0 || usage > 100) { invalid = true; } } @@ -1782,7 +1910,7 @@ void ConfigCmd::ConfigSet(std::string& ret) { invalid = true; } else { int interval = std::atoi(value.substr(0, slash).c_str()); - int usage = std::atoi(value.substr(slash+1).c_str()); + int usage = std::atoi(value.substr(slash + 1).c_str()); if (interval <= 0 || usage < 0 || usage > 100) { invalid = true; } @@ -1797,22 +1925,27 @@ void ConfigCmd::ConfigSet(std::string& ret) { } } else if (set_item == "sync-window-size") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'sync-window-size'\r\n"; return; } if (ival <= 0 || ival > kBinlogReadWinMaxSize) { - ret = "-ERR Argument exceed range \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + ret = "-ERR Argument exceed range \'" + value + + "\' for CONFIG SET 'sync-window-size'\r\n"; return; } g_pika_conf->SetSyncWindowSize(ival); ret = "+OK\r\n"; } else if (set_item == "max-cache-files") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'max-cache-files'\r\n"; return; } - std::unordered_map options_map{{"max_open_files", value}}; - blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map); + std::unordered_map options_map{ + {"max_open_files", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions( + blackwidow::OptionType::kDB, options_map); if (!s.ok()) { ret = "-ERR Set max-cache-files wrong: " + s.ToString() + "\r\n"; return; @@ -1821,24 +1954,31 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "max-background-compactions") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-compactions'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'max-background-compactions'\r\n"; return; } - std::unordered_map options_map{{"max_background_compactions", value}}; - blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map); + std::unordered_map options_map{ + {"max_background_compactions", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions( + blackwidow::OptionType::kDB, options_map); if (!s.ok()) { - ret = "-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n"; + ret = + "-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n"; return; } g_pika_conf->SetMaxBackgroudCompactions(ival); ret = "+OK\r\n"; } else if (set_item == "write-buffer-size") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'write-buffer-size'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'write-buffer-size'\r\n"; return; } - std::unordered_map options_map{{"write_buffer_size", value}}; - blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + std::unordered_map options_map{ + {"write_buffer_size", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions( + blackwidow::OptionType::kColumnFamily, options_map); if (!s.ok()) { ret = "-ERR Set write-buffer-size wrong: " + s.ToString() + "\r\n"; return; @@ -1847,11 +1987,14 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "max-write-buffer-num") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-write-buffer-number'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'max-write-buffer-number'\r\n"; return; } - std::unordered_map options_map{{"max_write_buffer_number", value}}; - blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + std::unordered_map options_map{ + {"max_write_buffer_number", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions( + blackwidow::OptionType::kColumnFamily, options_map); if (!s.ok()) { ret = "-ERR Set max-write-buffer-number wrong: " + s.ToString() + "\r\n"; return; @@ -1860,11 +2003,14 @@ void ConfigCmd::ConfigSet(std::string& ret) { ret = "+OK\r\n"; } else if (set_item == "arena-block-size") { if (!slash::string2l(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n"; + ret = "-ERR Invalid argument \'" + value + + "\' for CONFIG SET 'arena-block-size'\r\n"; return; } - std::unordered_map options_map{{"arena_block_size", value}}; - blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map); + std::unordered_map options_map{ + {"arena_block_size", value}}; + blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions( + blackwidow::OptionType::kColumnFamily, options_map); if (!s.ok()) { ret = "-ERR Set arena-block-size wrong: " + s.ToString() + "\r\n"; return; @@ -1876,12 +2022,12 @@ void ConfigCmd::ConfigSet(std::string& ret) { } } -void ConfigCmd::ConfigRewrite(std::string &ret) { +void ConfigCmd::ConfigRewrite(std::string& ret) { g_pika_conf->ConfigRewrite(); ret = "+OK\r\n"; } -void ConfigCmd::ConfigResetstat(std::string &ret) { +void ConfigCmd::ConfigResetstat(std::string& ret) { g_pika_server->ResetStat(); ret = "+OK\r\n"; } @@ -1897,15 +2043,18 @@ void MonitorCmd::Do(std::shared_ptr partition) { std::shared_ptr conn_repl = GetConn(); if (!conn_repl) { res_.SetRes(CmdRes::kErrOther, kCmdNameMonitor); - LOG(WARNING) << name_ << " weak ptr is empty"; + LOG(WARNING) << name_ << " weak ptr is empty"; return; } std::shared_ptr conn = - std::dynamic_pointer_cast(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd()); + std::dynamic_pointer_cast(conn_repl) + ->server_thread() + ->MoveConnOut(conn_repl->fd()); assert(conn.get() == conn_repl.get()); - g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn)); + g_pika_server->AddMonitorClient( + std::dynamic_pointer_cast(conn)); g_pika_server->AddMonitorMessage("OK"); - return; // Monitor thread will return "OK" + return; // Monitor thread will return "OK" } void DbsizeCmd::DoInitial() { @@ -1926,11 +2075,8 @@ void DbsizeCmd::Do(std::shared_ptr partition) { res_.SetRes(CmdRes::kErrOther, "keyspace error"); return; } - int64_t dbsize = key_infos[0].keys - + key_infos[1].keys - + key_infos[2].keys - + key_infos[3].keys - + key_infos[4].keys; + int64_t dbsize = key_infos[0].keys + key_infos[1].keys + key_infos[2].keys + + key_infos[3].keys + key_infos[4].keys; res_.AppendInteger(dbsize); } } @@ -1984,12 +2130,14 @@ void DelbackupCmd::Do(std::shared_ptr partition) { int len = dump_dir.size(); for (size_t i = 0; i < dump_dir.size(); i++) { - if (dump_dir[i].substr(0, db_sync_prefix.size()) != db_sync_prefix || dump_dir[i].size() != (db_sync_prefix.size() + 8)) { + if (dump_dir[i].substr(0, db_sync_prefix.size()) != db_sync_prefix || + dump_dir[i].size() != (db_sync_prefix.size() + 8)) { continue; } - std::string str_date = dump_dir[i].substr(db_sync_prefix.size(), (dump_dir[i].size() - db_sync_prefix.size())); - char *end = NULL; + std::string str_date = dump_dir[i].substr( + db_sync_prefix.size(), (dump_dir[i].size() - db_sync_prefix.size())); + char* end = NULL; std::strtol(str_date.c_str(), &end, 10); if (*end != 0) { continue; @@ -2001,7 +2149,8 @@ void DelbackupCmd::Do(std::shared_ptr partition) { slash::DeleteDirIfExist(dump_dir_name); len--; } else { - LOG(INFO) << "Syncing, can not delete " << dump_dir_name << " dump file" << std::endl; + LOG(INFO) << "Syncing, can not delete " << dump_dir_name << " dump file" + << std::endl; } } res_.SetRes(CmdRes::kOk); @@ -2030,7 +2179,7 @@ void ScandbCmd::DoInitial() { if (argv_.size() == 1) { type_ = blackwidow::kAll; } else { - if (!strcasecmp(argv_[1].data(),"string")) { + if (!strcasecmp(argv_[1].data(), "string")) { type_ = blackwidow::kStrings; } else if (!strcasecmp(argv_[1].data(), "hash")) { type_ = blackwidow::kHashes; @@ -2067,14 +2216,18 @@ void SlowlogCmd::DoInitial() { condition_ = SlowlogCmd::kRESET; } else if (argv_.size() == 2 && !strcasecmp(argv_[1].data(), "len")) { condition_ = SlowlogCmd::kLEN; - } else if ((argv_.size() == 2 || argv_.size() == 3) && !strcasecmp(argv_[1].data(), "get")) { + } else if ((argv_.size() == 2 || argv_.size() == 3) && + !strcasecmp(argv_[1].data(), "get")) { condition_ = SlowlogCmd::kGET; - if (argv_.size() == 3 && !slash::string2l(argv_[2].data(), argv_[2].size(), &number_)) { + if (argv_.size() == 3 && + !slash::string2l(argv_[2].data(), argv_[2].size(), &number_)) { res_.SetRes(CmdRes::kInvalidInt); return; } } else { - res_.SetRes(CmdRes::kErrOther, "Unknown SLOWLOG subcommand or wrong # of args. Try GET, RESET, LEN."); + res_.SetRes( + CmdRes::kErrOther, + "Unknown SLOWLOG subcommand or wrong # of args. Try GET, RESET, LEN."); return; } } @@ -2083,7 +2236,7 @@ void SlowlogCmd::Do(std::shared_ptr partition) { if (condition_ == SlowlogCmd::kRESET) { g_pika_server->SlowlogReset(); res_.SetRes(CmdRes::kOk); - } else if (condition_ == SlowlogCmd::kLEN) { + } else if (condition_ == SlowlogCmd::kLEN) { res_.AppendInteger(g_pika_server->SlowlogLen()); } else { std::vector slowlogs; @@ -2114,15 +2267,13 @@ void PaddingCmd::Do(std::shared_ptr partition) { res_.SetRes(CmdRes::kOk); } -std::string PaddingCmd::ToBinlog( - uint32_t exec_time, - uint32_t term_id, - uint64_t logic_id, - uint32_t filenum, - uint64_t offset) { +std::string PaddingCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, + uint64_t logic_id, uint32_t filenum, + uint64_t offset) { return PikaBinlogTransverter::ConstructPaddingBinlog( - BinlogType::TypeFirst, argv_[1].size() + BINLOG_ITEM_HEADER_SIZE - + PADDING_BINLOG_PROTOCOL_SIZE + SPACE_STROE_PARAMETER_LENGTH); + BinlogType::TypeFirst, argv_[1].size() + BINLOG_ITEM_HEADER_SIZE + + PADDING_BINLOG_PROTOCOL_SIZE + + SPACE_STROE_PARAMETER_LENGTH); } #ifdef TCMALLOC_EXTENSION @@ -2155,7 +2306,7 @@ void TcmallocCmd::DoInitial() { void TcmallocCmd::Do(std::shared_ptr partition) { std::vector fli; std::vector elems; - switch(type_) { + switch (type_) { case 0: char stats[1024]; MallocExtension::instance()->GetStats(stats, 1024); @@ -2175,8 +2326,10 @@ void TcmallocCmd::Do(std::shared_ptr partition) { MallocExtension::instance()->GetFreeListSizes(&fli); res_.AppendArrayLen(fli.size()); for (auto& i : fli) { - res_.AppendString("type: " + std::string(i.type) + ", min: " + std::to_string(i.min_object_size) + - ", max: " + std::to_string(i.max_object_size) + ", total: " + std::to_string(i.total_bytes_free)); + res_.AppendString("type: " + std::string(i.type) + + ", min: " + std::to_string(i.min_object_size) + + ", max: " + std::to_string(i.max_object_size) + + ", total: " + std::to_string(i.total_bytes_free)); } break; case 3: @@ -2188,9 +2341,9 @@ void TcmallocCmd::Do(std::shared_ptr partition) { void PKPatternMatchDelCmd::DoInitial() { if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNamePKPatternMatchDel); + res_.SetRes(CmdRes::kWrongNum, kCmdNamePKPatternMatchDel); return; - } + } pattern_ = argv_[1]; if (!strcasecmp(argv_[2].data(), "set")) { type_ = blackwidow::kSets; @@ -2218,8 +2371,17 @@ void PKPatternMatchDelCmd::Do(std::shared_ptr partition) { } } -void DummyCmd::DoInitial() { -} +void DummyCmd::DoInitial() {} + +void DummyCmd::Do(std::shared_ptr partition) {} -void DummyCmd::Do(std::shared_ptr partition) { +void QuitCmd::DoInitial() { + if (!CheckArg(argv_.size())) { + res_.SetRes(CmdRes::kWrongNum, kCmdNameQuit); + } } + +void QuitCmd::Do(std::shared_ptr partition) { + res_.SetRes(CmdRes::kOk); + GetConn()->SetClose(true); +} \ No newline at end of file diff --git a/src/pika_command.cc b/src/pika_command.cc index 8b3f75bcba..927f8fb11d 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -76,6 +76,8 @@ void InitCmdTable(std::unordered_map *cmd_table) { cmd_table->insert(std::pair(kCmdNamePKPatternMatchDel, pkpatternmatchdelptr)); Cmd* dummyptr = new DummyCmd(kCmdDummy, 0, kCmdFlagsWrite | kCmdFlagsSinglePartition); cmd_table->insert(std::pair(kCmdDummy, dummyptr)); + Cmd* quitptr = new QuitCmd(kCmdNameQuit, 1, kCmdFlagsRead); + cmd_table->insert(std::pair(kCmdNameQuit, quitptr)); // Slots related Cmd* slotsinfoptr = new SlotsInfoCmd(kCmdNameSlotsInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin); diff --git a/third/pink b/third/pink index 83f884cfeb..60ac6c5677 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit 83f884cfeb5f5788659c316538c4f355b339bb87 +Subproject commit 60ac6c5677eb1dd51bb9b95e4c2f12a903633d0b