Skip to content

Commit

Permalink
fix: Add isolation between string and hyperloglog( issue#2719) (#2720)
Browse files Browse the repository at this point in the history
* use one bit in reserve to add isolation between string and hyperloglog
  • Loading branch information
saz97 authored Jun 24, 2024
1 parent e131567 commit 3d3c6d1
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Redis {
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Status Get(const Slice& key, std::string* value);
Status HyperloglogGet(const Slice& key, std::string* value);
Status MGet(const Slice& key, std::string* value);
Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Expand All @@ -167,6 +168,7 @@ class Redis {
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Status HyperloglogSet(const Slice& key, const Slice& value);
Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
Status Setex(const Slice& key, const Slice& value, int64_t ttl);
Expand Down
65 changes: 62 additions & 3 deletions src/storage/src/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis_hyperloglog.h"

#include <algorithm>
#include <cmath>
#include <string>
#include <cstring>

#include "src/storage_murmur3.h"
#include "storage/storage_define.h"
#include "src/redis.h"
#include "src/mutex.h"
#include "src/redis_hyperloglog.h"
#include "src/scope_record_lock.h"

namespace storage {

Expand Down Expand Up @@ -108,7 +115,59 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) {
return result;
}

// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数
// ::__builtin_ctz(x): return the first number of '0' after the first '1' from the right
uint8_t HyperLogLog::Nctz(uint32_t x, int b) { return static_cast<uint8_t>(std::min(b, ::__builtin_ctz(x))) + 1; }

} // namespace storage

bool IsHyperloglogObj(const std::string* internal_value_str) {
size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength;
char reserve[16] = {0};
size_t offset = internal_value_str->size() - kStringsValueSuffixLength;
memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength);

//if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog
return (reserve[0] & hyperloglog_reserve_flag) != 0;;
}

Status Redis::HyperloglogGet(const Slice &key, std::string* value) {
value->clear();

BaseKey base_key(key);
Status s = db_->Get(default_read_options_, base_key.Encode(), value);
std::string meta_value = *value;
if (!s.ok()) {
return s;
}
if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
if (ExpectedStale(meta_value)) {
s = Status::NotFound();
} else {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
", expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
}
} else if (!IsHyperloglogObj(value)) {
return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
",expect type: " + "hyperloglog " + "get type: " +
DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
} else {
ParsedStringsValue parsed_strings_value(value);
if (parsed_strings_value.IsStale()) {
value->clear();
return Status::NotFound("Stale");
} else {
parsed_strings_value.StripSuffix();
}
}
return s;
}

Status Redis::HyperloglogSet(const Slice &key, const Slice &value) {
HyperloglogValue hyperloglog_value(value);
ScopeRecordLock l(lock_mgr_, key);

BaseKey base_key(key);
return db_->Put(default_write_options_, base_key.Encode(), hyperloglog_value.Encode());
}

} // namespace storage
17 changes: 9 additions & 8 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
std::string registers;
std::string result;
auto& inst = GetDBInstance(key);
Status s = inst->Get(key, &value);
Status s = inst->HyperloglogGet(key, &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1573,7 +1573,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
if (previous != now || (s.IsNotFound() && values.empty())) {
*update = true;
}
s = inst->Set(key, result);
s = inst->HyperloglogSet(key, result);
return s;
}

Expand All @@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector<std::string>& keys, int64_t* result) {
std::string value;
std::string first_registers;
auto& inst = GetDBInstance(keys[0]);
Status s = inst->Get(keys[0], &value);
Status s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
first_registers = "";
} else {
return s;
}

HyperLogLog first_log(kPrecision, first_registers);
for (size_t i = 1; i < keys.size(); ++i) {
std::string value;
std::string registers;
auto& inst = GetDBInstance(keys[i]);
s = inst->Get(keys[i], &value);
s = inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1622,7 +1623,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string first_registers;
std::string result;
auto& inst = GetDBInstance(keys[0]);
s = inst->Get(keys[0], &value);
s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1635,7 +1636,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string value;
std::string registers;
auto& tmp_inst = GetDBInstance(keys[i]);
s = tmp_inst->Get(keys[i], &value);
s = tmp_inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1647,7 +1648,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
result = first_log.Merge(log);
}
auto& ninst = GetDBInstance(keys[0]);
s = ninst->Set(keys[0], result);
s = ninst->HyperloglogSet(keys[0], result);
value_to_dest = std::move(result);
return s;
}
Expand Down
27 changes: 27 additions & 0 deletions src/storage/src/strings_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
#include "src/base_value_format.h"
#include "storage/storage_define.h"


namespace storage {
/*
* | type | value | reserve | cdate | timestamp |
* | 1B | | 16B | 8B | 8B |
* The first bit in reservse field is used to isolate string and hyperloglog
*/
// 80H = 1000000B
constexpr uint8_t hyperloglog_reserve_flag = 0x80;
class StringsValue : public InternalValue {
public:
explicit StringsValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
Expand All @@ -38,6 +42,29 @@ class StringsValue : public InternalValue {
}
};

class HyperloglogValue : public InternalValue {
public:
explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
virtual rocksdb::Slice Encode() override {
size_t usize = user_value_.size();
size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength;
char* dst = ReAllocIfNeeded(needed);
memcpy(dst, &type_, sizeof(type_));
dst += sizeof(type_);
char* start_pos = dst;

memcpy(dst, user_value_.data(), usize);
dst += usize;
reserve_[0] |= hyperloglog_reserve_flag;
memcpy(dst, reserve_, kSuffixReserveLength);
dst += kSuffixReserveLength;
EncodeFixed64(dst, ctime_);
dst += kTimestampLength;
EncodeFixed64(dst, etime_);
return {start_, needed};
}
};

class ParsedStringsValue : public ParsedInternalValue {
public:
// Use this constructor after rocksdb::DB::Get();
Expand Down
2 changes: 1 addition & 1 deletion tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,4 @@ cache-lfu-decay-time: 1
# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent
#
# Example:
# rename-command : FLUSHDB 360flushdb
# rename-command : FLUSHDB 360flushdb
Loading

0 comments on commit 3d3c6d1

Please sign in to comment.