Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add isolation between string and hyperloglog( issue#2719) #2720

Merged
merged 11 commits into from
Jun 24, 2024
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Redis {
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Status Get(const Slice& key, std::string* value);
Status HyperloglogGet(const Slice& key, std::string* value);
Status MGet(const Slice& key, std::string* value);
Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Expand All @@ -167,6 +168,7 @@ class Redis {
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Status HyperloglogSet(const Slice& key, const Slice& value);
Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
Status Setex(const Slice& key, const Slice& value, int64_t ttl);
Expand Down
65 changes: 62 additions & 3 deletions src/storage/src/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis_hyperloglog.h"

#include <algorithm>
#include <cmath>
#include <string>
#include <cstring>

#include "src/storage_murmur3.h"
#include "storage/storage_define.h"
#include "src/redis.h"
#include "src/mutex.h"
#include "src/redis_hyperloglog.h"
#include "src/scope_record_lock.h"

namespace storage {

Expand Down Expand Up @@ -108,7 +115,59 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) {
return result;
}

// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数
// ::__builtin_ctz(x): return the first number of '0' after the first '1' from the right
uint8_t HyperLogLog::Nctz(uint32_t x, int b) { return static_cast<uint8_t>(std::min(b, ::__builtin_ctz(x))) + 1; }

} // namespace storage

bool IsHyperloglogObj(const std::string* internal_value_str) {
size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength;
char reserve[16] = {0};
size_t offset = internal_value_str->size() - kStringsValueSuffixLength;
memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength);

//if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog
return (reserve[0] & hyperloglog_reserve_flag) != 0;;
}

Status Redis::HyperloglogGet(const Slice &key, std::string* value) {
value->clear();

BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), value);
std::string meta_value = *value;
if (!s.ok()) {
return s;
saz97 marked this conversation as resolved.
Show resolved Hide resolved
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
",expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}
return s;
}
Comment on lines +132 to +163
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance error handling and messages in HyperloglogGet.

The error handling in HyperloglogGet can be made more descriptive. It currently uses generic messages that do not provide enough context about the expected and actual types. Furthermore, there's a repeated typo in the error messages and a redundant semicolon in the return statement.

- return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + ", expect type: " + "hyperloglog " + "get type: " + DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
+ return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + ", expected type: HyperLogLog, actual type: " + DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status Redis::HyperloglogGet(const Slice &key, std::string* value) {
value->clear();
BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), value);
std::string meta_value = *value;
if (!s.ok()) {
return s;
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
",expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}
return s;
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expected type: HyperLogLog, actual type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expected type: HyperLogLog, actual type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}


Status Redis::HyperloglogSet(const Slice &key, const Slice &value) {
HyperloglogValue hyperloglog_value(value);
ScopeRecordLock l(lock_mgr_, key);

BaseKey base_key(key);
return db_->Put(default_write_options_, base_key.Encode(), hyperloglog_value.Encode());
}

} // namespace storage
17 changes: 9 additions & 8 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
std::string registers;
std::string result;
auto& inst = GetDBInstance(key);
Status s = inst->Get(key, &value);
Status s = inst->HyperloglogGet(key, &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1573,7 +1573,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
if (previous != now || (s.IsNotFound() && values.empty())) {
*update = true;
}
s = inst->Set(key, result);
s = inst->HyperloglogSet(key, result);
return s;
}

Expand All @@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector<std::string>& keys, int64_t* result) {
std::string value;
std::string first_registers;
auto& inst = GetDBInstance(keys[0]);
Status s = inst->Get(keys[0], &value);
Status s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
first_registers = "";
saz97 marked this conversation as resolved.
Show resolved Hide resolved
} else {
return s;
}

HyperLogLog first_log(kPrecision, first_registers);
for (size_t i = 1; i < keys.size(); ++i) {
std::string value;
std::string registers;
auto& inst = GetDBInstance(keys[i]);
s = inst->Get(keys[i], &value);
s = inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1622,7 +1623,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string first_registers;
std::string result;
auto& inst = GetDBInstance(keys[0]);
s = inst->Get(keys[0], &value);
s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1635,7 +1636,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string value;
std::string registers;
auto& tmp_inst = GetDBInstance(keys[i]);
s = tmp_inst->Get(keys[i], &value);
s = tmp_inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1647,7 +1648,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
result = first_log.Merge(log);
}
auto& ninst = GetDBInstance(keys[0]);
s = ninst->Set(keys[0], result);
s = ninst->HyperloglogSet(keys[0], result);
value_to_dest = std::move(result);
return s;
}
Expand Down
27 changes: 27 additions & 0 deletions src/storage/src/strings_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
#include "src/base_value_format.h"
#include "storage/storage_define.h"


namespace storage {
/*
* | type | value | reserve | cdate | timestamp |
* | 1B | | 16B | 8B | 8B |
* The first bit in reservse field is used to isolate string and hyperloglog
*/
// 80H = 1000000B
constexpr uint8_t hyperloglog_reserve_flag = 0x80;
class StringsValue : public InternalValue {
public:
explicit StringsValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
Expand All @@ -38,6 +42,29 @@ class StringsValue : public InternalValue {
}
};

class HyperloglogValue : public InternalValue {
public:
explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
virtual rocksdb::Slice Encode() override {
size_t usize = user_value_.size();
size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength;
char* dst = ReAllocIfNeeded(needed);
memcpy(dst, &type_, sizeof(type_));
dst += sizeof(type_);
char* start_pos = dst;

memcpy(dst, user_value_.data(), usize);
dst += usize;
reserve_[0] |= hyperloglog_reserve_flag;
memcpy(dst, reserve_, kSuffixReserveLength);
dst += kSuffixReserveLength;
EncodeFixed64(dst, ctime_);
dst += kTimestampLength;
EncodeFixed64(dst, etime_);
return {start_, needed};
}
};

class ParsedStringsValue : public ParsedInternalValue {
public:
// Use this constructor after rocksdb::DB::Get();
Expand Down
2 changes: 1 addition & 1 deletion tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,4 @@ cache-lfu-decay-time: 1
# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent
#
# Example:
# rename-command : FLUSHDB 360flushdb
# rename-command : FLUSHDB 360flushdb
Loading
Loading