Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add isolation between string and hyperloglog( issue#2719) #2720

Merged
merged 11 commits into from
Jun 24, 2024
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Redis {
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Status Get(const Slice& key, std::string* value);
Status HyperloglogGet(const Slice& key, std::string* value);
Status MGet(const Slice& key, std::string* value);
Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Expand All @@ -167,6 +168,7 @@ class Redis {
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Status HyperloglogSet(const Slice& key, const Slice& value);
Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
Status Setex(const Slice& key, const Slice& value, int64_t ttl);
Expand Down
68 changes: 65 additions & 3 deletions src/storage/src/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis_hyperloglog.h"

#include <algorithm>
#include <cmath>
#include <string>
#include <cstring>

#include "src/storage_murmur3.h"
#include "storage/storage_define.h"
#include "src/redis.h"
#include "src/mutex.h"
#include "src/redis_hyperloglog.h"
#include "src/scope_record_lock.h"

namespace storage {

Expand Down Expand Up @@ -108,7 +115,62 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) {
return result;
}

// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数
// ::__builtin_ctz(x): return the first number of '0' after the first '1' from the right
uint8_t HyperLogLog::Nctz(uint32_t x, int b) { return static_cast<uint8_t>(std::min(b, ::__builtin_ctz(x))) + 1; }

} // namespace storage

bool IsHyperloglogObj(const std::string* internal_value_str) {
size_t offset = 0;
size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength;
char reserve[16] = {0};
offset += kTypeLength;
offset += (rocksdb::Slice(internal_value_str->data() + offset,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我感觉,你从后往前找reserve的位置是不是好一点,这样看起来比较绕。指针指向string最后一个字符,然后向前移动kStringsValueSuffixLength就是reserve的起始位置。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

问题解决

internal_value_str->size() - kStringsValueSuffixLength - offset)).size();
memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength);

//if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog
return (reserve[0] & hyperloglog_reserve_flag) != 0;;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor bitwise operation for clarity and robustness.

The function IsHyperloglogObj uses a bitwise operation to determine the type of the object. The current implementation relies on a magic number and could benefit from using a named constant for hyperloglog_reserve_flag to improve code readability and maintainability.

- return (reserve[0] & hyperloglog_reserve_flag) != 0;;
+ const char HYPERLOGLOG_RESERVE_FLAG = 0x80; // This should be defined globally or in an appropriate location
+ return (reserve[0] & HYPERLOGLOG_RESERVE_FLAG) != 0;
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool IsHyperloglogObj(const std::string* internal_value_str) {
size_t offset = 0;
size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength;
char reserve[16] = {0};
offset += kTypeLength;
offset += (rocksdb::Slice(internal_value_str->data() + offset,
internal_value_str->size() - kStringsValueSuffixLength - offset)).size();
memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength);
//if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog
return (reserve[0] & hyperloglog_reserve_flag) != 0;;
}
bool IsHyperloglogObj(const std::string* internal_value_str) {
size_t offset = 0;
size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength;
char reserve[16] = {0};
offset += kTypeLength;
offset += (rocksdb::Slice(internal_value_str->data() + offset,
internal_value_str->size() - kStringsValueSuffixLength - offset)).size();
memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength);
const char HYPERLOGLOG_RESERVE_FLAG = 0x80; // This should be defined globally or in an appropriate location
//if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog
return (reserve[0] & HYPERLOGLOG_RESERVE_FLAG) != 0;
}


Status Redis::HyperloglogGet(const Slice &key, std::string* value) {
value->clear();

BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), value);
std::string meta_value = *value;
if (!s.ok()) {
return s;
saz97 marked this conversation as resolved.
Show resolved Hide resolved
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
",expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}
return s;
}
Comment on lines +132 to +163
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance error handling and messages in HyperloglogGet.

The error handling in HyperloglogGet can be made more descriptive. It currently uses generic messages that do not provide enough context about the expected and actual types. Furthermore, there's a repeated typo in the error messages and a redundant semicolon in the return statement.

- return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + ", expect type: " + "hyperloglog " + "get type: " + DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
+ return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + ", expected type: HyperLogLog, actual type: " + DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status Redis::HyperloglogGet(const Slice &key, std::string* value) {
value->clear();
BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), value);
std::string meta_value = *value;
if (!s.ok()) {
return s;
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
",expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}
return s;
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expected type: HyperLogLog, actual type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expected type: HyperLogLog, actual type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}


Status Redis::HyperloglogSet(const Slice &key, const Slice &value) {
HyperloglogValue hyperloglog_value(value);
ScopeRecordLock l(lock_mgr_, key);

BaseKey base_key(key);
return db_->Put(default_write_options_, base_key.Encode(), hyperloglog_value.Encode());
}

} // namespace storage
17 changes: 9 additions & 8 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
std::string registers;
std::string result;
auto& inst = GetDBInstance(key);
Status s = inst->Get(key, &value);
Status s = inst->HyperloglogGet(key, &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1573,7 +1573,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
if (previous != now || (s.IsNotFound() && values.empty())) {
*update = true;
}
s = inst->Set(key, result);
s = inst->HyperloglogSet(key, result);
return s;
}

Expand All @@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector<std::string>& keys, int64_t* result) {
std::string value;
std::string first_registers;
auto& inst = GetDBInstance(keys[0]);
Status s = inst->Get(keys[0], &value);
Status s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
first_registers = "";
saz97 marked this conversation as resolved.
Show resolved Hide resolved
} else {
return s;
}

HyperLogLog first_log(kPrecision, first_registers);
for (size_t i = 1; i < keys.size(); ++i) {
std::string value;
std::string registers;
auto& inst = GetDBInstance(keys[i]);
s = inst->Get(keys[i], &value);
s = inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1622,7 +1623,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string first_registers;
std::string result;
auto& inst = GetDBInstance(keys[0]);
s = inst->Get(keys[0], &value);
s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1635,7 +1636,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string value;
std::string registers;
auto& tmp_inst = GetDBInstance(keys[i]);
s = tmp_inst->Get(keys[i], &value);
s = tmp_inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1647,7 +1648,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
result = first_log.Merge(log);
}
auto& ninst = GetDBInstance(keys[0]);
s = ninst->Set(keys[0], result);
s = ninst->HyperloglogSet(keys[0], result);
value_to_dest = std::move(result);
return s;
}
Expand Down
27 changes: 27 additions & 0 deletions src/storage/src/strings_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
#include "src/base_value_format.h"
#include "storage/storage_define.h"


namespace storage {
/*
* | type | value | reserve | cdate | timestamp |
* | 1B | | 16B | 8B | 8B |
* The first bit in reservse field is used to isolate string and hyperloglog
*/
// 80H = 1000000B
constexpr uint8_t hyperloglog_reserve_flag = 0x80;
class StringsValue : public InternalValue {
public:
explicit StringsValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
Expand All @@ -38,6 +42,29 @@ class StringsValue : public InternalValue {
}
};

class HyperloglogValue : public InternalValue {
public:
explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
virtual rocksdb::Slice Encode() override {
size_t usize = user_value_.size();
size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength;
char* dst = ReAllocIfNeeded(needed);
memcpy(dst, &type_, sizeof(type_));
dst += sizeof(type_);
char* start_pos = dst;

memcpy(dst, user_value_.data(), usize);
dst += usize;
reserve_[0] |= hyperloglog_reserve_flag;
memcpy(dst, reserve_, kSuffixReserveLength);
dst += kSuffixReserveLength;
EncodeFixed64(dst, ctime_);
dst += kTimestampLength;
EncodeFixed64(dst, etime_);
return {start_, needed};
}
};

class ParsedStringsValue : public ParsedInternalValue {
public:
// Use this constructor after rocksdb::DB::Get();
Expand Down
2 changes: 1 addition & 1 deletion tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,4 @@ cache-lfu-decay-time: 1
# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent
#
# Example:
# rename-command : FLUSHDB 360flushdb
# rename-command : FLUSHDB 360flushdb
Loading
Loading