diff --git a/src/pika_command.cc b/src/pika_command.cc
index a40cb77f3..b374218cb 100644
--- a/src/pika_command.cc
+++ b/src/pika_command.cc
@@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
   cmd_table->insert(std::pair>(kCmdNamePfCount, std::move(pfcountptr)));
   ////pfmergeCmd
   std::unique_ptr pfmergeptr = std::make_unique(
-      kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
+      kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
   cmd_table->insert(std::pair>(kCmdNamePfMerge, std::move(pfmergeptr)));
   // GEO
diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h
index d818fc3e7..ccad63526 100644
--- a/src/storage/src/redis.h
+++ b/src/storage/src/redis.h
@@ -154,6 +154,7 @@ class Redis {
   Status BitOp(BitOpType op, const std::string& dest_key, const std::vector& src_keys, std::string &value_to_dest, int64_t* ret);
   Status Decrby(const Slice& key, int64_t value, int64_t* ret);
   Status Get(const Slice& key, std::string* value);
+  Status HyperloglogGet(const Slice& key, std::string* value);
   Status MGet(const Slice& key, std::string* value);
   Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
   Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl);
@@ -167,6 +168,7 @@ class Redis {
   Status MSet(const std::vector& kvs);
   Status MSetnx(const std::vector& kvs, int32_t* ret);
   Status Set(const Slice& key, const Slice& value);
+  Status HyperloglogSet(const Slice& key, const Slice& value);
   Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0);
   Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret);
   Status Setex(const Slice& key, const Slice& value, int64_t ttl);
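The arity of PFMERGE changes from -3 to -2, i.e. `PFMERGE destkey` with no source keys is now accepted, matching Redis. The two declarations added to redis.h mirror the existing `Get`/`Set` entry points but read and write values that carry the hyperloglog marker introduced in strings_value_format.h further down. A minimal sketch of the read-modify-write pattern the storage layer switches to (illustrative only, not part of the patch; `UpdateHll` is a hypothetical helper, and `rocksdb::Status`/`rocksdb::Slice` are assumed to be the aliases the storage layer uses):

```cpp
#include <string>
#include "src/redis.h"

// Illustrative only: the get -> modify -> set cycle PfAdd/PfMerge adopt below.
rocksdb::Status UpdateHll(storage::Redis* inst, const rocksdb::Slice& key,
                          std::string* registers) {
  rocksdb::Status s = inst->HyperloglogGet(key, registers);
  if (s.IsNotFound()) {
    registers->clear();  // missing key: start from an empty register set
  } else if (!s.ok()) {
    return s;            // e.g. the WRONGTYPE error when the key holds a plain string
  }
  // ... fold new elements into *registers with the HyperLogLog helper class ...
  return inst->HyperloglogSet(key, *registers);  // rewrites the value with the HLL flag set
}
```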
-#include "src/redis_hyperloglog.h" + #include #include #include +#include + #include "src/storage_murmur3.h" +#include "storage/storage_define.h" +#include "src/redis.h" +#include "src/mutex.h" +#include "src/redis_hyperloglog.h" +#include "src/scope_record_lock.h" namespace storage { @@ -108,7 +115,59 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) { return result; } -// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数 +// ::__builtin_ctz(x): return the first number of '0' after the first '1' from the right uint8_t HyperLogLog::Nctz(uint32_t x, int b) { return static_cast(std::min(b, ::__builtin_ctz(x))) + 1; } -} // namespace storage + +bool IsHyperloglogObj(const std::string* internal_value_str) { + size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength; + char reserve[16] = {0}; + size_t offset = internal_value_str->size() - kStringsValueSuffixLength; + memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength); + + //if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog + return (reserve[0] & hyperloglog_reserve_flag) != 0;; +} + +Status Redis::HyperloglogGet(const Slice &key, std::string* value) { + value->clear(); + + BaseKey base_key(key); + Status s = db_->Get(default_read_options_, base_key.Encode(), value); + std::string meta_value = *value; + if (!s.ok()) { + return s; + } + if (!ExpectedMetaValue(DataType::kStrings, meta_value)) { + if (ExpectedStale(meta_value)) { + s = Status::NotFound(); + } else { + return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + + ", expect type: " + "hyperloglog " + "get type: " + + DataTypeStrings[static_cast(GetMetaValueType(meta_value))]); + } + } else if (!IsHyperloglogObj(value)) { + return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + + ",expect type: " + "hyperloglog " + "get type: " + + DataTypeStrings[static_cast(GetMetaValueType(meta_value))]); + } else { + ParsedStringsValue parsed_strings_value(value); + if (parsed_strings_value.IsStale()) { + value->clear(); + return Status::NotFound("Stale"); + } else { + parsed_strings_value.StripSuffix(); + } + } + return s; +} + +Status Redis::HyperloglogSet(const Slice &key, const Slice &value) { + HyperloglogValue hyperloglog_value(value); + ScopeRecordLock l(lock_mgr_, key); + + BaseKey base_key(key); + return db_->Put(default_write_options_, base_key.Encode(), hyperloglog_value.Encode()); +} + +} // namespace storage \ No newline at end of file diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index ddeac6dd3..a26478392 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector& values, std::string registers; std::string result; auto& inst = GetDBInstance(key); - Status s = inst->Get(key, &value); + Status s = inst->HyperloglogGet(key, &value); if (s.ok()) { registers = value; } else if (s.IsNotFound()) { @@ -1573,7 +1573,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector& values, if (previous != now || (s.IsNotFound() && values.empty())) { *update = true; } - s = inst->Set(key, result); + s = inst->HyperloglogSet(key, result); return s; } @@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector& keys, int64_t* result) { std::string value; std::string first_registers; auto& inst = GetDBInstance(keys[0]); - Status s = inst->Get(keys[0], &value); + Status s = inst->HyperloglogGet(keys[0], &value); if (s.ok()) { first_registers = std::string(value.data(), 
diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc
index ddeac6dd3..a26478392 100644
--- a/src/storage/src/storage.cc
+++ b/src/storage/src/storage.cc
@@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector& values,
   std::string registers;
   std::string result;
   auto& inst = GetDBInstance(key);
-  Status s = inst->Get(key, &value);
+  Status s = inst->HyperloglogGet(key, &value);
   if (s.ok()) {
     registers = value;
   } else if (s.IsNotFound()) {
@@ -1573,7 +1573,7 @@
   if (previous != now || (s.IsNotFound() && values.empty())) {
     *update = true;
   }
-  s = inst->Set(key, result);
+  s = inst->HyperloglogSet(key, result);
   return s;
 }
 
@@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector& keys, int64_t* result) {
   std::string value;
   std::string first_registers;
   auto& inst = GetDBInstance(keys[0]);
-  Status s = inst->Get(keys[0], &value);
+  Status s = inst->HyperloglogGet(keys[0], &value);
   if (s.ok()) {
     first_registers = std::string(value.data(), value.size());
   } else if (s.IsNotFound()) {
     first_registers = "";
+  } else {
+    return s;
+  }
-
   HyperLogLog first_log(kPrecision, first_registers);
   for (size_t i = 1; i < keys.size(); ++i) {
     std::string value;
     std::string registers;
     auto& inst = GetDBInstance(keys[i]);
-    s = inst->Get(keys[i], &value);
+    s = inst->HyperloglogGet(keys[i], &value);
     if (s.ok()) {
       registers = value;
     } else if (s.IsNotFound()) {
@@ -1622,7 +1623,7 @@ Status Storage::PfMerge(const std::vector& keys, std::string& value
   std::string first_registers;
   std::string result;
   auto& inst = GetDBInstance(keys[0]);
-  s = inst->Get(keys[0], &value);
+  s = inst->HyperloglogGet(keys[0], &value);
   if (s.ok()) {
     first_registers = std::string(value.data(), value.size());
   } else if (s.IsNotFound()) {
@@ -1635,7 +1636,7 @@
     std::string value;
     std::string registers;
     auto& tmp_inst = GetDBInstance(keys[i]);
-    s = tmp_inst->Get(keys[i], &value);
+    s = tmp_inst->HyperloglogGet(keys[i], &value);
     if (s.ok()) {
       registers = std::string(value.data(), value.size());
     } else if (s.IsNotFound()) {
@@ -1647,7 +1648,7 @@
     result = first_log.Merge(log);
   }
   auto& ninst = GetDBInstance(keys[0]);
-  s = ninst->Set(keys[0], result);
+  s = ninst->HyperloglogSet(keys[0], result);
   value_to_dest = std::move(result);
   return s;
 }
diff --git a/src/storage/src/strings_value_format.h b/src/storage/src/strings_value_format.h
index 96b9d4d27..6e001d747 100644
--- a/src/storage/src/strings_value_format.h
+++ b/src/storage/src/strings_value_format.h
@@ -11,11 +11,15 @@
 #include "src/base_value_format.h"
 #include "storage/storage_define.h"
+
 namespace storage {
 /*
 * | type | value | reserve | cdate | timestamp |
 * |  1B  |       |   16B   |  8B   |    8B     |
+* The first bit of the reserve field is used to distinguish a hyperloglog value from a plain string
 */
+// 80H = 10000000B
+constexpr uint8_t hyperloglog_reserve_flag = 0x80;
 class StringsValue : public InternalValue {
  public:
  explicit StringsValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
@@ -38,6 +42,29 @@
   }
 };
 
+class HyperloglogValue : public InternalValue {
+ public:
+  explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {}
+  virtual rocksdb::Slice Encode() override {
+    size_t usize = user_value_.size();
+    size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength;
+    char* dst = ReAllocIfNeeded(needed);
+    memcpy(dst, &type_, sizeof(type_));
+    dst += sizeof(type_);
+    char* start_pos = dst;
+
+    memcpy(dst, user_value_.data(), usize);
+    dst += usize;
+    reserve_[0] |= hyperloglog_reserve_flag;
+    memcpy(dst, reserve_, kSuffixReserveLength);
+    dst += kSuffixReserveLength;
+    EncodeFixed64(dst, ctime_);
+    dst += kTimestampLength;
+    EncodeFixed64(dst, etime_);
+    return {start_, needed};
+  }
+};
+
 class ParsedStringsValue : public ParsedInternalValue {
  public:
  // Use this constructor after rocksdb::DB::Get();
diff --git a/tests/assets/default.conf b/tests/assets/default.conf
index 468d253e8..d5d1318f5 100644
--- a/tests/assets/default.conf
+++ b/tests/assets/default.conf
@@ -567,4 +567,4 @@ cache-lfu-decay-time: 1
 # Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent
 #
 # Example:
-# rename-command : FLUSHDB 360flushdb
+# rename-command : FLUSHDB 360flushdb
\ No newline at end of file
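For reference, the size `HyperloglogValue::Encode` computes for `needed` follows directly from that layout: one type byte, the raw registers, the 16-byte reserve area (whose first byte now carries the flag), and two 8-byte timestamps. A small compile-time sketch of the arithmetic (illustrative only; the 1/16/8-byte constants are taken from the layout comment rather than from storage_define.h):

```cpp
#include <cstddef>

// Constants as assumed from the layout comment: 1B type, 16B reserve, 8B cdate, 8B timestamp.
constexpr std::size_t kTypeLength = 1;
constexpr std::size_t kSuffixReserveLength = 16;
constexpr std::size_t kTimestampLength = 8;

// Total bytes HyperloglogValue::Encode allocates for an n-byte register payload.
constexpr std::size_t EncodedHllSize(std::size_t n) {
  return kTypeLength + n + kSuffixReserveLength + 2 * kTimestampLength;
}

// Even an empty payload carries a 33-byte envelope; the flag byte sits at offset
// kTypeLength + n, which is exactly where IsHyperloglogObj reads it back from the tail.
static_assert(EncodedHllSize(0) == 1 + 0 + 16 + 8 + 8, "type + value + reserve + cdate + etime");
```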
diff --git a/tests/unit/type/hyperloglog.tcl b/tests/unit/type/hyperloglog.tcl
new file mode 100644
index 000000000..1f719cc4d
--- /dev/null
+++ b/tests/unit/type/hyperloglog.tcl
@@ -0,0 +1,262 @@
+start_server {tags {"hll"}} {
+# Pika does not support the pfdebug command
+#    test {HyperLogLog self test passes} {
+#        catch {r pfselftest} e
+#        set e
+#    } {OK}
+
+    test {PFADD without arguments creates an HLL value} {
+        r pfadd hll
+        r exists hll
+    } {1}
+
+    test {Approximated cardinality after creation is zero} {
+        r pfcount hll
+    } {0}
+
+    test {PFADD returns 1 when at least 1 reg was modified} {
+        r pfadd hll a b c
+    } {1}
+
+    test {PFADD returns 0 when no reg was modified} {
+        r pfadd hll a b c
+    } {0}
+
+    test {PFADD works with empty string (regression)} {
+        r pfadd hll ""
+    }
+
+    # Note that the self test stresses much better the
+    # cardinality estimation error. We are testing just the
+    # command implementation itself here.
+    test {PFCOUNT returns approximated cardinality of set} {
+        r del hll
+        set res {}
+        r pfadd hll 1 2 3 4 5
+        lappend res [r pfcount hll]
+        # Call it again to test cached value invalidation.
+        r pfadd hll 6 7 8 8 9 10
+        lappend res [r pfcount hll]
+        set res
+    } {5 10}
+
+# This parameter is not available in Pika
+#    test {HyperLogLogs are promote from sparse to dense} {
+#        r del hll
+#        r config set hll-sparse-max-bytes 3000
+#        set n 0
+#        while {$n < 100} {
+#            set elements {}
+#            for {set j 0} {$j < 100} {incr j} {lappend elements [expr rand()]}
+#            incr n 100
+#            r pfadd hll {*}$elements
+#            set card [r pfcount hll]
+#            set err [expr {abs($card-$n)}]
+#            assert {$err < (double($card)/100)*5}
+#            if {$n < 1000} {
+#                assert {[r pfdebug encoding hll] eq {sparse}}
+#            } elseif {$n > 10000} {
+#                assert {[r pfdebug encoding hll] eq {dense}}
+#            }
+#        }
+#    }
+
+# Pika does not support the pfdebug command
+#    test {HyperLogLog sparse encoding stress test} {
+#        for {set x 0} {$x < 1000} {incr x} {
+#            r del hll1 hll2
+#            set numele [randomInt 100]
+#            set elements {}
+#            for {set j 0} {$j < $numele} {incr j} {
+#                lappend elements [expr rand()]
+#            }
+            # Force dense representation of hll2
+#            r pfadd hll2
+#            r pfdebug todense hll2
+#            r pfadd hll1 {*}$elements
+#            r pfadd hll2 {*}$elements
+#            assert {[r pfdebug encoding hll1] eq {sparse}}
+#            assert {[r pfdebug encoding hll2] eq {dense}}
+#            # Cardinality estimated should match exactly.
+#            assert {[r pfcount hll1] eq [r pfcount hll2]}
+#        }
+#    }
+
+# The return value of Pika is inconsistent with Redis
+    test {Corrupted sparse HyperLogLogs are detected: Additional at tail} {
+        r del hll
+        r pfadd hll a b c
+        r append hll "hello"
+        set e {}
+        catch {r pfcount hll} e
+        set e
+    } {*WRONGTYPE*}
+
+# The return value of Pika is inconsistent with Redis
+    test {Corrupted sparse HyperLogLogs are detected: Broken magic} {
+        r del hll
+        r pfadd hll a b c
+        r setrange hll 0 "0123"
+        set e {}
+        catch {r pfcount hll} e
+        set e
+    } {*WRONGTYPE*}
+
+# The return value of Pika is inconsistent with Redis
+    test {Corrupted sparse HyperLogLogs are detected: Invalid encoding} {
+        r del hll
+        r pfadd hll a b c
+        r setrange hll 4 "x"
+        set e {}
+        catch {r pfcount hll} e
+        set e
+    } {*WRONGTYPE*}
+
+# The return value of Pika is inconsistent with Redis
+    test {Corrupted dense HyperLogLogs are detected: Wrong length} {
+        r del hll
+        r pfadd hll a b c
+        r setrange hll 4 "\x00"
+        set e {}
+        catch {r pfcount hll} e
+        set e
+    } {*WRONGTYPE*}
+
+# The return value of Pika is inconsistent with Redis
+    test {PFADD, PFCOUNT, PFMERGE type checking works} {
+        r set foo bar
+        catch {r pfadd foo 1} e
+        assert_match {*WRONGTYPE*} $e
+        catch {r pfcount foo} e
+        assert_match {*WRONGTYPE*} $e
+        catch {r pfmerge bar foo} e
+        assert_match {*WRONGTYPE*} $e
+        # catch {r pfmerge foo bar} e
+        # assert_match {*WRONGTYPE*} $e
+    }
+
+    test {PFMERGE results on the cardinality of union of sets} {
+        r del hll hll1 hll2 hll3
+        r pfadd hll1 a b c
+        r pfadd hll2 b c d
+        r pfadd hll3 c d e
+        r pfmerge hll hll1 hll2 hll3
+        r pfcount hll
+    } {5}
+
+# The return value of Pika is inconsistent with Redis
+    test {PFCOUNT multiple-keys merge returns cardinality of union} {
+        r del hll1 hll2 hll3
+        for {set x 1} {$x < 100} {incr x} {
+            # Force dense representation of hll2
+            r pfadd hll1 "foo-$x"
+            r pfadd hll2 "bar-$x"
+            r pfadd hll3 "zap-$x"
+
+            set card [r pfcount hll1 hll2 hll3]
+            set realcard [expr {$x*3}]
+            set err [expr {abs($card-$realcard)}]
+            assert {$err < (double($card)/100)*5}
+        }
+    }
+
+# The return value of Pika is inconsistent with Redis
+# test {HYPERLOGLOG press test: 5w, 10w, 15w, 20w, 30w, 50w, 100w} {
+#     r del hll1
+#     for {set x 1} {$x <= 1000000} {incr x} {
+#         r pfadd hll1 "foo-$x"
+#         if {$x == 50000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.01}
+#         }
+#         if {$x == 100000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.01}
+#         }
+#         if {$x == 150000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.01}
+#         }
+#         if {$x == 300000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.01}
+#         }
+#         if {$x == 500000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.01}
+#         }
+#         if {$x == 1000000} {
+#             set card [r pfcount hll1]
+#             set realcard [expr {$x*1}]
+#             set err [expr {abs($card-$realcard)}]
+#
+#             set d_err [expr {$err * 1.0}]
+#             set d_realcard [expr {$realcard * 1.0}]
+#             set err_precentage [expr {double($d_err / $d_realcard)}]
+#             puts "$x error rate: $err_precentage"
+#             assert {$err < $realcard * 0.03}
+#         }
+#     }
+# }
+
+# Pika does not support the pfdebug command
+#    test {PFDEBUG GETREG returns the HyperLogLog raw registers} {
+#        r del hll
+#        r pfadd hll 1 2 3
+#        llength [r pfdebug getreg hll]
+#    } {16384}
+
+# Pika does not support the pfdebug command
+#    test {PFDEBUG GETREG returns the HyperLogLog raw registers} {
+#        r del hll
+#        r pfadd hll 1 2 3
+#        llength [r pfdebug getreg hll]
+#    } {16384}
+
+# The return value of Pika is inconsistent with Redis
+    test {PFADD / PFCOUNT cache invalidation works} {
+        r del hll
+        r pfadd hll a b c
+        r pfcount hll
+        assert {[r getrange hll 15 15] eq "\x00"}
+        r pfadd hll a b c
+        assert {[r getrange hll 15 15] eq "\x00"}
+        # r pfadd hll 1 2 3
+        # assert {[r getrange hll 15 15] eq "\x80"}
+    }
+}
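A note on the tests marked "The return value of Pika is inconsistent with Redis": under this scheme a plain-string write (SET, APPEND, SETRANGE) re-encodes the key without the 0x80 reserve bit, so a later PFCOUNT fails the IsHyperloglogObj check and reports WRONGTYPE, which is why these expectations differ from the upstream Redis suite. A hedged sketch of that interaction at the storage layer (illustrative only; `DemonstrateWrongType` and the literal key/values are made up, and error handling is omitted):

```cpp
#include <string>
#include "src/redis.h"

// Illustrative only: why corrupting an HLL through the string path yields WRONGTYPE.
rocksdb::Status DemonstrateWrongType(storage::Redis* inst) {
  std::string registers;
  inst->HyperloglogSet("hll", "");   // stored with the hyperloglog flag bit set
  inst->Set("hll", "hello");         // plain string write: flag bit is not set
  rocksdb::Status s = inst->HyperloglogGet("hll", &registers);
  // s.IsInvalidArgument() is expected here; the command layer reports it as WRONGTYPE.
  return s;
}
```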