From 2062275b95a01d4c32342407aae9fc4d5b8b4bac Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 16 Sep 2024 21:07:31 +0800 Subject: [PATCH] Don't write data to data_dir (#1865) ### What problem does this PR solve? Purpose: leader and follower can share the same data, catalog, and persistence folders. Only the WAL directories are different. ### Type of change - [x] Refactoring --------- Signed-off-by: Jin Hai Co-authored-by: Zhichang Yu --- scripts/Dockerfile_infinity | 2 +- src/CMakeLists.txt | 2 + src/common/stl.cppm | 1 + src/common/utility/random.cpp | 11 +- src/common/utility/utility.cpp | 10 ++ src/common/utility/utility.cppm | 1 + src/executor/operator/physical_show.cpp | 8 +- .../buffer/file_worker/file_worker.cpp | 121 +++++++------ .../invertedindex/column_index_iterator.cpp | 4 +- .../invertedindex/column_index_merger.cpp | 45 ++--- .../invertedindex/disk_segment_reader.cpp | 9 +- src/storage/invertedindex/memory_indexer.cpp | 107 ++++++------ src/storage/meta/entry/block_entry.cpp | 3 +- .../persistence/persistence_manager.cpp | 161 +++--------------- .../persistence/persistence_manager.cppm | 25 +-- .../storage/buffer/buffer_handle.cpp | 4 +- .../storage/invertedindex/posting_merger.cpp | 2 +- .../persistence/persistence_manager.cpp | 12 +- 18 files changed, 225 insertions(+), 303 deletions(-) diff --git a/scripts/Dockerfile_infinity b/scripts/Dockerfile_infinity index a67e44f50a..8653ad5116 100644 --- a/scripts/Dockerfile_infinity +++ b/scripts/Dockerfile_infinity @@ -3,6 +3,6 @@ FROM debian:stable-slim # https://docs.docker.com/reference/dockerfile/#copy # If is a directory, the entire contents of the directory are copied, including filesystem metadata. # The directory itself isn't copied, only its contents. 
-COPY cmake-build-release/src/infinity /usr/bin +COPY cmake-build-reldeb/src/infinity /usr/bin ENTRYPOINT ["/usr/bin/infinity"] diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e989599df1..0aa02176a2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -528,6 +528,8 @@ target_link_libraries(unit_test event.a ) +add_dependencies(unit_test oatpp) + target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/lib") target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/third_party/arrow/") target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/third_party/snappy/") diff --git a/src/common/stl.cppm b/src/common/stl.cppm index f97a4d2fe5..c3617bed5c 100644 --- a/src/common/stl.cppm +++ b/src/common/stl.cppm @@ -245,6 +245,7 @@ export namespace std { using std::filesystem::read_symlink; using std::filesystem::remove; using std::filesystem::remove_all; + using std::filesystem::rename; using std::filesystem::is_directory; using std::filesystem::is_regular_file; diff --git a/src/common/utility/random.cpp b/src/common/utility/random.cpp index fb8e028c2b..d80cc736d8 100644 --- a/src/common/utility/random.cpp +++ b/src/common/utility/random.cpp @@ -24,6 +24,7 @@ import third_party; import logger; import local_file_system; import default_values; +import infinity_context; namespace infinity { @@ -51,12 +52,20 @@ SharedPtr DetermineRandomString(const String &parent_dir, const String & initialized = true; srand(std::time(nullptr)); } + + bool use_persistence_manager = InfinityContext::instance().persistence_manager(); + bool created = false; do { rand = RandomString(DEFAULT_RANDOM_NAME_LEN); result = fmt::format("{}_{}", rand, name); temp = LocalFileSystem::ConcatenateFilePath(parent_dir, result); ++cnt; - } while (!fs.CreateDirectoryNoExp(temp)); + if(!use_persistence_manager) { + created = fs.CreateDirectoryNoExp(temp); + } else { + created = true; + } + } while (!created); LOG_DEBUG(fmt::format("Created directory {} in {} times", temp, cnt)); 
return MakeShared(std::move(result)); } diff --git a/src/common/utility/utility.cpp b/src/common/utility/utility.cpp index 618dd82966..5acab31265 100644 --- a/src/common/utility/utility.cpp +++ b/src/common/utility/utility.cpp @@ -114,4 +114,14 @@ bool ParseIPPort(const String &str, String &ip, i64 &port) { } } +String StringTransform(const String &source, const String &from, const String &to) { + String ret(source); + size_t start_pos = 0; + while ((start_pos = ret.find(from, start_pos)) != String::npos) { + ret.replace(start_pos, from.length(), to); + start_pos += to.length(); + } + return ret; +} + } // namespace infinity diff --git a/src/common/utility/utility.cppm b/src/common/utility/utility.cppm index c913977b57..484bb26656 100644 --- a/src/common/utility/utility.cppm +++ b/src/common/utility/utility.cppm @@ -39,5 +39,6 @@ IdentifierValidationStatus IdentifierValidation(const String &identifier); bool ParseIPPort(const String &str, String &ip, i64 &port); +String StringTransform(const String &source, const String &from, const String &to); } diff --git a/src/executor/operator/physical_show.cpp b/src/executor/operator/physical_show.cpp index e4345ae94f..6627f6840d 100644 --- a/src/executor/operator/physical_show.cpp +++ b/src/executor/operator/physical_show.cpp @@ -1084,7 +1084,13 @@ void PhysicalShow::ExecuteShowIndex(QueryContext *query_context, ShowOperatorSta ++column_id; { const String table_dir = fmt::format("{}/{}", InfinityContext::instance().config()->DataDir(), *table_index_info->index_entry_dir_); - const auto &index_size = Utility::FormatByteSize(LocalFileSystem::GetFolderSizeByPath(table_dir)); + u64 index_dir_size = 0; + if (InfinityContext::instance().persistence_manager() == nullptr) { + index_dir_size = LocalFileSystem::GetFolderSizeByPath(table_dir); + } else { + // TODO: calculate the sum of object parts which's has the prefix table_dir + } + const auto &index_size = Utility::FormatByteSize(index_dir_size); Value value = 
Value::MakeVarchar(index_size); ValueExpression value_expr(value); value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); diff --git a/src/storage/buffer/file_worker/file_worker.cpp b/src/storage/buffer/file_worker/file_worker.cpp index a0bd4f4985..f6c0fc98ac 100644 --- a/src/storage/buffer/file_worker/file_worker.cpp +++ b/src/storage/buffer/file_worker/file_worker.cpp @@ -17,7 +17,7 @@ module; module file_worker; import stl; - +import utility; import infinity_exception; import local_file_system; import third_party; @@ -38,47 +38,70 @@ bool FileWorker::WriteToFile(bool to_spill, const FileWorkerSaveCtx &ctx) { String error_message = "No data will be written."; UnrecoverableError(error_message); } - LocalFileSystem fs; - String write_dir = ChooseFileDir(to_spill); - if (!fs.Exists(write_dir)) { - fs.CreateDirectory(write_dir); - } - String write_path = fmt::format("{}/{}", write_dir, *file_name_); + LocalFileSystem fs; - u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG; - auto [file_handler, status] = fs.OpenFile(write_path, flags, FileLockType::kWriteLock); - if (!status.ok()) { - UnrecoverableError(status.message()); - } - file_handler_ = std::move(file_handler); + if(persistence_manager_ != nullptr && !to_spill) { + String write_dir = *file_dir_; + String write_path = Path(*data_dir_) / write_dir / *file_name_; + String tmp_write_path = Path(*temp_dir_) / StringTransform(write_path, "/", "_"); - if (to_spill) { - auto local_file_handle = static_cast(file_handler_.get()); - LOG_TRACE(fmt::format("Open spill file: {}, fd: {}", write_path, local_file_handle->fd_)); - } - bool prepare_success = false; + u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG; + auto [file_handler, status] = fs.OpenFile(tmp_write_path, flags, FileLockType::kWriteLock); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + file_handler_ = std::move(file_handler); + bool prepare_success = false; - DeferFn defer_fn([&]() { - 
fs.Close(*file_handler_); - file_handler_ = nullptr; - }); + DeferFn defer_fn([&]() { + fs.Close(*file_handler_); + file_handler_ = nullptr; + }); - bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx); - if (prepare_success) { - if (to_spill) { - LOG_TRACE(fmt::format("Write to spill file {} finished. success {}", write_path, prepare_success)); + bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx); + if (prepare_success) { + fs.SyncFile(*file_handler_); } - fs.SyncFile(*file_handler_); - } - bool use_object_cache = !to_spill && persistence_manager_ != nullptr; - if (use_object_cache) { fs.SyncFile(*file_handler_); - obj_addr_ = persistence_manager_->Persist(write_path); - fs.DeleteFile(write_path); + obj_addr_ = persistence_manager_->Persist(write_path, tmp_write_path); + fs.DeleteFile(tmp_write_path); + return all_save; + } else { + String write_dir = ChooseFileDir(to_spill); + if (!fs.Exists(write_dir)) { + fs.CreateDirectory(write_dir); + } + String write_path = fmt::format("{}/{}", write_dir, *file_name_); + + u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG; + auto [file_handler, status] = fs.OpenFile(write_path, flags, FileLockType::kWriteLock); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + file_handler_ = std::move(file_handler); + + if (to_spill) { + auto local_file_handle = static_cast(file_handler_.get()); + LOG_TRACE(fmt::format("Open spill file: {}, fd: {}", write_path, local_file_handle->fd_)); + } + bool prepare_success = false; + + DeferFn defer_fn([&]() { + fs.Close(*file_handler_); + file_handler_ = nullptr; + }); + + bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx); + if (prepare_success) { + if (to_spill) { + LOG_TRACE(fmt::format("Write to spill file {} finished. 
success {}", write_path, prepare_success)); + } + fs.SyncFile(*file_handler_); + } + return all_save; } - return all_save; } void FileWorker::ReadFromFile(bool from_spill) { @@ -87,12 +110,12 @@ void FileWorker::ReadFromFile(bool from_spill) { bool use_object_cache = !from_spill && persistence_manager_ != nullptr; read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_); if (use_object_cache) { - obj_addr_ = persistence_manager_->GetObjFromLocalPath(read_path); + obj_addr_ = persistence_manager_->GetObjCache(read_path); if (!obj_addr_.Valid()) { String error_message = fmt::format("Failed to find object for local path {}", read_path); UnrecoverableError(error_message); } - read_path = persistence_manager_->GetObjCache(read_path); + read_path = persistence_manager_->GetObjPath(obj_addr_.obj_key_); } SizeT file_size = 0; u8 flags = FileFlags::READ_FLAG; @@ -124,20 +147,20 @@ void FileWorker::MoveFile() { String src_path = fmt::format("{}/{}", ChooseFileDir(true), *file_name_); String dest_dir = ChooseFileDir(false); String dest_path = fmt::format("{}/{}", dest_dir, *file_name_); - if (!fs.Exists(src_path)) { - Status status = Status::FileNotFound(src_path); - RecoverableError(status); - } - if (!fs.Exists(dest_dir)) { - fs.CreateDirectory(dest_dir); - } - // if (fs.Exists(dest_path)) { - // UnrecoverableError(fmt::format("File {} was already been created before.", dest_path)); - // } - fs.Rename(src_path, dest_path); - if (persistence_manager_ != nullptr) { - obj_addr_ = persistence_manager_->Persist(dest_path); - fs.DeleteFile(dest_path); + if (persistence_manager_ == nullptr) { + if (!fs.Exists(src_path)) { + Status status = Status::FileNotFound(src_path); + RecoverableError(status); + } + if (!fs.Exists(dest_dir)) { + fs.CreateDirectory(dest_dir); + } + // if (fs.Exists(dest_path)) { + // UnrecoverableError(fmt::format("File {} was already been created before.", dest_path)); + // } + fs.Rename(src_path, dest_path); + } else { + obj_addr_ = 
persistence_manager_->Persist(dest_path, src_path); } } diff --git a/src/storage/invertedindex/column_index_iterator.cpp b/src/storage/invertedindex/column_index_iterator.cpp index eec94c2a57..a0b9dd54ff 100644 --- a/src/storage/invertedindex/column_index_iterator.cpp +++ b/src/storage/invertedindex/column_index_iterator.cpp @@ -33,8 +33,8 @@ ColumnIndexIterator::ColumnIndexIterator(const String &index_dir, const String & if (use_object_cache) { dict_file_path_ = dict_file; posting_file_path_ = posting_file; - dict_file = pm->GetObjCache(dict_file); - posting_file = pm->GetObjCache(posting_file); + dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_); + posting_file = pm->GetObjPath(pm->GetObjCache(posting_file).obj_key_); } dict_reader_ = MakeShared(dict_file, PostingFormatOption(flag)); diff --git a/src/storage/invertedindex/column_index_merger.cpp b/src/storage/invertedindex/column_index_merger.cpp index 95e7029054..8a615b4e87 100644 --- a/src/storage/invertedindex/column_index_merger.cpp +++ b/src/storage/invertedindex/column_index_merger.cpp @@ -30,6 +30,7 @@ import logger; import persistence_manager; import infinity_context; import defer_op; +import utility; namespace infinity { ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag) : index_dir_(index_dir), flag_(flag) {} @@ -50,31 +51,26 @@ void ColumnIndexMerger::Merge(const Vector &base_names, const VectorObjCreateRefCount(dict_file); - pm->ObjCreateRefCount(posting_file); - pm->ObjCreateRefCount(column_length_file); + Path temp_dir = Path(InfinityContext::instance().config()->TempDir()); + tmp_dict_file = temp_dir / StringTransform(tmp_dict_file, "/", "_"); + tmp_posting_file = temp_dir / StringTransform(tmp_posting_file, "/", "_"); + tmp_column_length_file = temp_dir / StringTransform(tmp_column_length_file, "/", "_"); + tmp_fst_file = temp_dir / StringTransform(tmp_fst_file, "/", "_"); } - DeferFn defer_fn([&]() { - if (!use_object_cache) { - return; - } - 
pm->PutObjCache(posting_file); - pm->PutObjCache(dict_file); - pm->PutObjCache(column_length_file); - std::filesystem::remove(posting_file); - std::filesystem::remove(dict_file); - std::filesystem::remove(column_length_file); - }); - - SharedPtr dict_file_writer = MakeShared(fs_, dict_file, 1024); + SharedPtr dict_file_writer = MakeShared(fs_, tmp_dict_file, 1024); TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_))); - posting_file_writer_ = MakeShared(fs_, posting_file, 1024); - std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc); + posting_file_writer_ = MakeShared(fs_, tmp_posting_file, 1024); + std::ofstream ofs(tmp_fst_file.c_str(), std::ios::binary | std::ios::trunc); OstreamWriter wtr(ofs); FstBuilder fst_builder(wtr); @@ -100,7 +96,7 @@ void ColumnIndexMerger::Merge(const Vector &base_names, const VectorGetObjCache(column_len_file); + column_len_file = pm->GetObjPath(pm->GetObjCache(column_len_file).obj_key_); } auto [file_handler, status] = fs_.OpenFile(column_len_file, FileFlags::READ_FLAG, FileLockType::kNoLock); @@ -126,7 +122,7 @@ void ColumnIndexMerger::Merge(const Vector &base_names, const Vector &base_names, const VectorSync(); posting_file_writer_->Sync(); fst_builder.Finish(); - fs_.AppendFile(dict_file, fst_file); - fs_.DeleteFile(fst_file); + fs_.AppendFile(tmp_dict_file, tmp_fst_file); + fs_.DeleteFile(tmp_fst_file); + if (use_object_cache) { + pm->Persist(dict_file, tmp_dict_file, false); + pm->Persist(posting_file, tmp_posting_file, false); + pm->Persist(column_length_file, tmp_column_length_file, false); + } } void ColumnIndexMerger::MergeTerm(const String &term, diff --git a/src/storage/invertedindex/disk_segment_reader.cpp b/src/storage/invertedindex/disk_segment_reader.cpp index 25d26cf510..2ce9fa7467 100644 --- a/src/storage/invertedindex/disk_segment_reader.cpp +++ b/src/storage/invertedindex/disk_segment_reader.cpp @@ -49,7 +49,12 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, 
const St posting_file_.append(POSTING_SUFFIX); String posting_file = posting_file_; if (nullptr != pm) { - posting_file_obj_ = pm->GetObjCache(posting_file); + ObjAddr obj_addr = pm->GetObjCache(posting_file); + if (!obj_addr.Valid()) { + // Empty posting + return; + } + posting_file_obj_ = pm->GetObjPath(obj_addr.obj_key_); posting_file = posting_file_obj_; } if (posting_file.empty() || std::filesystem::file_size(posting_file) == 0) { @@ -67,7 +72,7 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St dict_file_.append(DICT_SUFFIX); String dict_file = dict_file_; if (nullptr != pm) { - dict_file = pm->GetObjCache(dict_file); + dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_); } dict_reader_ = MakeShared(dict_file, PostingFormatOption(flag)); } diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index e303f7a61d..a6a5a9d824 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ b/src/storage/invertedindex/memory_indexer.cpp @@ -65,6 +65,7 @@ import defer_op; import blocking_queue; import segment_index_entry; import persistence_manager; +import utility; namespace infinity { constexpr int MAX_TUPLE_LENGTH = 1024; // we assume that analyzed term, together with docid/offset info, will never exceed such length @@ -82,8 +83,8 @@ MemoryIndexer::MemoryIndexer(const String &index_dir, const String &base_name, R assert(std::filesystem::path(index_dir).is_absolute()); posting_table_ = MakeShared(); prepared_posting_ = MakeShared(posting_format_, column_lengths_); - Path path = Path(index_dir) / (base_name + ".tmp.merge"); - spill_full_path_ = path.string(); + spill_full_path_ = Path(index_dir) / (base_name + ".tmp.merge"); + spill_full_path_ = Path(InfinityContext::instance().config()->TempDir()) / StringTransform(spill_full_path_, "/", "_"); } MemoryIndexer::~MemoryIndexer() { @@ -261,25 +262,30 @@ void MemoryIndexer::Dump(bool offline, bool spill) { while 
(GetInflightTasks() > 0) { CommitSync(100); } - Path path = Path(index_dir_) / base_name_; - String index_prefix = path.string(); + LocalFileSystem fs; - String posting_file = index_prefix + POSTING_SUFFIX + (spill ? SPILL_SUFFIX : ""); - String dict_file = index_prefix + DICT_SUFFIX + (spill ? SPILL_SUFFIX : ""); + String posting_file = Path(index_dir_) / (base_name_ + POSTING_SUFFIX + (spill ? SPILL_SUFFIX : "")); + String dict_file = Path(index_dir_) / (base_name_ + DICT_SUFFIX + (spill ? SPILL_SUFFIX : "")); + String column_length_file = Path(index_dir_) / (base_name_ + LENGTH_SUFFIX + (spill ? SPILL_SUFFIX : "")); + String tmp_posting_file(posting_file); + String tmp_dict_file(dict_file); + String tmp_column_length_file(column_length_file); PersistenceManager *pm = InfinityContext::instance().persistence_manager(); bool use_object_cache = pm != nullptr && !spill; if (use_object_cache) { - pm->ObjCreateRefCount(posting_file); - pm->ObjCreateRefCount(dict_file); + Path tmp_dir = Path(InfinityContext::instance().config()->TempDir()); + tmp_posting_file = tmp_dir / StringTransform(tmp_posting_file, "/", "_"); + tmp_dict_file = tmp_dir / StringTransform(tmp_dict_file, "/", "_"); + tmp_column_length_file = tmp_dir / StringTransform(tmp_column_length_file, "/", "_"); } - SharedPtr posting_file_writer = MakeShared(fs, posting_file, 128000); - SharedPtr dict_file_writer = MakeShared(fs, dict_file, 128000); + SharedPtr posting_file_writer = MakeShared(fs, tmp_posting_file, 128000); + SharedPtr dict_file_writer = MakeShared(fs, tmp_dict_file, 128000); TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_))); - String fst_file = dict_file + ".fst"; - std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc); + String tmp_fst_file = tmp_dict_file + ".fst"; + std::ofstream ofs(tmp_fst_file.c_str(), std::ios::binary | std::ios::trunc); OstreamWriter wtr(ofs); FstBuilder fst_builder(wtr); @@ -300,26 +306,10 @@ void MemoryIndexer::Dump(bool offline, bool 
spill) { posting_file_writer->Sync(); dict_file_writer->Sync(); fst_builder.Finish(); - fs.AppendFile(dict_file, fst_file); - fs.DeleteFile(fst_file); - } - - String column_length_file = index_prefix + LENGTH_SUFFIX + (spill ? SPILL_SUFFIX : ""); - if (use_object_cache) { - pm->ObjCreateRefCount(column_length_file); + fs.AppendFile(tmp_dict_file, tmp_fst_file); + fs.DeleteFile(tmp_fst_file); } - DeferFn defer_fn([&]() { - if (!use_object_cache) { - return; - } - pm->PutObjCache(posting_file); - pm->PutObjCache(dict_file); - pm->PutObjCache(column_length_file); - std::filesystem::remove(posting_file); - std::filesystem::remove(dict_file); - std::filesystem::remove(column_length_file); - }); - auto [file_handler, status] = fs.OpenFile(column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); + auto [file_handler, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); if (!status.ok()) { UnrecoverableError(status.message()); } @@ -327,6 +317,11 @@ void MemoryIndexer::Dump(bool offline, bool spill) { Vector &column_length_array = column_lengths_.UnsafeVec(); fs.Write(*file_handler, &column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size()); fs.Close(*file_handler); + if (use_object_cache) { + pm->Persist(posting_file, tmp_posting_file, false); + pm->Persist(dict_file, tmp_dict_file, false); + pm->Persist(column_length_file, tmp_column_length_file, false); + } is_spilled_ = spill; Reset(); @@ -396,18 +391,26 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtrObjCreateRefCount(posting_file); - InfinityContext::instance().persistence_manager()->ObjCreateRefCount(dict_file); + Path tmp_dir = Path(InfinityContext::instance().config()->TempDir()); + tmp_posting_file = tmp_dir / StringTransform(tmp_posting_file, "/", "_"); + tmp_dict_file = tmp_dir / StringTransform(tmp_dict_file, "/", "_"); + tmp_column_length_file = tmp_dir / 
StringTransform(tmp_column_length_file, "/", "_"); } - SharedPtr posting_file_writer = MakeShared(fs, posting_file, 128000); - SharedPtr dict_file_writer = MakeShared(fs, dict_file, 128000); + SharedPtr posting_file_writer = MakeShared(fs, tmp_posting_file, 128000); + SharedPtr dict_file_writer = MakeShared(fs, tmp_dict_file, 128000); TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_))); - String fst_file = index_prefix + DICT_SUFFIX + ".fst"; - std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc); + String tmp_fst_file = tmp_dict_file + ".fst"; + std::ofstream ofs(tmp_fst_file.c_str(), std::ios::binary | std::ios::trunc); OstreamWriter wtr(ofs); FstBuilder fst_builder(wtr); @@ -478,27 +481,10 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtrSync(); dict_file_writer->Sync(); fst_builder.Finish(); - fs.AppendFile(dict_file, fst_file); - fs.DeleteFile(fst_file); + fs.AppendFile(tmp_dict_file, tmp_fst_file); + fs.DeleteFile(tmp_fst_file); - String column_length_file = index_prefix + LENGTH_SUFFIX; - - if (use_object_cache) { - InfinityContext::instance().persistence_manager()->ObjCreateRefCount(column_length_file); - } - DeferFn defer_fn([&]() { - if (!use_object_cache) { - return; - } - InfinityContext::instance().persistence_manager()->PutObjCache(posting_file); - InfinityContext::instance().persistence_manager()->PutObjCache(dict_file); - InfinityContext::instance().persistence_manager()->PutObjCache(column_length_file); - std::filesystem::remove(posting_file); - std::filesystem::remove(dict_file); - std::filesystem::remove(column_length_file); - }); - - auto [file_handler, status] = fs.OpenFile(column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); + auto [file_handler, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); if (!status.ok()) { UnrecoverableError(status.message()); } @@ -506,6 +492,11 @@ void 
MemoryIndexer::TupleListToIndexFile(UniquePtr &unsafe_column_lengths = column_lengths_.UnsafeVec(); fs.Write(*file_handler, &unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size()); fs.Close(*file_handler); + if (use_object_cache) { + pm->Persist(posting_file, tmp_posting_file, false); + pm->Persist(dict_file, tmp_dict_file, false); + pm->Persist(column_length_file, tmp_column_length_file, false); + } } void MemoryIndexer::OfflineDump() { diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp index af38e9e29f..a10b8131b6 100644 --- a/src/storage/meta/entry/block_entry.cpp +++ b/src/storage/meta/entry/block_entry.cpp @@ -577,7 +577,8 @@ SharedPtr BlockEntry::DetermineDir(const String &parent_dir, BlockID blo LocalFileSystem fs; SharedPtr relative_dir = MakeShared(fmt::format("{}/blk_{}", parent_dir, block_id)); String full_dir = Path(InfinityContext::instance().config()->DataDir()) / *relative_dir; - fs.CreateDirectoryNoExp(full_dir); + if (InfinityContext::instance().persistence_manager() == nullptr) + fs.CreateDirectoryNoExp(full_dir); return relative_dir; } diff --git a/src/storage/persistence/persistence_manager.cpp b/src/storage/persistence/persistence_manager.cpp index c06b2e96fb..68a5d36479 100644 --- a/src/storage/persistence/persistence_manager.cpp +++ b/src/storage/persistence/persistence_manager.cpp @@ -152,9 +152,9 @@ PersistenceManager::~PersistenceManager() { assert(sum_ref_count == 0); } -ObjAddr PersistenceManager::Persist(const String &file_path) { +ObjAddr PersistenceManager::Persist(const String &file_path, const String &tmp_file_path, bool try_compose) { std::error_code ec; - fs::path src_fp = file_path; + fs::path src_fp = tmp_file_path; String local_path = RemovePrefix(file_path); if (local_path.empty()) { @@ -185,13 +185,13 @@ ObjAddr PersistenceManager::Persist(const String &file_path) { String error_message = fmt::format("Persist skipped empty local path {}.", 
file_path); LOG_WARN(error_message); return ObjAddr(); - } else if (src_size >= object_size_limit_) { + } else if (!try_compose || src_size >= object_size_limit_) { String obj_key = ObjCreate(); fs::path dst_fp = workspace_; dst_fp.append(obj_key); - bool ok = fs::copy_file(src_fp, dst_fp, fs::copy_options::overwrite_existing, ec); - if (!ok) { - String error_message = fmt::format("Failed to copy file {}.", file_path); + fs::rename(src_fp, dst_fp, ec); + if (ec) { + String error_message = fmt::format("Failed to rename {} to {}.", src_fp.string(), dst_fp.string()); UnrecoverableError(error_message); } ObjAddr obj_addr(obj_key, 0, src_size); @@ -212,7 +212,12 @@ ObjAddr PersistenceManager::Persist(const String &file_path) { CurrentObjFinalizeNoLock(); } ObjAddr obj_addr(current_object_key_, current_object_size_, src_size); - CurrentObjAppendNoLock(file_path, src_size); + CurrentObjAppendNoLock(tmp_file_path, src_size); + fs::remove(tmp_file_path, ec); + if (ec) { + String error_message = fmt::format("Failed to remove {}.", tmp_file_path); + UnrecoverableError(error_message); + } local_path_obj_[local_path] = obj_addr; LOG_TRACE(fmt::format("Persist local path {} to composed ObjAddr ({}, {}, {})", @@ -224,60 +229,6 @@ ObjAddr PersistenceManager::Persist(const String &file_path) { } } -ObjAddr PersistenceManager::Persist(const char *data, SizeT src_size) { - fs::path dst_fp = workspace_; - if (src_size == 0) { - String error_message = fmt::format("Persist skipped empty data."); - LOG_WARN(error_message); - return ObjAddr(); - } else if (src_size >= object_size_limit_) { - String obj_key = ObjCreate(); - dst_fp.append(obj_key); - std::ofstream outFile(dst_fp, std::ios::app); - if (!outFile.is_open()) { - String error_message = fmt::format("Failed to open file {}.", dst_fp.string()); - UnrecoverableError(error_message); - } - outFile.write(data, src_size); - outFile.close(); - ObjAddr obj_addr(obj_key, 0, src_size); - std::lock_guard lock(mtx_); - 
objects_.emplace(obj_key, ObjStat(src_size, 1, 0)); - LOG_TRACE(fmt::format("Persist added dedicated object {}", obj_key)); - return obj_addr; - } else { - dst_fp.append(current_object_key_); - std::lock_guard lock(mtx_); - if (int(src_size) > CurrentObjRoomNoLock()) { - CurrentObjFinalizeNoLock(); - } - ObjAddr obj_addr(current_object_key_, current_object_size_, src_size); - std::ofstream outFile(dst_fp, std::ios::app); - if (!outFile.is_open()) { - String error_message = fmt::format("Failed to open file {}.", dst_fp.string()); - UnrecoverableError(error_message); - } - outFile.write(data, src_size); - current_object_size_ += src_size; - current_object_parts_++; - if (current_object_size_ >= object_size_limit_) { - if (current_object_parts_ > 1) { - // Add footer to composed object -- format version 1 - const u32 compose_format = 1; - outFile.write((char *)&compose_format, sizeof(u32)); - } - - objects_.emplace(current_object_key_, ObjStat(src_size, current_object_parts_, 0)); - LOG_TRACE(fmt::format("Persist added composed object {}", current_object_key_)); - current_object_key_ = ObjCreate(); - current_object_size_ = 0; - current_object_parts_ = 0; - } - outFile.close(); - return obj_addr; - } -} - // TODO: // - Upload the finalized object to object store in background. 
void PersistenceManager::CurrentObjFinalize(bool validate) { @@ -345,7 +296,7 @@ void PersistenceManager::CurrentObjFinalizeNoLock() { } } -String PersistenceManager::GetObjCache(const String &file_path) { +ObjAddr PersistenceManager::GetObjCache(const String &file_path) { String local_path = RemovePrefix(file_path); if (local_path.empty()) { String error_message = fmt::format("Failed to find local path of {}", local_path); @@ -357,27 +308,12 @@ String PersistenceManager::GetObjCache(const String &file_path) { if (it == local_path_obj_.end()) { String error_message = fmt::format("GetObjCache Failed to find object for local path {}", local_path); LOG_WARN(error_message); - return ""; + return ObjAddr(); } auto oit = objects_.find(it->second.obj_key_); if (oit != objects_.end()) { oit->second.ref_count_++; } - return fs::path(workspace_).append(it->second.obj_key_).string(); -} - -ObjAddr PersistenceManager::GetObjFromLocalPath(const String &file_path) { - String local_path = RemovePrefix(file_path); - if (local_path.empty()) { - String error_message = fmt::format("Failed to find local path of {}", local_path); - UnrecoverableError(error_message); - } - - std::lock_guard lock(mtx_); - auto it = local_path_obj_.find(local_path); - if (it == local_path_obj_.end()) { - return ObjAddr(); - } return it->second; } @@ -394,6 +330,7 @@ void PersistenceManager::PutObjCache(const String &file_path) { String error_message = fmt::format("Failed to find file_path: {} stored object", local_path); UnrecoverableError(error_message); } + assert(it->second.part_size_ != 0); auto oit = objects_.find(it->second.obj_key_); if (oit == objects_.end()) { return; @@ -401,72 +338,18 @@ void PersistenceManager::PutObjCache(const String &file_path) { assert(oit->second.ref_count_ > 0); oit->second.ref_count_--; - if (it->second.part_size_ == 0 && it->second.part_offset_ == 0) { - assert(oit->second.ref_count_ == 0); - // For large files linked, fill in the file size when putting to ensure obj 
valid - // There's no footer in dedicated objects. - String obj_full_path = fs::path(workspace_).append(it->second.obj_key_).string(); - oit->second.obj_size_ = fs::file_size(obj_full_path); - oit->second.parts_ = 1; - it->second.part_size_ = oit->second.obj_size_; - - if (oit->second.obj_size_ == 0) { - // Avoid to persist empty objects. - objects_.erase(oit); - local_path_obj_.erase(it); - String error_message = fmt::format("PutObjCache skipped empty local path {}", file_path); - LOG_WARN(error_message); - } - } } String PersistenceManager::ObjCreate() { return UUID().to_string(); } -ObjAddr PersistenceManager::ObjCreateRefCount(const String &file_path) { - String obj_key = ObjCreate(); - ObjAddr obj_addr = ObjAddr(obj_key, 0, 0); - { - std::lock_guard lock(mtx_); - objects_.emplace(obj_key, ObjStat(0, 1, 1)); - LOG_TRACE(fmt::format("ObjCreateRefCount added dedicated {}, path: {}", obj_key, file_path)); - } - - fs::path src_fp = workspace_; - fs::path dst_fp = file_path; - src_fp.append(obj_key); - try { - if (fs::exists(dst_fp)) { - fs::remove(dst_fp); - } - fs::create_symlink(src_fp, dst_fp); - } catch (const fs::filesystem_error &e) { - String error_message = fmt::format("Failed to link file {}.", file_path); - UnrecoverableError(error_message); - } - std::lock_guard lock(mtx_); - String local_path = RemovePrefix(file_path); - if (local_path.empty()) { - String error_message = fmt::format("Failed to find local path of {}", local_path); - UnrecoverableError(error_message); - } - local_path_obj_[local_path] = obj_addr; - LOG_TRACE(fmt::format("ObjCreateRefCount local path {} to dedicated ObjAddr ({}, {}, {})", - local_path, - obj_addr.obj_key_, - obj_addr.part_offset_, - obj_addr.part_size_)); - return obj_addr; -} - int PersistenceManager::CurrentObjRoomNoLock() { return int(object_size_limit_) - int(current_object_size_); } -void PersistenceManager::CurrentObjAppendNoLock(const String &file_path, SizeT file_size) { - fs::path src_fp = file_path; - fs::path 
dst_fp = workspace_; - dst_fp.append(current_object_key_); +void PersistenceManager::CurrentObjAppendNoLock(const String &tmp_file_path, SizeT file_size) { + fs::path src_fp = tmp_file_path; + fs::path dst_fp = Path(workspace_) / current_object_key_; std::ifstream srcFile(src_fp, std::ios::binary); if (!srcFile.is_open()) { - String error_message = fmt::format("Failed to open source file {}", file_path); + String error_message = fmt::format("Failed to open source file {}", tmp_file_path); UnrecoverableError(error_message); } std::ofstream dstFile(dst_fp, std::ios::binary | std::ios::app); @@ -727,7 +610,7 @@ void AddrSerializer::Initialize(PersistenceManager *pm, const Vector &pa } for (const String &path : path) { paths_.push_back(path); - ObjAddr obj_addr = pm->GetObjFromLocalPath(path); + ObjAddr obj_addr = pm->GetObjCache(path); obj_addrs_.push_back(obj_addr); if (!obj_addr.Valid()) { // In ImportWal, version file is not flushed here, set before write wal @@ -736,6 +619,7 @@ void AddrSerializer::Initialize(PersistenceManager *pm, const Vector &pa } else { ObjStat obj_stat = pm->GetObjStatByObjAddr(obj_addr); obj_stats_.push_back(obj_stat); + pm->PutObjCache(path); } } } @@ -748,13 +632,14 @@ void AddrSerializer::InitializeValid(PersistenceManager *pm) { if (obj_addrs_[i].Valid()) { continue; } - ObjAddr obj_addr = pm->GetObjFromLocalPath(paths_[i]); + ObjAddr obj_addr = pm->GetObjCache(paths_[i]); obj_addrs_[i] = obj_addr; if (!obj_addr.Valid()) { UnrecoverableError(fmt::format("Invalid object address for path {}", paths_[i])); } else { ObjStat obj_stat = pm->GetObjStatByObjAddr(obj_addr); obj_stats_[i] = obj_stat; + pm->PutObjCache(paths_[i]); } } } diff --git a/src/storage/persistence/persistence_manager.cppm b/src/storage/persistence/persistence_manager.cppm index 0d36a819a7..290d74a93f 100644 --- a/src/storage/persistence/persistence_manager.cppm +++ b/src/storage/persistence/persistence_manager.cppm @@ -78,30 +78,16 @@ public: ~PersistenceManager(); - /** 
- * For composed objects - */ // Create new object or append to current object, and returns the location. - ObjAddr Persist(const String &file_path); - - // Create new object or append to current object, and returns the location. - ObjAddr Persist(const char *data, SizeT len); + // file_path is the key of local_path_obj_ and may not exist. tmp_file_path is the file which contains the data to be persisted. + // tmp_file_path will be deleted after its data be persisted. + ObjAddr Persist(const String &file_path, const String &tmp_file_path, bool try_compose = true); // Force finalize current object. Subsequent append on the finalized object is forbidden. void CurrentObjFinalize(bool validate = false); - /** - * For dedicated objects - */ - ObjAddr ObjCreateRefCount(const String &file_path); - - /** - * For composed and dedicated objects - */ // Download the whole object from object store if it's not in cache. Increase refcount and return the cached object file path. - String GetObjCache(const String &local_path); - - ObjAddr GetObjFromLocalPath(const String &file_path); + ObjAddr GetObjCache(const String &local_path); void PutObjCache(const String &file_path); @@ -110,6 +96,7 @@ public: /** * Utils */ + String GetObjPath(const String &obj_key) { return std::filesystem::path(workspace_).append(obj_key).string(); } nlohmann::json Serialize(); void Deserialize(const nlohmann::json &obj); @@ -125,7 +112,7 @@ private: // Append file to the current object. // It finalize current object if new size exceeds the size limit. - void CurrentObjAppendNoLock(const String &file_path, SizeT file_size); + void CurrentObjAppendNoLock(const String &tmp_file_path, SizeT file_size); // Finalize current object. 
void CurrentObjFinalizeNoLock(); diff --git a/src/unit_test/storage/buffer/buffer_handle.cpp b/src/unit_test/storage/buffer/buffer_handle.cpp index 7fda16d77a..ef722225b4 100644 --- a/src/unit_test/storage/buffer/buffer_handle.cpp +++ b/src/unit_test/storage/buffer/buffer_handle.cpp @@ -37,9 +37,9 @@ TEST_P(BufferHandleTest, test1) { SizeT memory_limit = 1024; String data_dir(GetFullDataDir()); - auto temp_dir = MakeShared(data_dir + "/spill"); + auto temp_dir = MakeShared(GetFullTmpDir()); auto base_dir = MakeShared(GetFullDataDir()); - auto persistence_dir = MakeShared(data_dir + "/persistence"); + auto persistence_dir = MakeShared(GetFullPersistDir()); UniquePtr persistence_manager = MakeUnique(*persistence_dir, *base_dir, DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT); BufferManager buffer_manager(memory_limit, base_dir, temp_dir, persistence_manager.get()); diff --git a/src/unit_test/storage/invertedindex/posting_merger.cpp b/src/unit_test/storage/invertedindex/posting_merger.cpp index e4e3cfd427..1909ef22d8 100644 --- a/src/unit_test/storage/invertedindex/posting_merger.cpp +++ b/src/unit_test/storage/invertedindex/posting_merger.cpp @@ -151,7 +151,7 @@ TEST_P(PostingMergerTest, Basic) { String real_column_len_file = column_len_file; PersistenceManager *pm = InfinityContext::instance().persistence_manager(); if (pm != nullptr) { - real_column_len_file = pm->GetObjCache(real_column_len_file); + real_column_len_file = pm->GetObjPath(pm->GetObjCache(real_column_len_file).obj_key_); } RowID base_row_id = row_ids[i]; u32 id_offset = base_row_id - merge_base_rowid; diff --git a/src/unit_test/storage/persistence/persistence_manager.cpp b/src/unit_test/storage/persistence/persistence_manager.cpp index 6c0c0ad33e..f743fa4554 100644 --- a/src/unit_test/storage/persistence/persistence_manager.cpp +++ b/src/unit_test/storage/persistence/persistence_manager.cpp @@ -29,8 +29,8 @@ class PersistenceManagerTest : public BaseTest { }; void PersistenceManagerTest::CheckObjData(const 
String& local_file_path, const String& data) { - String obj_path = pm_->GetObjCache(local_file_path); - auto obj_addr = pm_->GetObjFromLocalPath(local_file_path); + auto obj_addr = pm_->GetObjCache(local_file_path); + String obj_path = pm_->GetObjPath(obj_addr.obj_key_); fs::path obj_fp(obj_path); ASSERT_TRUE(fs::exists(obj_fp)); ASSERT_EQ(obj_addr.part_size_, data.size()); @@ -56,7 +56,7 @@ TEST_F(PersistenceManagerTest, PersistFileBasic) { String persist_str = "Persistence Manager Test"; out_file << persist_str; out_file.close(); - ObjAddr obj_addr = pm_->Persist(file_path); + ObjAddr obj_addr = pm_->Persist(file_path, file_path); ASSERT_TRUE(obj_addr.Valid()); ASSERT_EQ(obj_addr.part_size_, persist_str.size()); pm_->CurrentObjFinalize(); @@ -78,7 +78,7 @@ TEST_F(PersistenceManagerTest, PersistMultiFile) { file_paths.push_back(file_path); persist_strs.push_back(persist_str); - ObjAddr obj_addr = pm_->Persist(file_path); + ObjAddr obj_addr = pm_->Persist(file_path, file_path); ASSERT_TRUE(obj_addr.Valid()); ASSERT_EQ(obj_addr.part_size_, persist_str.size()); obj_addrs.push_back(obj_addr); @@ -109,7 +109,7 @@ TEST_F(PersistenceManagerTest, PersistFileMultiThread) { persist_strs.push_back(persist_str); threads.emplace_back([this, file_path, persist_str, &obj_addrs, &obj_mutex]() { - ObjAddr obj_addr = pm_->Persist(file_path); + ObjAddr obj_addr = pm_->Persist(file_path, file_path); ASSERT_TRUE(obj_addr.Valid()); ASSERT_EQ(obj_addr.part_size_, persist_str.size()); std::unique_lock lock(obj_mutex); @@ -144,7 +144,7 @@ TEST_F(PersistenceManagerTest, CleanupBasic) { file_paths.push_back(file_path); persist_strs.push_back(persist_str); - ObjAddr obj_addr = pm_->Persist(file_path); + ObjAddr obj_addr = pm_->Persist(file_path, file_path); ASSERT_TRUE(obj_addr.Valid()); ASSERT_EQ(obj_addr.part_size_, persist_str.size()); obj_addrs.push_back(obj_addr);