Skip to content

Commit

Permalink
add streams data column-family compactionfilter (OpenAtomFoundation#2799
Browse files Browse the repository at this point in the history
)

Co-authored-by: wangshaoyi <[email protected]>
  • Loading branch information
wangshao1 and wangshaoyi authored Jul 18, 2024
1 parent 1f2de07 commit 7505072
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 0 deletions.
75 changes: 75 additions & 0 deletions src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "rocksdb/compaction_filter.h"
#include "src/base_data_key_format.h"
#include "src/base_meta_value_format.h"
#include "src/pika_stream_meta_value.h"
#include "src/debug.h"

namespace storage {
Expand Down Expand Up @@ -141,6 +142,80 @@ class BaseDataFilterFactory : public rocksdb::CompactionFilterFactory {
std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_ = nullptr;
};

class StreamsDataFilter : public rocksdb::CompactionFilter {
public:
StreamsDataFilter(rocksdb::DB* db, std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr)
: db_(db),
cf_handles_ptr_(cf_handles_ptr)
{}

bool Filter(int level, const Slice& key, const rocksdb::Slice& value, std::string* new_value,
bool* value_changed) const override {
ParsedStreamDataKey parsed_stream_data_key(key);
TRACE("==========================START==========================");
TRACE("[DataFilter], key: %s, id = %s, version = %d", parsed_base_data_key.key().ToString().c_str(),
parsed_base_data_key.data().ToString().c_str(), parsed_base_data_key.version());

if (parsed_stream_data_key.key().ToString() != cur_key_) {
cur_key_ = parsed_stream_data_key.key().ToString();
std::string meta_value;
// destroyed when close the database, Reserve Current key value
if (cf_handles_ptr_->empty()) {
return false;
}
Status s = db_->Get(default_read_options_, (*cf_handles_ptr_)[0], cur_key_, &meta_value);
if (s.ok()) {
meta_not_found_ = false;
ParsedStreamMetaValue parsed_base_meta_value(meta_value);
cur_meta_version_ = parsed_base_meta_value.version();
} else if (s.IsNotFound()) {
meta_not_found_ = true;
} else {
cur_key_ = "";
TRACE("Reserve[Get meta_key faild]");
return false;
}
}

if (meta_not_found_) {
TRACE("Drop[Meta key not exist]");
return true;
}

if (cur_meta_version_ > parsed_stream_data_key.version()) {
TRACE("Drop[data_key_version < cur_meta_version]");
return true;
}
TRACE("Reserve[data_key_version == cur_meta_version]");
return false;
}

const char* Name() const override { return "StreamsDataFilter"; }

private:
rocksdb::DB* db_ = nullptr;
std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_ = nullptr;
rocksdb::ReadOptions default_read_options_;
mutable std::string cur_key_;
mutable bool meta_not_found_ = false;
mutable int32_t cur_meta_version_ = 0;
};

class StreamsDataFilterFactory : public rocksdb::CompactionFilterFactory {
public:
StreamsDataFilterFactory(rocksdb::DB** db_ptr, std::vector<rocksdb::ColumnFamilyHandle*>* handles_ptr)
: db_ptr_(db_ptr), cf_handles_ptr_(handles_ptr) {}
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
return std::unique_ptr<rocksdb::CompactionFilter>(new StreamsDataFilter(*db_ptr_, cf_handles_ptr_));
}
const char* Name() const override { return "StreamsDataFilterFactory"; }

private:
rocksdb::DB** db_ptr_ = nullptr;
std::vector<rocksdb::ColumnFamilyHandle*>* cf_handles_ptr_ = nullptr;
};

using HashesMetaFilter = BaseMetaFilter;
using HashesMetaFilterFactory = BaseMetaFilterFactory;
using HashesDataFilter = BaseDataFilter;
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis_streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ Status RedisStreams::Open(const StorageOptions& storage_options, const std::stri
rocksdb::ColumnFamilyOptions meta_cf_ops(storage_options.options);
rocksdb::ColumnFamilyOptions data_cf_ops(storage_options.options);
// Notice: Stream's Meta dose not have timestamp and version, so it does not need to be filtered.
data_cf_ops.compaction_filter_factory = std::make_shared<StreamsDataFilterFactory>(&db_, &handles_);

// use the bloom filter policy to reduce disk reads
rocksdb::BlockBasedTableOptions table_ops(storage_options.table_options);
Expand Down
1 change: 1 addition & 0 deletions src/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ foreach(blackwindow_test_source ${BLACKWINDOW_TEST_SOURCE})
target_include_directories(${blackwindow_test_name}
PUBLIC ${PROJECT_SOURCE_DIR}/include
PUBLIC ${PROJECT_SOURCE_DIR}/..
PUBLIC ${PROJECT_SOURCE_DIR}/src
${ROCKSDB_INCLUDE_DIR}
${ROCKSDB_SOURCE_DIR}
)
Expand Down
83 changes: 83 additions & 0 deletions src/storage/tests/streams_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <dirent.h>
#include <gtest/gtest.h>
#include <unistd.h>
#include <iostream>
#include <iterator>
#include <thread>

#include "storage/storage.h"
#include "storage/util.h"
#include "src/redis_streams.h"

using namespace storage;

class StreamsTest : public ::testing::Test {
public:
StreamsTest() = default;
~StreamsTest() override = default;

void SetUp() override {
std::string path = "./db/streams";
if (access("./db", F_OK) != 0) {
mkdir("./db", 0755);
}
if (access(path.c_str(), F_OK) != 0) {
mkdir(path.c_str(), 0755);
}
db = new RedisStreams(nullptr, storage::kStreams);
storage_options.options.create_if_missing = true;
db->Open(storage_options, path);
}

void TearDown() override {
std::string path = "./db/streams";
DeleteFiles(path.c_str());
}

static void SetUpTestSuite() {}
static void TearDownTestSuite() {}

StorageOptions storage_options;
RedisStreams* db;
};

TEST_F(StreamsTest, DataFilter) {
int32_t ret = 0;
rocksdb::ReadOptions r_opts;
rocksdb::FlushOptions f_opts;
rocksdb::CompactRangeOptions c_opts;
storage::StreamAddTrimArgs args;

rocksdb::DB* rocks_db = db->GetDB();
auto handles = db->GetHandles();

auto s = db->XAdd("STREAM_KEY_0", "STREAM_MESSAGE_1", args);
ASSERT_TRUE(s.ok());

rocks_db->Flush(f_opts, handles);
auto iter = rocks_db->NewIterator(r_opts, handles[1]);
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
delete iter;

s = db->Del("STREAM_KEY_0");
ASSERT_TRUE(s.ok());

rocks_db->Flush(f_opts, handles);
rocks_db->CompactRange(c_opts, handles[1], nullptr, nullptr);
iter = rocks_db->NewIterator(r_opts, handles[1]);
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
delete iter;
}

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit 7505072

Please sign in to comment.