Skip to content

Commit

Permalink
Fix insert (#1675)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Make block version member private.
2. Fix block version file skip save after spill.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored Aug 20, 2024
1 parent fdf97ef commit bd40417
Show file tree
Hide file tree
Showing 35 changed files with 267 additions and 136 deletions.
54 changes: 54 additions & 0 deletions python/test_pysdk/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,57 @@ def test_insert_no_match_column(self, column_name, suffix):
res = db_obj.drop_table(
"test_insert_no_match_column"+suffix, ConflictType.Error)
assert res.error_code == ErrorCode.OK

@pytest.mark.slow
def test_insert_with_large_data(self, suffix):
total_row_count = 3,000,000

db_obj = self.infinity_obj.get_database("default_db")
db_obj.drop_table("hr_data_mix"+suffix, ConflictType.Ignore)
table_obj = db_obj.create_table("hr_data_mix" + suffix, {
"id": {"type": "varchar"},
"content": {"type": "varchar"},
"dense_vec": {"type": "vector, 1024, float"},
"sparse_vec": {"type": "sparse,250002,float,int"},
})

import json
import time
def read_jsonl(file_path):
data = []
with open(file_path, 'r') as file:
for line in file:
data.append(json.loads(line))
return data
data_array = read_jsonl("./test/data/jsonl/test_table.jsonl")
loop_count: int = total_row_count // len(data_array)

start = time.time()
for global_idx in range(loop_count):
insert_data = []
for local_idx, data in enumerate(data_array):

# each 1000, a duplicated row is generated
if local_idx == 9 and global_idx % 1000 == 0:
end = time.time()
print(f"ID: {global_idx}@{local_idx}, cost: {end - start}s")
data = data_array[0]
local_idx = 0

indices = []
values = []
for key, value in data['sparse_vec'].items():
indices.append(int(key))
values.append(value)

insert_data.append(
{
"id": f'{global_idx}@{local_idx}',
"content": data['content'],
"dense_vec": data['dense_vec'],
"sparse_vec": SparseVector(indices, values)
}
)
table_obj.insert(insert_data)
res = table_obj.output(["count(*)"]).to_pl()
assert res.height == 1 and res.width == 1 and res.item(0, 0) == total_row_count
2 changes: 1 addition & 1 deletion src/executor/operator/physical_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ SizeT PhysicalExport::ExportToPARQUET(QueryContext *query_context, ExportOperato
break;
}
default: {
column_vectors.emplace_back(block_entry->GetColumnBlockEntry(select_column_idx)->GetColumnVector(buffer_manager));
column_vectors.emplace_back(block_entry->GetColumnBlockEntry(select_column_idx)->GetConstColumnVector(buffer_manager));
if (column_vectors[block_column_idx].Size() != block_row_count) {
String error_message = "Unmatched row_count between block and block_column";
LOG_CRITICAL(error_message);
Expand Down
11 changes: 11 additions & 0 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,17 @@ String Config::LogFilePath() {
return fmt::format("{}/{}", global_options_.GetStringValue(GlobalOptionIndex::kLogDir), global_options_.GetStringValue(GlobalOptionIndex::kLogFileName));
}

void Config::SetLogToStdout(bool log_to_stdout) {
std::lock_guard<std::mutex> guard(mutex_);
BaseOption *base_option = global_options_.GetOptionByIndex(GlobalOptionIndex::kLogToStdout);
if (base_option->data_type_ != BaseOptionDataType::kBoolean) {
String error_message = "Attempt to set bool value to log to stdout data type option";
UnrecoverableError(error_message);
}
BooleanOption *log_to_stdout_option = static_cast<BooleanOption *>(base_option);
log_to_stdout_option->value_ = log_to_stdout;
}

bool Config::LogToStdout() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetBoolValue(GlobalOptionIndex::kLogToStdout);
Expand Down
3 changes: 3 additions & 0 deletions src/main/config.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public:
String LogFileName();
String LogDir();
String LogFilePath();

void SetLogToStdout(bool log_to_stdout);
bool LogToStdout();

i64 LogFileMaxSize();
i64 LogFileRotateCount();

Expand Down
18 changes: 11 additions & 7 deletions src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import buffer_handle;
import buffer_manager;
import infinity_exception;
import logger;
import var_file_worker;
import third_party;
import logger;
import file_worker_type;
Expand Down Expand Up @@ -111,7 +110,11 @@ bool BufferObj::Free() {
}
case BufferType::kEphemeral: {
type_ = BufferType::kTemp;
file_worker_->WriteToFile(true);
bool all_save = file_worker_->WriteToFile(true);
if (!all_save) {
String error_message = fmt::format("Spill to file failed: {}", GetFilename());
UnrecoverableError(error_message);
}
buffer_mgr_->AddTemp(this);
break;
}
Expand All @@ -121,15 +124,18 @@ bool BufferObj::Free() {
return true;
}

bool BufferObj::Save() {
bool BufferObj::Save(const FileWorkerSaveCtx &ctx) {
bool write = false;
std::unique_lock<std::mutex> locker(w_locker_);
if (type_ == BufferType::kEphemeral) {
switch (status_) {
case BufferStatus::kLoaded:
case BufferStatus::kUnloaded: {
LOG_TRACE(fmt::format("BufferObj::Save file: {}", GetFilename()));
file_worker_->WriteToFile(false);
bool all_save = file_worker_->WriteToFile(false, ctx);
if (all_save) {
type_ = BufferType::kPersistent;
}
write = true;
break;
}
Expand All @@ -147,8 +153,8 @@ bool BufferObj::Save() {
LOG_TRACE(fmt::format("BufferObj::Move file: {}", GetFilename()));
buffer_mgr_->MoveTemp(this);
file_worker_->MoveFile();
type_ = BufferType::kPersistent;
}
type_ = BufferType::kPersistent;
return write;
}

Expand Down Expand Up @@ -240,14 +246,12 @@ bool BufferObj::AddBufferSize(SizeT add_size) {
if (file_worker_->Type() != FileWorkerType::kVarFile) {
UnrecoverableError("Invalid file worker type");
}
auto *var_file_worker = static_cast<VarFileWorker *>(file_worker_.get());

bool free_success = buffer_mgr_->RequestSpace(add_size);
if (!free_success) {
String warn_msg = fmt::format("Request memory {} failed, current memory usage: {}", add_size, buffer_mgr_->memory_usage());
LOG_WARN(warn_msg);
}
var_file_worker->AddBufferSize(add_size);
return free_success;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public:
bool Free();

// called when checkpoint. or in "IMPORT" operator.
bool Save();
bool Save(const FileWorkerSaveCtx &ctx = {});

void PickForCleanup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public:
FileWorkerType Type() const override { return FileWorkerType::kIVFFlatIndexFile; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down Expand Up @@ -129,10 +129,11 @@ void AnnIVFFlatIndexFileWorker<DataType>::FreeInMemory() {
}

template <typename DataType>
void AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
auto *index = static_cast<AnnIVFFlatIndexData<DataType> *>(data_);
index->SaveIndexInner(*file_handler_);
prepare_success = true;
return true;
}

template <typename DataType>
Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void BMPIndexFileWorker::FreeInMemory() {
data_ = nullptr;
}

void BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
if (!data_) {
UnrecoverableError("Data is not allocated.");
}
Expand All @@ -99,6 +99,7 @@ void BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
},
*bmp_index);
prepare_success = true;
return true;
}

void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public:
SizeT GetMemoryCost() const override { return index_size_; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/data_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void DataFileWorker::FreeInMemory() {
}

// FIXME: to_spill
void DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
LocalFileSystem fs;
// File structure:
// - header: magic number
Expand Down Expand Up @@ -91,6 +91,7 @@ void DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
RecoverableError(status);
}
prepare_success = true; // Not run defer_fn
return true;
}

void DataFileWorker::ReadFromFileImpl(SizeT file_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/data_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public:
FileWorkerType Type() const override { return FileWorkerType::kDataFile; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ void EMVBIndexFileWorker::FreeInMemory() {
data_ = nullptr;
}

void EMVBIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool EMVBIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
auto *index = static_cast<EMVBIndex *>(data_);
index->SaveIndexInner(*file_handler_);
prepare_success = true;
return true;
}

void EMVBIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public:
FileWorkerType Type() const override { return FileWorkerType::kEMVBIndexFile; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down
5 changes: 3 additions & 2 deletions src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace infinity {

FileWorker::~FileWorker() = default;

void FileWorker::WriteToFile(bool to_spill) {
bool FileWorker::WriteToFile(bool to_spill, const FileWorkerSaveCtx &ctx) {
if (data_ == nullptr) {
String error_message = "No data will be written.";
UnrecoverableError(error_message);
Expand Down Expand Up @@ -64,7 +64,7 @@ void FileWorker::WriteToFile(bool to_spill) {
file_handler_ = nullptr;
});

WriteToFileImpl(to_spill, prepare_success);
bool all_save = WriteToFileImpl(to_spill, prepare_success, ctx);
if (prepare_success) {
if (to_spill) {
LOG_TRACE(fmt::format("Write to spill file {} finished. success {}", write_path, prepare_success));
Expand All @@ -78,6 +78,7 @@ void FileWorker::WriteToFile(bool to_spill) {
obj_addr_ = InfinityContext::instance().persistence_manager()->Persist(write_path);
fs.DeleteFile(write_path);
}
return all_save;
}

void FileWorker::ReadFromFile(bool from_spill) {
Expand Down
6 changes: 4 additions & 2 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import persistence_manager;

namespace infinity {

export struct FileWorkerSaveCtx {};

export class FileWorker {
public:
// spill_dir_ is not init here
Expand All @@ -33,7 +35,7 @@ public:
virtual ~FileWorker();

public:
void WriteToFile(bool to_spill);
[[nodiscard]] bool WriteToFile(bool to_spill, const FileWorkerSaveCtx &ctx = {});

void ReadFromFile(bool from_spill);

Expand Down Expand Up @@ -62,7 +64,7 @@ public:
void CleanupTempFile() const;

protected:
virtual void WriteToFileImpl(bool to_spill, bool &prepare_success) = 0;
virtual bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx = {}) = 0;

virtual void ReadFromFileImpl(SizeT file_size) = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void HnswFileWorker::FreeInMemory() {
data_ = nullptr;
}

void HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
if (!data_) {
String error_message = "WriteToFileImpl: Data is not allocated.";
UnrecoverableError(error_message);
Expand All @@ -106,6 +106,7 @@ void HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
},
*hnsw_index);
prepare_success = true;
return true;
}

void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import knn_expr;
import column_def;
import internal_types;
import file_worker_type;
import file_worker;

namespace infinity {

Expand Down Expand Up @@ -55,7 +56,7 @@ public:
SizeT GetMemoryCost() const override { return index_size_; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void RawFileWorker::FreeInMemory() {
data_ = nullptr;
}

void RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
bool RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
assert(data_ != nullptr && buffer_size_ > 0);
LocalFileSystem fs;
i64 nbytes = fs.Write(*file_handler_, data_, buffer_size_);
Expand All @@ -66,6 +66,7 @@ void RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
RecoverableError(status);
}
prepare_success = true; // Not run defer_fn
return true;
}

void RawFileWorker::ReadFromFileImpl(SizeT file_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public:
FileWorkerType Type() const override { return FileWorkerType::kRawFile; }

protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;
bool WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) override;

void ReadFromFileImpl(SizeT file_size) override;

Expand Down
Loading

0 comments on commit bd40417

Please sign in to comment.