Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add isolation between string and hyperloglog( issue#2719) #2720

Merged
merged 11 commits into from
Jun 24, 2024
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Redis {
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Status Get(const Slice& key, std::string* value);
Status HyperloglogGet(const Slice& key, std::string* value);
Status MGet(const Slice& key, std::string* value);
Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
Expand All @@ -167,6 +168,7 @@ class Redis {
Status MSet(const std::vector<KeyValue>& kvs);
Status MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret);
Status Set(const Slice& key, const Slice& value);
Status HyperloglogSet(const Slice& key, const Slice& value);
Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
Status Setex(const Slice& key, const Slice& value, int64_t ttl);
Expand Down
67 changes: 65 additions & 2 deletions src/storage/src/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "src/redis_hyperloglog.h"

#include <algorithm>
#include <cmath>
#include <string>
#include <cstring>

#include "src/storage_murmur3.h"
#include "storage/storage_define.h"
#include "src/redis.h"
#include "src/mutex.h"
#include "src/redis_hyperloglog.h"
#include "src/scope_record_lock.h"

namespace storage {

Expand Down Expand Up @@ -111,4 +118,60 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) {
// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数
saz97 marked this conversation as resolved.
Show resolved Hide resolved
// Rank of a hash value for HyperLogLog: number of consecutive trailing
// zero bits in x, capped at b, plus one.
// __builtin_ctz(0) is undefined behaviour, so an all-zero input is handled
// explicitly and treated as "all b bits are zero" (the maximum rank b + 1).
// For every x != 0 the result is identical to min(b, ctz(x)) + 1.
uint8_t HyperLogLog::Nctz(uint32_t x, int b) {
  if (x == 0) {
    return static_cast<uint8_t>(b + 1);
  }
  return static_cast<uint8_t>(std::min(b, ::__builtin_ctz(x))) + 1;
}

} // namespace storage

// Decide whether an encoded strings-type value was written by the PF*
// (hyperloglog) commands or by a plain SET.
// Encoded layout (see strings_value_format.h):
//   [type(1)][user_value][reserve(16)][ctime(8)][etime(8)]
// HyperloglogValue::Encode() sets the MSB of reserve[0]; StringsValue
// leaves the reserve bytes zeroed, so that bit is the discriminator.
bool IsHyperloglogObj(std::string* internal_value_str) {
  const size_t suffix_length = 2 * kTimestampLength + kSuffixReserveLength;
  // Guard against truncated/corrupt values that cannot carry a suffix.
  if (internal_value_str->size() < kTypeLength + suffix_length) {
    return false;
  }
  // The reserve field starts suffix_length bytes before the end of the
  // value, so locate it from the back instead of walking forward.
  const char* reserve_pos = internal_value_str->data() + internal_value_str->size() - suffix_length;
  // MSB of reserve[0]: 1 => hyperloglog object, 0 => plain string.
  return (reserve_pos[0] & 0x80) != 0;
}

// Fetch the raw HLL register payload stored at `key`.
//
// Returns:
//   - NotFound        when the key is absent or the value is stale (expired)
//   - InvalidArgument (WRONGTYPE) when the key holds a non-strings type, or a
//                     strings value that was written by SET rather than PF*
//   - OK              with *value holding the register bytes (suffix stripped)
Status Redis::HyperloglogGet(const Slice& key, std::string* value) {
  value->clear();

  BaseKey base_key(key);
  Status s = db_->Get(default_read_options_, base_key.Encode(), value);
  if (!s.ok()) {
    return s;
  }
  // Keep an unstripped copy for type inspection/reporting below.
  std::string meta_value = *value;
  if (!ExpectedMetaValue(DataType::kStrings, meta_value)) {
    if (ExpectedStale(meta_value)) {
      // Expired value of another type: report as missing, and do not leak
      // the stale encoded bytes back to the caller.
      value->clear();
      s = Status::NotFound();
    } else {
      return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
                                     ", expect type: " + "hyperloglog " + "get type: " +
                                     DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
    }
  } else if (!IsHyperloglogObj(value)) {
    // A strings-type value written by SET/MSET, not by PFADD/PFMERGE.
    return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() +
                                   ",expect type: " + "hyperloglog " + "get type: " +
                                   DataTypeStrings[static_cast<int>(GetMetaValueType(meta_value))]);
  } else {
    ParsedStringsValue parsed_strings_value(value);
    if (parsed_strings_value.IsStale()) {
      value->clear();
      return Status::NotFound("Stale");
    }
    parsed_strings_value.StripSuffix();
  }
  return s;
}

// Persist `value` at `key` encoded as a hyperloglog object, so that later
// reads can distinguish it from a plain string written by SET.
Status Redis::HyperloglogSet(const Slice& key, const Slice& value) {
  // Serialize concurrent writers on this key while we encode and persist.
  ScopeRecordLock record_lock(lock_mgr_, key);

  BaseKey base_key(key);
  HyperloglogValue internal_value(value);
  return db_->Put(default_write_options_, base_key.Encode(), internal_value.Encode());
}

} // namespace storage
17 changes: 9 additions & 8 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
std::string registers;
std::string result;
auto& inst = GetDBInstance(key);
Status s = inst->Get(key, &value);
Status s = inst->HyperloglogGet(key, &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1571,7 +1571,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector<std::string>& values,
if (previous != now || (s.IsNotFound() && values.empty())) {
*update = true;
}
s = inst->Set(key, result);
s = inst->HyperloglogSet(key, result);
return s;
}

Expand All @@ -1583,19 +1583,20 @@ Status Storage::PfCount(const std::vector<std::string>& keys, int64_t* result) {
std::string value;
std::string first_registers;
auto& inst = GetDBInstance(keys[0]);
Status s = inst->Get(keys[0], &value);
Status s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
first_registers = "";
saz97 marked this conversation as resolved.
Show resolved Hide resolved
} else {
return s;
}

HyperLogLog first_log(kPrecision, first_registers);
for (size_t i = 1; i < keys.size(); ++i) {
std::string value;
std::string registers;
auto& inst = GetDBInstance(keys[i]);
s = inst->Get(keys[i], &value);
s = inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = value;
} else if (s.IsNotFound()) {
Expand All @@ -1620,7 +1621,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string first_registers;
std::string result;
auto& inst = GetDBInstance(keys[0]);
s = inst->Get(keys[0], &value);
s = inst->HyperloglogGet(keys[0], &value);
if (s.ok()) {
first_registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1633,7 +1634,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
std::string value;
std::string registers;
auto& tmp_inst = GetDBInstance(keys[i]);
s = tmp_inst->Get(keys[i], &value);
s = tmp_inst->HyperloglogGet(keys[i], &value);
if (s.ok()) {
registers = std::string(value.data(), value.size());
} else if (s.IsNotFound()) {
Expand All @@ -1645,7 +1646,7 @@ Status Storage::PfMerge(const std::vector<std::string>& keys, std::string& value
result = first_log.Merge(log);
}
auto& ninst = GetDBInstance(keys[0]);
s = ninst->Set(keys[0], result);
s = ninst->HyperloglogSet(keys[0], result);
value_to_dest = std::move(result);
return s;
}
Expand Down
23 changes: 23 additions & 0 deletions src/storage/src/strings_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ class StringsValue : public InternalValue {
}
};

// Encoded value written by the PF* (hyperloglog) commands. The layout is
// identical to StringsValue except that the MSB of the first reserve byte
// is set, which lets IsHyperloglogObj() distinguish hyperloglog data from
// values written by plain SET.
class HyperloglogValue : public InternalValue {
 public:
  explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}

  // Layout: [type(1)][user_value][reserve(16)][ctime(8)][etime(8)]
  rocksdb::Slice Encode() override {
    size_t usize = user_value_.size();
    size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength;
    char* dst = ReAllocIfNeeded(needed);
    memcpy(dst, &type_, sizeof(type_));
    dst += sizeof(type_);

    memcpy(dst, user_value_.data(), usize);
    dst += usize;
    // Mark this strings value as a hyperloglog object (see IsHyperloglogObj).
    reserve_[0] = static_cast<char>(0x80);
    memcpy(dst, reserve_, kSuffixReserveLength);
    dst += kSuffixReserveLength;
    EncodeFixed64(dst, ctime_);
    dst += kTimestampLength;
    EncodeFixed64(dst, etime_);
    return {start_, needed};
  }
};

class ParsedStringsValue : public ParsedInternalValue {
public:
// Use this constructor after rocksdb::DB::Get();
Expand Down
27 changes: 23 additions & 4 deletions tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB can only bind to one sync-binlog-thread to write binlog at maximum
# [NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'databases' (then each DB could have an exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binlog (write2file) file which
# is used for replication.
Expand Down Expand Up @@ -101,6 +108,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down Expand Up @@ -308,6 +317,11 @@ max-write-buffer-num : 2
# whether the key exists. Setting this value too high may hurt performance.
min-write-buffer-number-to-merge : 1

# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families
# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when
# process restart.
max-total-wal-size : 1073741824

# rocksdb level0_stop_writes_trigger
level0-stop-writes-trigger : 36

Expand Down Expand Up @@ -466,9 +480,14 @@ default-slot-num : 1024
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1

# Rsync Rate limiting configuration 200MB/s
# Rsync Rate limiting configuration [Default value is 200MB/s]
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
rsync-timeout-ms : 1000
# The valid range for max-rsync-parallel-num is [1, 4].
# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4.
max-rsync-parallel-num : 4
Expand Down
Loading
Loading