Skip to content

Commit

Permalink
Don't write data to data_dir (#1865)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Purpose: leader and follower can share the same data, catalog,
persistence folder. Only WAL directory are different.

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
Co-authored-by: Zhichang Yu <[email protected]>
  • Loading branch information
JinHai-CN and yuzhichang authored Sep 16, 2024
1 parent 5be1b0c commit 2062275
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 303 deletions.
2 changes: 1 addition & 1 deletion scripts/Dockerfile_infinity
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ FROM debian:stable-slim
# https://docs.docker.com/reference/dockerfile/#copy
# If <src> is a directory, the entire contents of the directory are copied, including filesystem metadata.
# The directory itself isn't copied, only its contents.
COPY cmake-build-release/src/infinity /usr/bin
COPY cmake-build-reldeb/src/infinity /usr/bin

ENTRYPOINT ["/usr/bin/infinity"]
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ target_link_libraries(unit_test
event.a
)

add_dependencies(unit_test oatpp)

target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/lib")
target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/third_party/arrow/")
target_link_directories(unit_test PUBLIC "${CMAKE_BINARY_DIR}/third_party/snappy/")
Expand Down
1 change: 1 addition & 0 deletions src/common/stl.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ export namespace std {
using std::filesystem::read_symlink;
using std::filesystem::remove;
using std::filesystem::remove_all;
using std::filesystem::rename;

using std::filesystem::is_directory;
using std::filesystem::is_regular_file;
Expand Down
11 changes: 10 additions & 1 deletion src/common/utility/random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import third_party;
import logger;
import local_file_system;
import default_values;
import infinity_context;

namespace infinity {

Expand Down Expand Up @@ -51,12 +52,20 @@ SharedPtr<String> DetermineRandomString(const String &parent_dir, const String &
initialized = true;
srand(std::time(nullptr));
}

bool use_persistence_manager = InfinityContext::instance().persistence_manager();
bool created = false;
do {
rand = RandomString(DEFAULT_RANDOM_NAME_LEN);
result = fmt::format("{}_{}", rand, name);
temp = LocalFileSystem::ConcatenateFilePath(parent_dir, result);
++cnt;
} while (!fs.CreateDirectoryNoExp(temp));
if(!use_persistence_manager) {
created = fs.CreateDirectoryNoExp(temp);
} else {
created = true;
}
} while (!created);
LOG_DEBUG(fmt::format("Created directory {} in {} times", temp, cnt));
return MakeShared<String>(std::move(result));
}
Expand Down
10 changes: 10 additions & 0 deletions src/common/utility/utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,14 @@ bool ParseIPPort(const String &str, String &ip, i64 &port) {
}
}

String StringTransform(const String &source, const String &from, const String &to) {
String ret(source);
size_t start_pos = 0;
while ((start_pos = ret.find(from, start_pos)) != String::npos) {
ret.replace(start_pos, from.length(), to);
start_pos += to.length();
}
return ret;
}

} // namespace infinity
1 change: 1 addition & 0 deletions src/common/utility/utility.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ IdentifierValidationStatus IdentifierValidation(const String &identifier);

bool ParseIPPort(const String &str, String &ip, i64 &port);

String StringTransform(const String &source, const String &from, const String &to);
}

8 changes: 7 additions & 1 deletion src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,13 @@ void PhysicalShow::ExecuteShowIndex(QueryContext *query_context, ShowOperatorSta
++column_id;
{
const String table_dir = fmt::format("{}/{}", InfinityContext::instance().config()->DataDir(), *table_index_info->index_entry_dir_);
const auto &index_size = Utility::FormatByteSize(LocalFileSystem::GetFolderSizeByPath(table_dir));
u64 index_dir_size = 0;
if (InfinityContext::instance().persistence_manager() == nullptr) {
index_dir_size = LocalFileSystem::GetFolderSizeByPath(table_dir);
} else {
// TODO: calculate the sum of object parts which's has the prefix table_dir
}
const auto &index_size = Utility::FormatByteSize(index_dir_size);
Value value = Value::MakeVarchar(index_size);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
Expand Down
121 changes: 72 additions & 49 deletions src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module;
module file_worker;

import stl;

import utility;
import infinity_exception;
import local_file_system;
import third_party;
Expand All @@ -38,47 +38,70 @@ bool FileWorker::WriteToFile(bool to_spill, const FileWorkerSaveCtx &ctx) {
String error_message = "No data will be written.";
UnrecoverableError(error_message);
}
LocalFileSystem fs;

String write_dir = ChooseFileDir(to_spill);
if (!fs.Exists(write_dir)) {
fs.CreateDirectory(write_dir);
}
String write_path = fmt::format("{}/{}", write_dir, *file_name_);
LocalFileSystem fs;

u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG;
auto [file_handler, status] = fs.OpenFile(write_path, flags, FileLockType::kWriteLock);
if (!status.ok()) {
UnrecoverableError(status.message());
}
file_handler_ = std::move(file_handler);
if(persistence_manager_ != nullptr && !to_spill) {
String write_dir = *file_dir_;
String write_path = Path(*data_dir_) / write_dir / *file_name_;
String tmp_write_path = Path(*temp_dir_) / StringTransform(write_path, "/", "_");

if (to_spill) {
auto local_file_handle = static_cast<LocalFileHandler *>(file_handler_.get());
LOG_TRACE(fmt::format("Open spill file: {}, fd: {}", write_path, local_file_handle->fd_));
}
bool prepare_success = false;
u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG;
auto [file_handler, status] = fs.OpenFile(tmp_write_path, flags, FileLockType::kWriteLock);
if (!status.ok()) {
UnrecoverableError(status.message());
}
file_handler_ = std::move(file_handler);
bool prepare_success = false;

DeferFn defer_fn([&]() {
fs.Close(*file_handler_);
file_handler_ = nullptr;
});
DeferFn defer_fn([&]() {
fs.Close(*file_handler_);
file_handler_ = nullptr;
});

bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx);
if (prepare_success) {
if (to_spill) {
LOG_TRACE(fmt::format("Write to spill file {} finished. success {}", write_path, prepare_success));
bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx);
if (prepare_success) {
fs.SyncFile(*file_handler_);
}
fs.SyncFile(*file_handler_);
}

bool use_object_cache = !to_spill && persistence_manager_ != nullptr;
if (use_object_cache) {
fs.SyncFile(*file_handler_);
obj_addr_ = persistence_manager_->Persist(write_path);
fs.DeleteFile(write_path);
obj_addr_ = persistence_manager_->Persist(write_path, tmp_write_path);
fs.DeleteFile(tmp_write_path);
return all_save;
} else {
String write_dir = ChooseFileDir(to_spill);
if (!fs.Exists(write_dir)) {
fs.CreateDirectory(write_dir);
}
String write_path = fmt::format("{}/{}", write_dir, *file_name_);

u8 flags = FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG;
auto [file_handler, status] = fs.OpenFile(write_path, flags, FileLockType::kWriteLock);
if (!status.ok()) {
UnrecoverableError(status.message());
}
file_handler_ = std::move(file_handler);

if (to_spill) {
auto local_file_handle = static_cast<LocalFileHandler *>(file_handler_.get());
LOG_TRACE(fmt::format("Open spill file: {}, fd: {}", write_path, local_file_handle->fd_));
}
bool prepare_success = false;

DeferFn defer_fn([&]() {
fs.Close(*file_handler_);
file_handler_ = nullptr;
});

bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx);
if (prepare_success) {
if (to_spill) {
LOG_TRACE(fmt::format("Write to spill file {} finished. success {}", write_path, prepare_success));
}
fs.SyncFile(*file_handler_);
}
return all_save;
}
return all_save;
}

void FileWorker::ReadFromFile(bool from_spill) {
Expand All @@ -87,12 +110,12 @@ void FileWorker::ReadFromFile(bool from_spill) {
bool use_object_cache = !from_spill && persistence_manager_ != nullptr;
read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_);
if (use_object_cache) {
obj_addr_ = persistence_manager_->GetObjFromLocalPath(read_path);
obj_addr_ = persistence_manager_->GetObjCache(read_path);
if (!obj_addr_.Valid()) {
String error_message = fmt::format("Failed to find object for local path {}", read_path);
UnrecoverableError(error_message);
}
read_path = persistence_manager_->GetObjCache(read_path);
read_path = persistence_manager_->GetObjPath(obj_addr_.obj_key_);
}
SizeT file_size = 0;
u8 flags = FileFlags::READ_FLAG;
Expand Down Expand Up @@ -124,20 +147,20 @@ void FileWorker::MoveFile() {
String src_path = fmt::format("{}/{}", ChooseFileDir(true), *file_name_);
String dest_dir = ChooseFileDir(false);
String dest_path = fmt::format("{}/{}", dest_dir, *file_name_);
if (!fs.Exists(src_path)) {
Status status = Status::FileNotFound(src_path);
RecoverableError(status);
}
if (!fs.Exists(dest_dir)) {
fs.CreateDirectory(dest_dir);
}
// if (fs.Exists(dest_path)) {
// UnrecoverableError(fmt::format("File {} was already been created before.", dest_path));
// }
fs.Rename(src_path, dest_path);
if (persistence_manager_ != nullptr) {
obj_addr_ = persistence_manager_->Persist(dest_path);
fs.DeleteFile(dest_path);
if (persistence_manager_ == nullptr) {
if (!fs.Exists(src_path)) {
Status status = Status::FileNotFound(src_path);
RecoverableError(status);
}
if (!fs.Exists(dest_dir)) {
fs.CreateDirectory(dest_dir);
}
// if (fs.Exists(dest_path)) {
// UnrecoverableError(fmt::format("File {} was already been created before.", dest_path));
// }
fs.Rename(src_path, dest_path);
} else {
obj_addr_ = persistence_manager_->Persist(dest_path, src_path);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/invertedindex/column_index_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ ColumnIndexIterator::ColumnIndexIterator(const String &index_dir, const String &
if (use_object_cache) {
dict_file_path_ = dict_file;
posting_file_path_ = posting_file;
dict_file = pm->GetObjCache(dict_file);
posting_file = pm->GetObjCache(posting_file);
dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_);
posting_file = pm->GetObjPath(pm->GetObjCache(posting_file).obj_key_);
}

dict_reader_ = MakeShared<DictionaryReader>(dict_file, PostingFormatOption(flag));
Expand Down
45 changes: 23 additions & 22 deletions src/storage/invertedindex/column_index_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import logger;
import persistence_manager;
import infinity_context;
import defer_op;
import utility;

namespace infinity {
ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag) : index_dir_(index_dir), flag_(flag) {}
Expand All @@ -50,31 +51,26 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
String posting_file = index_prefix + POSTING_SUFFIX;
String column_length_file = index_prefix + LENGTH_SUFFIX;

String tmp_dict_file(dict_file);
String tmp_posting_file(posting_file);
String tmp_column_length_file(column_length_file);
String tmp_fst_file(fst_file);

// handle persistence obj_addrs
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
bool use_object_cache = pm != nullptr;
if (use_object_cache) {
pm->ObjCreateRefCount(dict_file);
pm->ObjCreateRefCount(posting_file);
pm->ObjCreateRefCount(column_length_file);
Path temp_dir = Path(InfinityContext::instance().config()->TempDir());
tmp_dict_file = temp_dir / StringTransform(tmp_dict_file, "/", "_");
tmp_posting_file = temp_dir / StringTransform(tmp_posting_file, "/", "_");
tmp_column_length_file = temp_dir / StringTransform(tmp_column_length_file, "/", "_");
tmp_fst_file = temp_dir / StringTransform(tmp_fst_file, "/", "_");
}

DeferFn defer_fn([&]() {
if (!use_object_cache) {
return;
}
pm->PutObjCache(posting_file);
pm->PutObjCache(dict_file);
pm->PutObjCache(column_length_file);
std::filesystem::remove(posting_file);
std::filesystem::remove(dict_file);
std::filesystem::remove(column_length_file);
});

SharedPtr<FileWriter> dict_file_writer = MakeShared<FileWriter>(fs_, dict_file, 1024);
SharedPtr<FileWriter> dict_file_writer = MakeShared<FileWriter>(fs_, tmp_dict_file, 1024);
TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_)));
posting_file_writer_ = MakeShared<FileWriter>(fs_, posting_file, 1024);
std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc);
posting_file_writer_ = MakeShared<FileWriter>(fs_, tmp_posting_file, 1024);
std::ofstream ofs(tmp_fst_file.c_str(), std::ios::binary | std::ios::trunc);
OstreamWriter wtr(ofs);
FstBuilder fst_builder(wtr);

Expand All @@ -100,7 +96,7 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
u32 id_offset = base_row_id - merge_base_rowid;

if (use_object_cache) {
column_len_file = pm->GetObjCache(column_len_file);
column_len_file = pm->GetObjPath(pm->GetObjCache(column_len_file).obj_key_);
}

auto [file_handler, status] = fs_.OpenFile(column_len_file, FileFlags::READ_FLAG, FileLockType::kNoLock);
Expand All @@ -126,7 +122,7 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
}
}

auto [file_handler, status] = fs_.OpenFile(column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock);
auto [file_handler, status] = fs_.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock);
if (!status.ok()) {
UnrecoverableError(status.message());
}
Expand All @@ -148,8 +144,13 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<Row
dict_file_writer->Sync();
posting_file_writer_->Sync();
fst_builder.Finish();
fs_.AppendFile(dict_file, fst_file);
fs_.DeleteFile(fst_file);
fs_.AppendFile(tmp_dict_file, tmp_fst_file);
fs_.DeleteFile(tmp_fst_file);
if (use_object_cache) {
pm->Persist(dict_file, tmp_dict_file, false);
pm->Persist(posting_file, tmp_posting_file, false);
pm->Persist(column_length_file, tmp_column_length_file, false);
}
}

void ColumnIndexMerger::MergeTerm(const String &term,
Expand Down
9 changes: 7 additions & 2 deletions src/storage/invertedindex/disk_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St
posting_file_.append(POSTING_SUFFIX);
String posting_file = posting_file_;
if (nullptr != pm) {
posting_file_obj_ = pm->GetObjCache(posting_file);
ObjAddr obj_addr = pm->GetObjCache(posting_file);
if (!obj_addr.Valid()) {
// Empty posting
return;
}
posting_file_obj_ = pm->GetObjPath(obj_addr.obj_key_);
posting_file = posting_file_obj_;
}
if (posting_file.empty() || std::filesystem::file_size(posting_file) == 0) {
Expand All @@ -67,7 +72,7 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St
dict_file_.append(DICT_SUFFIX);
String dict_file = dict_file_;
if (nullptr != pm) {
dict_file = pm->GetObjCache(dict_file);
dict_file = pm->GetObjPath(pm->GetObjCache(dict_file).obj_key_);
}
dict_reader_ = MakeShared<DictionaryReader>(dict_file, PostingFormatOption(flag));
}
Expand Down
Loading

0 comments on commit 2062275

Please sign in to comment.