diff --git a/CMakeLists.txt b/CMakeLists.txt index 132d3b04e96..78f3b3d73cd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,17 @@ endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/") +include(FetchContent) +set(FETCHCONTENT_QUIET false) +FetchContent_Declare( + photon + GIT_REPOSITORY https://github.com/alibaba/PhotonLibOS.git + GIT_TAG v0.6.3 +) +set(ENABLE_URING ON CACHE INTERNAL "Enable iouring") +FetchContent_MakeAvailable(photon) +set(PHOTON_INCLUDE_DIR ${photon_SOURCE_DIR}/include/) + option(WITH_JEMALLOC "build with JeMalloc" OFF) option(WITH_SNAPPY "build with SNAPPY" OFF) option(WITH_LZ4 "build with lz4" OFF) @@ -178,7 +189,7 @@ else() if(MINGW) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format") endif() - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer") include(CheckCXXCompilerFlag) @@ -459,6 +470,7 @@ endif() include_directories(${PROJECT_SOURCE_DIR}) include_directories(${PROJECT_SOURCE_DIR}/include) include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src) +include_directories(${PHOTON_INCLUDE_DIR}) find_package(Threads REQUIRED) add_subdirectory(third-party/gtest-1.7.0/fused-src/gtest) @@ -742,18 +754,18 @@ else() add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES}) target_link_libraries(${ROCKSDB_SHARED_LIB} - ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) + ${THIRDPARTY_LIBS} ${SYSTEM_LIBS} -Wl,--whole-archive $ -Wl,--no-whole-archive) set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES LINKER_LANGUAGE CXX VERSION ${ROCKSDB_VERSION} SOVERSION ${ROCKSDB_VERSION_MAJOR} - CXX_STANDARD 11 + CXX_STANDARD 14 OUTPUT_NAME "rocksdb") endif() add_library(${ROCKSDB_STATIC_LIB} STATIC ${SOURCES}) target_link_libraries(${ROCKSDB_STATIC_LIB} - ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) + ${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $) if(WIN32) add_library(${ROCKSDB_IMPORT_LIB} SHARED ${SOURCES}) diff --git a/env/env_posix.cc b/env/env_posix.cc index 387c0279397..09cb528163f 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -12,7 +12,6 @@ #if defined(OS_LINUX) #include #endif -#include #include #include #include @@ -120,8 +119,9 @@ class PosixEnv : public Env { PosixEnv(); ~PosixEnv() override { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + LOG_INFO("global PosixEnv destruct: Join thread pools"); + for (auto& tid : threads_to_join_) { + tid.join(); } for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].JoinAllThreads(); @@ -760,18 +760,11 @@ class PosixEnv : public Env { return thread_status_updater_->GetThreadList(thread_list); } - static uint64_t gettid(pthread_t tid) { - uint64_t thread_id = 0; - memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); - return thread_id; - } - static uint64_t gettid() { - pthread_t tid = pthread_self(); - return gettid(tid); + return (uint64_t) photon::CURRENT; } - uint64_t GetThreadID() const override { return gettid(pthread_self()); } + uint64_t GetThreadID() const override { return gettid(); } Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override { struct statvfs sbuf; @@ -847,7 +840,7 @@ class PosixEnv : public Env { return 0; } - void SleepForMicroseconds(int micros) override { usleep(micros); } + void SleepForMicroseconds(int micros) override { std::this_thread::sleep_for(std::chrono::microseconds(micros)); } Status GetHostName(char* name, uint64_t len) override { int ret = gethostname(name, static_cast(len)); @@ -1008,8 +1001,8 @@ class PosixEnv : public Env { size_t page_size_; std::vector thread_pools_; - pthread_mutex_t mu_; - std::vector threads_to_join_; + std::mutex mu_; + std::vector threads_to_join_; // If true, allow non owner read access for db files. Otherwise, non-owner // has no access to db files. bool allow_non_owner_access_; @@ -1021,7 +1014,6 @@ PosixEnv::PosixEnv() page_size_(getpagesize()), thread_pools_(Priority::TOTAL), allow_non_owner_access_(true) { - ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); @@ -1059,20 +1051,16 @@ static void* StartThreadWrapper(void* arg) { } void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { - pthread_t t; StartThreadState* state = new StartThreadState; state->user_function = function; state->arg = arg; - ThreadPoolImpl::PthreadCall( - "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); - ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); - threads_to_join_.push_back(t); - ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + std::lock_guard lock(mu_); + threads_to_join_.emplace_back(std::thread(&StartThreadWrapper, state)); } void PosixEnv::WaitForJoin() { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + for (auto& tid : threads_to_join_) { + tid.join(); } threads_to_join_.clear(); } @@ -1104,6 +1092,24 @@ std::string Env::GenerateUniqueId() { return uuid2; } +PhotonEnv::PhotonEnv() { + int ret = photon::init(photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE); + if (ret != 0) { + LOG_FATAL("photon init failed"); + } + // Max 8 vcpu. Hardcoded for now. + ret = photon::std::work_pool_init(8, photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE); + if (ret != 0) { + LOG_FATAL("work pool init failed"); + } +} + +PhotonEnv::~PhotonEnv() { + photon::std::work_pool_fini(); + photon::fini(); + LOG_INFO("PhotonEnv finished"); +} + // // Default Posix Env // @@ -1118,6 +1124,7 @@ Env* Env::Default() { // of their construction, having this call here guarantees that // the destructor of static PosixEnv will go first, then the // the singletons of ThreadLocalPtr. + PhotonEnv::Singleton(); ThreadLocalPtr::InitSingletons(); CompressionContextCache::InitSingleton(); INIT_SYNC_POINT_SINGLETONS(); diff --git a/env/io_posix.cc b/env/io_posix.cc index 628ed841300..29dfc2989e7 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -201,11 +201,8 @@ Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n, size_t left = n; char* ptr = scratch; while (left > 0) { - r = pread(fd_, ptr, left, static_cast(offset)); + r = photon::iouring_pread(fd_, ptr, left, offset, 0, -1); if (r <= 0) { - if (r == -1 && errno == EINTR) { - continue; - } break; } ptr += r; @@ -335,11 +332,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, size_t left = n; char* ptr = scratch; while (left > 0) { - r = pread(fd_, ptr, left, static_cast(offset)); + r = photon::iouring_pread(fd_, ptr, left, offset, 0, -1); if (r <= 0) { - if (r == -1 && errno == EINTR) { - continue; - } break; } ptr += r; @@ -761,6 +755,7 @@ Status PosixWritableFile::Append(const Slice& data) { size_t left = data.size(); while (left != 0) { ssize_t done = write(fd_, src, left); + std::this_thread::yield(); if (done < 0) { if (errno == EINTR) { continue; @@ -784,11 +779,8 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { const char* src = data.data(); size_t left = data.size(); while (left != 0) { - ssize_t done = pwrite(fd_, src, left, static_cast(offset)); + ssize_t done = photon::iouring_pwrite(fd_, src, left, offset, -1);; if (done < 0) { - if (errno == EINTR) { - continue; - } return IOError("While pwrite to file at offset " + ToString(offset), filename_, errno); } @@ -870,14 +862,14 @@ Status PosixWritableFile::Close() { Status PosixWritableFile::Flush() { return Status::OK(); } Status PosixWritableFile::Sync() { - if (fdatasync(fd_) < 0) { + if (photon::iouring_fdatasync(fd_) < 0) { return IOError("While fdatasync", filename_, errno); } return Status::OK(); } Status PosixWritableFile::Fsync() { - if (fsync(fd_) < 0) { + if (photon::iouring_fsync(fd_) < 0) { return IOError("While fsync", filename_, errno); } return Status::OK(); @@ -984,13 +976,9 @@ Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) { const char* src = data.data(); size_t left = data.size(); while (left != 0) { - ssize_t done = pwrite(fd_, src, left, offset); + ssize_t done = photon::iouring_pwrite(fd_, src, left, offset, -1); if (done < 0) { // error while writing to file - if (errno == EINTR) { - // write was interrupted, try again. - continue; - } return IOError( "While write random read/write file at offset " + ToString(offset), filename_, errno); @@ -1010,13 +998,9 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, size_t left = n; char* ptr = scratch; while (left > 0) { - ssize_t done = pread(fd_, ptr, left, offset); + ssize_t done = photon::iouring_pread(fd_, ptr, left, offset, 0, -1); if (done < 0) { // error while reading from file - if (errno == EINTR) { - // read was interrupted, try again. - continue; - } return IOError("While reading random read/write file offset " + ToString(offset) + " len " + ToString(n), filename_, errno); @@ -1038,14 +1022,14 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, Status PosixRandomRWFile::Flush() { return Status::OK(); } Status PosixRandomRWFile::Sync() { - if (fdatasync(fd_) < 0) { + if (photon::iouring_fdatasync(fd_) < 0) { return IOError("While fdatasync random read/write file", filename_, errno); } return Status::OK(); } Status PosixRandomRWFile::Fsync() { - if (fsync(fd_) < 0) { + if (photon::iouring_fsync(fd_) < 0) { return IOError("While fsync random read/write file", filename_, errno); } return Status::OK(); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 4d3a96fe288..e79dcecbd9c 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -1454,4 +1454,21 @@ Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname); // This is a factory method for TimedEnv defined in utilities/env_timed.cc. Env* NewTimedEnv(Env* base_env); +class PhotonEnv { + public: + static PhotonEnv& Singleton() { + static PhotonEnv instance; + return instance; + } + + PhotonEnv(PhotonEnv const&) = delete; + PhotonEnv(PhotonEnv&&) = delete; + PhotonEnv& operator=(PhotonEnv const&) = delete; + PhotonEnv& operator=(PhotonEnv&&) = delete; + + private: + PhotonEnv(); + ~PhotonEnv(); +}; + } // namespace rocksdb diff --git a/monitoring/iostats_context.cc b/monitoring/iostats_context.cc index 3d102f91203..5534853d64f 100644 --- a/monitoring/iostats_context.cc +++ b/monitoring/iostats_context.cc @@ -10,12 +10,12 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread IOStatsContext iostats_context; +photon::thread_local_ptr iostats_context; #endif IOStatsContext* get_iostats_context() { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL - return &iostats_context; + return iostats_context.operator->(); #else return nullptr; #endif diff --git a/monitoring/iostats_context_imp.h b/monitoring/iostats_context_imp.h index 23c2088cab2..80a84d98502 100644 --- a/monitoring/iostats_context_imp.h +++ b/monitoring/iostats_context_imp.h @@ -9,38 +9,38 @@ #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL namespace rocksdb { -extern __thread IOStatsContext iostats_context; +extern photon::thread_local_ptr iostats_context; } // namespace rocksdb // increment a specific counter by the specified value -#define IOSTATS_ADD(metric, value) (iostats_context.metric += value) +#define IOSTATS_ADD(metric, value) (iostats_context->metric += value) // Increase metric value only when it is positive #define IOSTATS_ADD_IF_POSITIVE(metric, value) \ if (value > 0) { IOSTATS_ADD(metric, value); } // reset a specific counter to zero -#define IOSTATS_RESET(metric) (iostats_context.metric = 0) +#define IOSTATS_RESET(metric) (iostats_context->metric = 0) // reset all counters to zero -#define IOSTATS_RESET_ALL() (iostats_context.Reset()) +#define IOSTATS_RESET_ALL() (iostats_context->Reset()) #define IOSTATS_SET_THREAD_POOL_ID(value) \ - (iostats_context.thread_pool_id = value) + (iostats_context->thread_pool_id = value) -#define IOSTATS_THREAD_POOL_ID() (iostats_context.thread_pool_id) +#define IOSTATS_THREAD_POOL_ID() (iostats_context->thread_pool_id) -#define IOSTATS(metric) (iostats_context.metric) +#define IOSTATS(metric) (iostats_context->metric) // Declare and set start time of the timer #define IOSTATS_TIMER_GUARD(metric) \ - PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \ + PerfStepTimer iostats_step_timer_##metric(&(iostats_context->metric)); \ iostats_step_timer_##metric.Start(); // Declare and set start time of the timer #define IOSTATS_CPU_TIMER_GUARD(metric, env) \ PerfStepTimer iostats_step_timer_##metric( \ - &(iostats_context.metric), env, true, \ + &(iostats_context->metric), env, true, \ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ iostats_step_timer_##metric.Start(); diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index 40b0b215c47..bdc1fb06c06 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -15,7 +15,7 @@ PerfContext perf_context; #if defined(OS_SOLARIS) __thread PerfContext perf_context_; #else -thread_local PerfContext perf_context; +photon::thread_local_ptr perf_context; #endif #endif @@ -26,7 +26,7 @@ PerfContext* get_perf_context() { #if defined(OS_SOLARIS) return &perf_context_; #else - return &perf_context; + return perf_context.operator->(); #endif #endif } diff --git a/monitoring/perf_context_imp.h b/monitoring/perf_context_imp.h index e0ff8afc58e..a0d4b89c948 100644 --- a/monitoring/perf_context_imp.h +++ b/monitoring/perf_context_imp.h @@ -16,7 +16,7 @@ extern PerfContext perf_context; extern __thread PerfContext perf_context_; #define perf_context (*get_perf_context()) #else -extern thread_local PerfContext perf_context; +extern photon::thread_local_ptr perf_context; #endif #endif @@ -38,24 +38,24 @@ extern thread_local PerfContext perf_context; // Declare and set start time of the timer #define PERF_TIMER_GUARD(metric) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric)); \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric)); \ perf_step_timer_##metric.Start(); // Declare and set start time of the timer #define PERF_TIMER_GUARD_WITH_ENV(metric, env) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), env); \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric), env); \ perf_step_timer_##metric.Start(); // Declare and set start time of the timer #define PERF_CPU_TIMER_GUARD(metric, env) \ PerfStepTimer perf_step_timer_##metric( \ - &(perf_context.metric), env, true, \ + &(perf_context->metric), env, true, \ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ perf_step_timer_##metric.Start(); #define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \ ticker_type) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), nullptr, \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric), nullptr, \ false, PerfLevel::kEnableTime, stats, \ ticker_type); \ if (condition) { \ @@ -68,23 +68,23 @@ extern thread_local PerfContext perf_context; // Increase metric value #define PERF_COUNTER_ADD(metric, value) \ - if (perf_level >= PerfLevel::kEnableCount) { \ - perf_context.metric += value; \ + if (*perf_level >= PerfLevel::kEnableCount) { \ + perf_context->metric += value; \ } // Increase metric value #define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level) \ - if (perf_level >= PerfLevel::kEnableCount && \ - perf_context.per_level_perf_context_enabled && \ - perf_context.level_to_perf_context) { \ - if ((*(perf_context.level_to_perf_context)).find(level) != \ - (*(perf_context.level_to_perf_context)).end()) { \ - (*(perf_context.level_to_perf_context))[level].metric += value; \ + if (*perf_level >= PerfLevel::kEnableCount && \ + perf_context->per_level_perf_context_enabled && \ + perf_context->level_to_perf_context) { \ + if ((*(perf_context->level_to_perf_context)).find(level) != \ + (*(perf_context->level_to_perf_context)).end()) { \ + (*(perf_context->level_to_perf_context))[level].metric += value; \ } \ else { \ PerfContextByLevel empty_context; \ - (*(perf_context.level_to_perf_context))[level] = empty_context; \ - (*(perf_context.level_to_perf_context))[level].metric += value; \ + (*(perf_context->level_to_perf_context))[level] = empty_context; \ + (*(perf_context->level_to_perf_context))[level].metric += value; \ } \ } \ diff --git a/monitoring/perf_level.cc b/monitoring/perf_level.cc index 79c718cce76..527f6d43490 100644 --- a/monitoring/perf_level.cc +++ b/monitoring/perf_level.cc @@ -10,7 +10,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread PerfLevel perf_level = kEnableCount; +photon::thread_local_ptr perf_level(kEnableCount); #else PerfLevel perf_level = kEnableCount; #endif @@ -18,11 +18,11 @@ PerfLevel perf_level = kEnableCount; void SetPerfLevel(PerfLevel level) { assert(level > kUninitialized); assert(level < kOutOfBounds); - perf_level = level; + *perf_level = level; } PerfLevel GetPerfLevel() { - return perf_level; + return *perf_level; } } // namespace rocksdb diff --git a/monitoring/perf_level_imp.h b/monitoring/perf_level_imp.h index 2a3add19cee..6e56c79e1cf 100644 --- a/monitoring/perf_level_imp.h +++ b/monitoring/perf_level_imp.h @@ -10,7 +10,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -extern __thread PerfLevel perf_level; +extern photon::thread_local_ptr perf_level; #else extern PerfLevel perf_level; #endif diff --git a/monitoring/perf_step_timer.h b/monitoring/perf_step_timer.h index 6501bd54aba..35a416e30cc 100644 --- a/monitoring/perf_step_timer.h +++ b/monitoring/perf_step_timer.h @@ -16,7 +16,7 @@ class PerfStepTimer { uint64_t* metric, Env* env = nullptr, bool use_cpu_time = false, PerfLevel enable_level = PerfLevel::kEnableTimeExceptForMutex, Statistics* statistics = nullptr, uint32_t ticker_type = 0) - : perf_counter_enabled_(perf_level >= enable_level), + : perf_counter_enabled_((*perf_level) >= enable_level), use_cpu_time_(use_cpu_time), env_((perf_counter_enabled_ || statistics != nullptr) ? ((env != nullptr) ? env : Env::Default()) diff --git a/monitoring/thread_status_updater.cc b/monitoring/thread_status_updater.cc index cde44928b62..408d509a6d3 100644 --- a/monitoring/thread_status_updater.cc +++ b/monitoring/thread_status_updater.cc @@ -13,7 +13,8 @@ namespace rocksdb { #ifdef ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; +photon::thread_local_ptr ThreadStatusUpdater::thread_status_data_ptr_(nullptr); +#define thread_status_data_ (*thread_status_data_ptr_) void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id) { diff --git a/monitoring/thread_status_updater.h b/monitoring/thread_status_updater.h index 6706d159dfb..a9fec89521c 100644 --- a/monitoring/thread_status_updater.h +++ b/monitoring/thread_status_updater.h @@ -196,7 +196,7 @@ class ThreadStatusUpdater { protected: #ifdef ROCKSDB_USING_THREAD_STATUS // The thread-local variable for storing thread status. - static __thread ThreadStatusData* thread_status_data_; + static photon::thread_local_ptr thread_status_data_ptr_; // Returns the pointer to the thread status data only when the // thread status data is non-null and has enable_tracking == true. @@ -205,7 +205,7 @@ class ThreadStatusUpdater { // Directly returns the pointer to thread_status_data_ without // checking whether enabling_tracking is true of not. ThreadStatusData* Get() { - return thread_status_data_; + return *thread_status_data_ptr_; } // The mutex that protects cf_info_map and db_key_map. diff --git a/monitoring/thread_status_util.cc b/monitoring/thread_status_util.cc index c2af0a57454..9782b15048a 100644 --- a/monitoring/thread_status_util.cc +++ b/monitoring/thread_status_util.cc @@ -11,9 +11,10 @@ namespace rocksdb { #ifdef ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = - nullptr; -__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; +photon::thread_local_ptr ThreadStatusUtil::thread_updater_local_cache_ptr_(nullptr); +photon::thread_local_ptr ThreadStatusUtil::thread_updater_initialized_ptr_(false); +#define thread_updater_initialized_ (*thread_updater_initialized_ptr_) +#define thread_updater_local_cache_ (*thread_updater_local_cache_ptr_) void ThreadStatusUtil::RegisterThread(const Env* env, ThreadStatus::ThreadType thread_type) { diff --git a/monitoring/thread_status_util.h b/monitoring/thread_status_util.h index a403435c3d0..c455b0a1fb9 100644 --- a/monitoring/thread_status_util.h +++ b/monitoring/thread_status_util.h @@ -94,7 +94,7 @@ class ThreadStatusUtil { // When this variable is set to true, thread_updater_local_cache_ // will not be updated until this variable is again set to false // in UnregisterThread(). - static __thread bool thread_updater_initialized_; + static photon::thread_local_ptr thread_updater_initialized_ptr_; // The thread-local cached ThreadStatusUpdater that caches the // thread_status_updater_ of the first Env that uses any ThreadStatusUtil @@ -109,7 +109,8 @@ class ThreadStatusUtil { // When thread_updater_initialized_ is set to true, this variable // will not be updated until this thread_updater_initialized_ is // again set to false in UnregisterThread(). - static __thread ThreadStatusUpdater* thread_updater_local_cache_; + static photon::thread_local_ptr thread_updater_local_cache_ptr_; + #else static bool thread_updater_initialized_; static ThreadStatusUpdater* thread_updater_local_cache_; diff --git a/photon-auto-convert.sh b/photon-auto-convert.sh new file mode 100755 index 00000000000..fa40d2b3007 --- /dev/null +++ b/photon-auto-convert.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -e + +cc_files=$(find . -type f -name "*.cc" -not -path "./build/*") +h_files=$(find . -type f -name "*.h" -not -path "./build/*") +files="${cc_files} ${h_files}" + +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's/std::mutex/photon::std::mutex/g' $files +sed -i 's/std::condition_variable/photon::std::condition_variable/g' $files +sed -i 's/std::lock_guard/photon::std::lock_guard/g' $files +sed -i 's/std::unique_lock/photon::std::unique_lock/g' $files +sed -i 's/std::thread/photon::std::thread/g' $files +sed -i 's/std::this_thread/photon::std::this_thread/g' $files diff --git a/port/port_posix.cc b/port/port_posix.cc index 80081e480e0..1cbc41e0935 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -51,29 +51,12 @@ static int PthreadCall(const char* label, int result) { } Mutex::Mutex(bool adaptive) { - (void) adaptive; -#ifdef ROCKSDB_PTHREAD_ADAPTIVE_MUTEX - if (!adaptive) { - PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); - } else { - pthread_mutexattr_t mutex_attr; - PthreadCall("init mutex attr", pthread_mutexattr_init(&mutex_attr)); - PthreadCall("set mutex attr", - pthread_mutexattr_settype(&mutex_attr, - PTHREAD_MUTEX_ADAPTIVE_NP)); - PthreadCall("init mutex", pthread_mutex_init(&mu_, &mutex_attr)); - PthreadCall("destroy mutex attr", - pthread_mutexattr_destroy(&mutex_attr)); - } -#else - PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); -#endif // ROCKSDB_PTHREAD_ADAPTIVE_MUTEX } -Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } +Mutex::~Mutex() { } void Mutex::Lock() { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + mu_.lock(); #ifndef NDEBUG locked_ = true; #endif @@ -83,7 +66,7 @@ void Mutex::Unlock() { #ifndef NDEBUG locked_ = false; #endif - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + mu_.unlock(); } void Mutex::AssertHeld() { @@ -94,63 +77,57 @@ void Mutex::AssertHeld() { CondVar::CondVar(Mutex* mu) : mu_(mu) { - PthreadCall("init cv", pthread_cond_init(&cv_, nullptr)); } -CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } +CondVar::~CondVar() {} void CondVar::Wait() { #ifndef NDEBUG mu_->locked_ = false; #endif - PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); + cv_.wait(mu_->mu_); #ifndef NDEBUG mu_->locked_ = true; #endif } bool CondVar::TimedWait(uint64_t abs_time_us) { - struct timespec ts; - ts.tv_sec = static_cast(abs_time_us / 1000000); - ts.tv_nsec = static_cast((abs_time_us % 1000000) * 1000); - #ifndef NDEBUG mu_->locked_ = false; #endif - int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts); + auto abs_now_us = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + uint64_t timeout = abs_time_us > abs_now_us ? abs_time_us - abs_now_us : 0; + int ret = cv_.wait(mu_->mu_, timeout); #ifndef NDEBUG mu_->locked_ = true; #endif - if (err == ETIMEDOUT) { + if (ret != 0 || timeout == 0) { return true; } - if (err != 0) { - PthreadCall("timedwait", err); - } return false; } void CondVar::Signal() { - PthreadCall("signal", pthread_cond_signal(&cv_)); + cv_.notify_one(); } void CondVar::SignalAll() { - PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); + cv_.notify_all(); } RWMutex::RWMutex() { - PthreadCall("init mutex", pthread_rwlock_init(&mu_, nullptr)); } -RWMutex::~RWMutex() { PthreadCall("destroy mutex", pthread_rwlock_destroy(&mu_)); } +RWMutex::~RWMutex() { } -void RWMutex::ReadLock() { PthreadCall("read lock", pthread_rwlock_rdlock(&mu_)); } +void RWMutex::ReadLock() { mu_.lock(photon::RLOCK); } -void RWMutex::WriteLock() { PthreadCall("write lock", pthread_rwlock_wrlock(&mu_)); } +void RWMutex::WriteLock() { mu_.lock(photon::WLOCK); } -void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&mu_)); } +void RWMutex::ReadUnlock() { mu_.unlock(); } -void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); } +void RWMutex::WriteUnlock() { mu_.unlock(); } int PhysicalCoreID() { #if defined(ROCKSDB_SCHED_GETCPU_PRESENT) && defined(__x86_64__) && \ @@ -175,10 +152,6 @@ int PhysicalCoreID() { #endif } -void InitOnce(OnceType* once, void (*initializer)()) { - PthreadCall("once", pthread_once(once, initializer)); -} - void Crash(const std::string& srcfile, int srcline) { fprintf(stdout, "Crashing at %s:%d\n", srcfile.c_str(), srcline); fflush(stdout); diff --git a/port/port_posix.h b/port/port_posix.h index 63d7239fe6d..9bbf2053f19 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -11,7 +11,11 @@ #pragma once -#include +#include +#include +#include +#include +#include // size_t printf formatting named in the manner of C99 standard formatting // strings such as PRIu64 // in fact, we could use that one @@ -49,7 +53,6 @@ #else #include #endif -#include #include #include @@ -114,7 +117,7 @@ class Mutex { private: friend class CondVar; - pthread_mutex_t mu_; + photon::mutex mu_; #ifndef NDEBUG bool locked_; #endif @@ -136,7 +139,7 @@ class RWMutex { void AssertHeld() { } private: - pthread_rwlock_t mu_; // the underlying platform mutex + photon::rwlock mu_; // the underlying platform mutex // No copying allowed RWMutex(const RWMutex&); @@ -153,7 +156,7 @@ class CondVar { void Signal(); void SignalAll(); private: - pthread_cond_t cv_; + photon::condition_variable cv_; Mutex* mu_; }; @@ -173,10 +176,6 @@ static inline void AsmVolatilePause() { // Returns -1 if not available on this platform extern int PhysicalCoreID(); -typedef pthread_once_t OnceType; -#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT -extern void InitOnce(OnceType* once, void (*initializer)()); - #ifndef CACHE_LINE_SIZE #if defined(__s390__) #define CACHE_LINE_SIZE 256U diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 0cb4e0eb27e..ddccadf2904 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2947,7 +2947,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { } SetPerfLevel(static_cast (shared->perf_level)); - perf_context.EnablePerLevelPerfContext(); + perf_context->EnablePerLevelPerfContext(); thread->stats.Start(thread->tid); (arg->bm->*(arg->method))(thread); thread->stats.Stop(); diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc index cef77d7e75f..3b53e9fc9e3 100644 --- a/util/concurrent_arena.cc +++ b/util/concurrent_arena.cc @@ -15,7 +15,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread size_t ConcurrentArena::tls_cpuid = 0; +photon::thread_local_ptr ConcurrentArena::tls_cpuid(0); #endif namespace { @@ -39,7 +39,7 @@ ConcurrentArena::Shard* ConcurrentArena::Repick() { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we // have repicked - tls_cpuid = shard_and_index.second | shards_.Size(); + *tls_cpuid = shard_and_index.second | shards_.Size(); #endif return shard_and_index.first; } diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h index a6191100fd0..6605f67f839 100644 --- a/util/concurrent_arena.h +++ b/util/concurrent_arena.h @@ -95,7 +95,7 @@ class ConcurrentArena : public Allocator { }; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL - static __thread size_t tls_cpuid; + static photon::thread_local_ptr tls_cpuid; #else enum ZeroFirstEnum : size_t { tls_cpuid = 0 }; #endif @@ -135,7 +135,7 @@ class ConcurrentArena : public Allocator { // concurrency zero unless it might actually confer an advantage. std::unique_lock arena_lock(arena_mutex_, std::defer_lock); if (bytes > shard_block_size_ / 4 || force_arena || - ((cpu = tls_cpuid) == 0 && + ((cpu = *tls_cpuid) == 0 && !shards_.AccessAtCore(0)->allocated_and_unused_.load( std::memory_order_relaxed) && arena_lock.try_lock())) { diff --git a/util/mutexlock.h b/util/mutexlock.h index 640cef3daf7..c4d4d693832 100644 --- a/util/mutexlock.h +++ b/util/mutexlock.h @@ -97,9 +97,9 @@ class WriteLock { // SpinMutex has very low overhead for low-contention cases. Method names // are chosen so you can use std::unique_lock or std::lock_guard with it. // -class SpinMutex { +class SpinMutexObsolete { public: - SpinMutex() : locked_(false) {} + SpinMutexObsolete() : locked_(false) {} bool try_lock() { auto currently_locked = locked_.load(std::memory_order_relaxed); @@ -128,4 +128,6 @@ class SpinMutex { std::atomic locked_; }; +using SpinMutex = std::mutex; + } // namespace rocksdb diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 967cc49945e..7bd66fb51d5 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -97,16 +97,6 @@ class RepeatableThread { } void thread() { -#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) -#if __GLIBC_PREREQ(2, 12) - // Set thread name. - auto thread_handle = thread_.native_handle(); - int ret __attribute__((__unused__)) = - pthread_setname_np(thread_handle, thread_name_.c_str()); - assert(ret == 0); -#endif -#endif - assert(delay_us_ > 0); if (!wait(initial_delay_us_)) { return; diff --git a/util/thread_local.cc b/util/thread_local.cc index 7346eff11e8..21325520bdd 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -142,17 +142,17 @@ class ThreadLocalPtr::StaticMeta { port::Mutex mutex_; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL // Thread local storage - static __thread ThreadData* tls_; + static photon::thread_local_ptr tls_; #endif // Used to make thread exit trigger possible if !defined(OS_MACOSX). // Otherwise, used to retrieve thread data. - pthread_key_t pthread_key_; + photon::thread_key_t pthread_key_; }; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr; +photon::thread_local_ptr ThreadLocalPtr::StaticMeta::tls_(nullptr); #endif // Windows doesn't support a per-thread destructor with its @@ -285,7 +285,7 @@ void ThreadLocalPtr::StaticMeta::OnThreadExit(void* ptr) { // scope here in case this OnThreadExit is called after the main thread // dies. auto* inst = tls->inst; - pthread_setspecific(inst->pthread_key_, nullptr); + photon::thread_setspecific(inst->pthread_key_, nullptr); MutexLock l(inst->MemberMutex()); inst->RemoveThreadData(tls); @@ -309,35 +309,11 @@ ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0), head_(this), pthread_key_(0) { - if (pthread_key_create(&pthread_key_, &OnThreadExit) != 0) { + if (photon::thread_key_create(&pthread_key_, &OnThreadExit) != 0) { abort(); } - // OnThreadExit is not getting called on the main thread. - // Call through the static destructor mechanism to avoid memory leak. - // - // Caveats: ~A() will be invoked _after_ ~StaticMeta for the global - // singleton (destructors are invoked in reverse order of constructor - // _completion_); the latter must not mutate internal members. This - // cleanup mechanism inherently relies on use-after-release of the - // StaticMeta, and is brittle with respect to compiler-specific handling - // of memory backing destructed statically-scoped objects. Perhaps - // registering with atexit(3) would be more robust. - // -// This is not required on Windows. -#if !defined(OS_WIN) - static struct A { - ~A() { -#ifndef ROCKSDB_SUPPORT_THREAD_LOCAL - ThreadData* tls_ = - static_cast(pthread_getspecific(Instance()->pthread_key_)); -#endif - if (tls_) { - OnThreadExit(tls_); - } - } - } a; -#endif // !defined(OS_WIN) + // Photon's thread key has already supported destruction on main thread head_.next = &head_; head_.prev = &head_; @@ -373,27 +349,27 @@ ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() { static_cast(pthread_getspecific(Instance()->pthread_key_)); #endif - if (UNLIKELY(tls_ == nullptr)) { + if (UNLIKELY(*tls_ == nullptr)) { auto* inst = Instance(); - tls_ = new ThreadData(inst); + *tls_ = new ThreadData(inst); { // Register it in the global chain, needs to be done before thread exit // handler registration MutexLock l(Mutex()); - inst->AddThreadData(tls_); + inst->AddThreadData(*tls_); } // Even it is not OS_MACOSX, need to register value for pthread_key_ so that // its exit handler will be triggered. - if (pthread_setspecific(inst->pthread_key_, tls_) != 0) { + if (photon::thread_setspecific(inst->pthread_key_, *tls_) != 0) { { MutexLock l(Mutex()); - inst->RemoveThreadData(tls_); + inst->RemoveThreadData(*tls_); } - delete tls_; + delete *tls_; abort(); } } - return tls_; + return *tls_; } void* ThreadLocalPtr::StaticMeta::Get(uint32_t id) const { diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index acac0063bcd..1d3d4740a0f 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -231,31 +231,10 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { #ifdef OS_LINUX if (decrease_cpu_priority) { - setpriority( - PRIO_PROCESS, - // Current thread. - 0, - // Lowest priority possible. - 19); low_cpu_priority = true; } if (decrease_io_priority) { -#define IOPRIO_CLASS_SHIFT (13) -#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) - // Put schedule into IOPRIO_CLASS_IDLE class (lowest) - // These system calls only have an effect when used in conjunction - // with an I/O scheduler that supports I/O priorities. As at - // kernel 2.6.17 the only such scheduler is the Completely - // Fair Queuing (CFQ) I/O scheduler. - // To change scheduler: - // echo cfq > /sys/block//queue/schedule - // Tunables to consider: - // /sys/block//queue/slice_idle - // /sys/block//queue/slice_sync - syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS - 0, // current thread - IOPRIO_PRIO_VALUE(3, 0)); low_io_priority = true; } #else @@ -337,20 +316,6 @@ void ThreadPoolImpl::Impl::StartBGThreads() { port::Thread p_t(&BGThreadWrapper, new BGThreadMetadata(this, bgthreads_.size())); -// Set the thread name to aid debugging -#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) -#if __GLIBC_PREREQ(2, 12) - auto th_handle = p_t.native_handle(); - std::string thread_priority = Env::PriorityToString(GetThreadPriority()); - std::ostringstream thread_name_stream; - thread_name_stream << "rocksdb:"; - for (char c : thread_priority) { - thread_name_stream << static_cast(tolower(c)); - } - thread_name_stream << bgthreads_.size(); - pthread_setname_np(th_handle, thread_name_stream.str().c_str()); -#endif -#endif bgthreads_.push_back(std::move(p_t)); } }