Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

200 lines of code to rewrite rocksdb into a coroutine program (production ready, with CI tests, db_bench tests, and RPC integration) #5

Open
wants to merge 29 commits into
base: 6.1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[submodule "third-party/PhotonLibOS"]
path = third-party/PhotonLibOS
url = https://github.com/alibaba/PhotonLibOS.git
branch = release/0.8
25 changes: 19 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ endif()

list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/")

# Photon
set(PHOTON_ENABLE_URING OFF CACHE INTERNAL "Enable iouring")
add_subdirectory(third-party/PhotonLibOS)
if (PHOTON_ENABLE_URING)
add_compile_definitions("PHOTON_ENABLE_URING")
endif ()
option(INIT_PHOTON_IN_ENV "INIT PHOTON IN ROCKSDB" OFF)
if(INIT_PHOTON_IN_ENV)
add_compile_definitions("INIT_PHOTON_IN_ENV")
endif()

option(WITH_JEMALLOC "build with JeMalloc" OFF)
option(WITH_SNAPPY "build with SNAPPY" OFF)
option(WITH_LZ4 "build with lz4" OFF)
Expand Down Expand Up @@ -174,11 +185,11 @@ if(MSVC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /FC /d2Zi+ /W4 /wd4127 /wd4800 /wd4996 /wd4351 /wd4100 /wd4204 /wd4324")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -W -Wextra -Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wsign-compare -Wshadow -Wno-unused-parameter -Wno-unused-variable -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers -Wno-strict-aliasing")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wsign-compare -Wno-unused-parameter -Wno-unused-variable -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers -Wno-strict-aliasing")
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
include(CheckCXXCompilerFlag)
Expand Down Expand Up @@ -458,7 +469,7 @@ endif()

include_directories(${PROJECT_SOURCE_DIR})
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src ${PROJECT_SOURCE_DIR}/third-party/PhotonLibOS/include)
find_package(Threads REQUIRED)

add_subdirectory(third-party/gtest-1.7.0/fused-src/gtest)
Expand Down Expand Up @@ -742,18 +753,18 @@ else()

add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
target_link_libraries(${ROCKSDB_SHARED_LIB}
${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $<BUILD_INTERFACE:photon_static>)
set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
LINKER_LANGUAGE CXX
VERSION ${ROCKSDB_VERSION}
SOVERSION ${ROCKSDB_VERSION_MAJOR}
CXX_STANDARD 11
CXX_STANDARD 14
OUTPUT_NAME "rocksdb")
endif()

add_library(${ROCKSDB_STATIC_LIB} STATIC ${SOURCES})
target_link_libraries(${ROCKSDB_STATIC_LIB}
${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $<BUILD_INTERFACE:photon_static>)

if(WIN32)
add_library(${ROCKSDB_IMPORT_LIB} SHARED ${SOURCES})
Expand Down Expand Up @@ -1067,3 +1078,5 @@ option(WITH_TOOLS "build with tools" ON)
if(WITH_TOOLS)
add_subdirectory(tools)
endif()

add_subdirectory(examples)
1 change: 1 addition & 0 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
leader_count++;
while (ready_count < kNumThreads) {
// busy waiting
std::this_thread::yield();
}
}
});
Expand Down
1 change: 1 addition & 0 deletions db/dbformat_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ TEST_F(FormatTest, RangeTombstoneSerializeEndKey) {
} // namespace rocksdb

int main(int argc, char** argv) {
rocksdb::PhotonEnv::Singleton();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
16 changes: 12 additions & 4 deletions db/prefix_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ DEFINE_uint64(num_locks, 10001, "number of locks");
DEFINE_bool(random_prefix, false, "randomize prefix");
DEFINE_uint64(total_prefixes, 100000, "total number of prefixes");
DEFINE_uint64(items_per_prefix, 1, "total number of values per prefix");
DEFINE_int64(write_buffer_size, 33554432, "");
DEFINE_int32(max_write_buffer_number, 2, "");
DEFINE_int32(min_write_buffer_number_to_merge, 1, "");
// DEFINE_int64(write_buffer_size, 33554432, "");
DECLARE_int64(write_buffer_size);
// DEFINE_int32(max_write_buffer_number, 2, "");
DECLARE_int32(max_write_buffer_number);
// DEFINE_int32(min_write_buffer_number_to_merge, 1, "");
DECLARE_int32(min_write_buffer_number_to_merge);
DEFINE_int32(skiplist_height, 4, "");
DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, "");
DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, "");
DEFINE_int32(value_size, 40, "");
// DEFINE_int32(value_size, 40, "");
DECLARE_int32(value_size);
DEFINE_bool(enable_print, false, "Print options generated to console.");

// Path to the database on file system
Expand Down Expand Up @@ -876,6 +880,10 @@ TEST_F(PrefixTest, PrefixSeekModePrev3) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
ParseCommandLineFlags(&argc, &argv, true);
FLAGS_write_buffer_size = 33554432;
FLAGS_max_write_buffer_number = 2;
FLAGS_min_write_buffer_number_to_merge = 1;
FLAGS_value_size = 40;
return RUN_ALL_TESTS();
}

Expand Down
1 change: 1 addition & 0 deletions db/range_del_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <set>
#include <string>
#include <vector>
#include <cstdint>

#include "db/compaction_iteration_stats.h"
#include "db/dbformat.h"
Expand Down
1 change: 1 addition & 0 deletions db/range_del_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ TEST_F(RangeDelAggregatorTest,
} // namespace rocksdb

int main(int argc, char** argv) {
rocksdb::PhotonEnv::Singleton();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions db/range_tombstone_fragmenter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ TEST_F(RangeTombstoneFragmenterTest, SeekOutOfBounds) {
} // namespace rocksdb

int main(int argc, char** argv) {
rocksdb::PhotonEnv::Singleton();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
16 changes: 8 additions & 8 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ struct FileDescriptor {
smallest_seqno(_smallest_seqno),
largest_seqno(_largest_seqno) {}

FileDescriptor& operator=(const FileDescriptor& fd) {
table_reader = fd.table_reader;
packed_number_and_path_id = fd.packed_number_and_path_id;
file_size = fd.file_size;
smallest_seqno = fd.smallest_seqno;
largest_seqno = fd.largest_seqno;
return *this;
}
// FileDescriptor& operator=(const FileDescriptor& fd) {
// table_reader = fd.table_reader;
// packed_number_and_path_id = fd.packed_number_and_path_id;
// file_size = fd.file_size;
// smallest_seqno = fd.smallest_seqno;
// largest_seqno = fd.largest_seqno;
// return *this;
// }

uint64_t GetNumber() const {
return packed_number_and_path_id & kFileNumberMask;
Expand Down
3 changes: 3 additions & 0 deletions db/write_callback_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
// This allows us to confidently detect the first writer
// who increases threads_linked as the leader.
while (threads_linked.load() < cur_threads_joining) {
std::this_thread::yield();
}
});

Expand Down Expand Up @@ -258,11 +259,13 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {

// leaders gotta lead
while (i > 0 && threads_verified.load() < 1) {
std::this_thread::yield();
}

// loser has to lose
while (i == write_group.size() - 1 &&
threads_verified.load() < write_group.size() - 1) {
std::this_thread::yield();
}

auto& write_op = write_group.at(i);
Expand Down
65 changes: 40 additions & 25 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -120,8 +119,9 @@ class PosixEnv : public Env {
PosixEnv();

~PosixEnv() override {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
LOG_INFO("global PosixEnv destruct: Join thread pools");
for (auto& tid : threads_to_join_) {
tid.join();
}
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
Expand Down Expand Up @@ -255,7 +255,7 @@ class PosixEnv : public Env {
result->reset();
Status s;
int fd = -1;
int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC | O_APPEND);
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
// Note: we should avoid O_APPEND here due to ta the following bug:
Expand Down Expand Up @@ -760,18 +760,11 @@ class PosixEnv : public Env {
return thread_status_updater_->GetThreadList(thread_list);
}

static uint64_t gettid(pthread_t tid) {
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}

static uint64_t gettid() {
pthread_t tid = pthread_self();
return gettid(tid);
return (uint64_t) photon::CURRENT;
}

uint64_t GetThreadID() const override { return gettid(pthread_self()); }
uint64_t GetThreadID() const override { return gettid(); }

Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override {
struct statvfs sbuf;
Expand Down Expand Up @@ -847,7 +840,7 @@ class PosixEnv : public Env {
return 0;
}

void SleepForMicroseconds(int micros) override { usleep(micros); }
void SleepForMicroseconds(int micros) override { std::this_thread::sleep_for(std::chrono::microseconds(micros)); }

Status GetHostName(char* name, uint64_t len) override {
int ret = gethostname(name, static_cast<size_t>(len));
Expand Down Expand Up @@ -1008,8 +1001,8 @@ class PosixEnv : public Env {
size_t page_size_;

std::vector<ThreadPoolImpl> thread_pools_;
pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_;
std::mutex mu_;
std::vector<std::thread> threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool allow_non_owner_access_;
Expand All @@ -1021,7 +1014,7 @@ PosixEnv::PosixEnv()
page_size_(getpagesize()),
thread_pools_(Priority::TOTAL),
allow_non_owner_access_(true) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
LOG_INFO("global PosixEnv construct: Create thread pools");
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id));
Expand Down Expand Up @@ -1059,20 +1052,16 @@ static void* StartThreadWrapper(void* arg) {
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
std::lock_guard<std::mutex> lock(mu_);
threads_to_join_.emplace_back(std::thread(&StartThreadWrapper, state));
}

void PosixEnv::WaitForJoin() {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& tid : threads_to_join_) {
tid.join();
}
threads_to_join_.clear();
}
Expand Down Expand Up @@ -1104,6 +1093,29 @@ std::string Env::GenerateUniqueId() {
return uuid2;
}

PhotonEnv::PhotonEnv(int vcpu_num, int ev_engine) {
LOG_INFO("Begin init Photon Env");
set_log_output_level(ALOG_INFO);
int ret = photon::init(ev_engine, photon::INIT_IO_NONE);
if (ret != 0) {
LOG_FATAL("Photon init failed");
abort();
}
ret = photon_std::work_pool_init(vcpu_num, ev_engine, photon::INIT_IO_NONE);
if (ret != 0) {
LOG_FATAL("Work-pool init failed");
abort();
}
LOG_INFO("End init Photon Env");
}

PhotonEnv::~PhotonEnv() {
LOG_INFO("Begin destruct Photon Env");
photon_std::work_pool_fini();
photon::fini();
LOG_INFO("End destruct Photon Env");
}

//
// Default Posix Env
//
Expand All @@ -1118,6 +1130,9 @@ Env* Env::Default() {
// of their construction, having this call here guarantees that
// the destructor of static PosixEnv will go first, then the
// the singletons of ThreadLocalPtr.
#ifdef INIT_PHOTON_IN_ENV
PhotonEnv::Singleton();
#endif
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
Expand Down
Loading