Skip to content

Commit

Permalink
Fix restart (infiniflow#2356)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix restart problem after checkpoint & alter
Fix restart problem after checkpoint & compact
Add coresponding test case.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored and vsian committed Dec 13, 2024
1 parent 19cfc28 commit 6deaa6a
Show file tree
Hide file tree
Showing 18 changed files with 265 additions and 35 deletions.
45 changes: 45 additions & 0 deletions python/restart_test/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,48 @@ def part2(infinity_obj):
db_obj.drop_table(table_name)

part2()

def test_restart_after_alter_and_checkpoint(self, infinity_runner: InfinityRunner):
table_name = "test_alter4"
config = "test/data/config/restart_test/test_alter/1.toml"

infinity_runner.clear()
uri = common_values.TEST_LOCAL_HOST
data_dir = "/var/infinity/data"

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)

@decorator
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name, ConflictType.Ignore)
table_obj = db_obj.create_table(
table_name,
{
"c1": {"type": "int"},
"c2": {"type": "int"},
"c3": {"type": "varchar"},
},
)
table_obj.insert([{"c1": 1, "c2": 2, "c3": "test"}])

table_obj.add_columns({"c4": {"type": "varchar", "default": "tttt"}})
table_obj.drop_columns(["c2"])

infinity_obj.flush_data()

table_obj.drop_columns(["c3"])

infinity_obj.flush_delta()

part1()

@decorator
def part2(infinity_obj):
dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col")
assert len(list(dropped_column_dirs)) == 0

dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col")
assert len(list(dropped_column_dirs)) == 0

part2()
52 changes: 52 additions & 0 deletions python/restart_test/test_compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,55 @@ def part2(infinity_obj):
pass

part2()

def test_restart_compact_index(self, infinity_runner: InfinityRunner):
config = "test/data/config/restart_test/test_compact/1.toml"
uri = common_values.TEST_LOCAL_HOST
infinity_runner.clear()

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)

table_name = "test_compact1"
dataset_path = "test/data/csv/enwiki_9.csv"
import_options = {"delimiter": "\t", "file_type": "csv"}

@decorator
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name, ConflictType.Ignore)
table_obj = db_obj.create_table(
table_name,
{
"doctitle": {"type": "varchar"},
"docdate": {"type": "varchar"},
"body": {"type": "varchar"},
},
)
table_obj.create_index(
"ft_index", index.IndexInfo("body", index.IndexType.FullText)
)
table_obj.import_data(dataset_path, import_options)
table_obj.import_data(dataset_path, import_options)
table_obj.compact()

infinity_obj.flush_data()

table_obj.import_data(dataset_path, import_options)
table_obj.compact()
infinity_obj.flush_delta()

table_obj.import_data(dataset_path, import_options)
table_obj.compact()


part1()
import_time = 4

@decorator
def part2(infinity_obj):
table_obj = infinity_obj.get_database("default_db").get_table(table_name)
data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result()
count_star = data_dict["count(star)"][0]
assert count_star == 9 * import_time

part2()
25 changes: 25 additions & 0 deletions src/executor/operator/physical_compact_finish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import internal_types;
import infinity_context;
import infinity_exception;
import status;
import txn_store;
import segment_index_entry;

namespace infinity {

Expand Down Expand Up @@ -79,6 +81,29 @@ void PhysicalCompactFinish::SaveSegmentData(QueryContext *query_context, const C
}
LOG_DEBUG(ss.str());

for (const auto &compact_segment_data : compact_state_data->segment_data_list_) {
TxnStore *txn_store = txn->txn_store();
TxnTableStore *txn_table_store = txn_store->GetTxnTableStore(table_entry);
auto index_map = table_entry->IndexMetaMap();
for (const auto &[index_name, index_meta] : *index_map) {
auto [table_index_entry, status] = index_meta->GetEntryNolock(txn->TxnID(), txn->BeginTS());
if (!status.ok()) {
continue;
}
Vector<SegmentIndexEntry *> segment_index_entries;
auto &segment_index_map = table_index_entry->index_by_segment();
for (const auto *old_segment : compact_segment_data.old_segments_) {
auto iter = segment_index_map.find(old_segment->segment_id());
if (iter == segment_index_map.end()) {
continue;
}
auto *segment_index_entry = iter->second.get();
segment_index_entries.push_back(segment_index_entry);
}
txn_table_store->AddSegmentIndexesStore(table_index_entry, std::move(segment_index_entries));
}
}

txn->Compact(table_entry, std::move(segment_data), compact_type_);
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected:
private:
u32 id_;

u32 obj_rc_ = 1;
u32 obj_rc_ = 0;
};

} // namespace infinity
8 changes: 5 additions & 3 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,14 +910,15 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto min_ts = add_segment_index_entry_op->min_ts_;
auto max_ts = add_segment_index_entry_op->max_ts_;
auto next_chunk_id = add_segment_index_entry_op->next_chunk_id_;
auto deprecate_ts = add_segment_index_entry_op->deprecate_ts_;

auto *db_entry = this->GetDatabaseReplay(db_name, txn_id, begin_ts);
auto *table_entry = db_entry->GetTableReplay(table_name, txn_id, begin_ts);

if (auto iter = table_entry->segment_map_.find(segment_id); iter != table_entry->segment_map_.end()) {
auto *table_index_entry = table_entry->GetIndexReplay(index_name, txn_id, begin_ts);
auto *segment_entry = iter->second.get();
if (segment_entry->status() == SegmentStatus::kDeprecated) {
if (merge_flag != MergeFlag::kDelete && segment_entry->status() == SegmentStatus::kDeprecated) {
String error_message = fmt::format("Segment {} is deprecated", segment_id);
UnrecoverableError(error_message);
}
Expand All @@ -930,14 +931,15 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
next_chunk_id,
txn_id,
begin_ts,
commit_ts);
commit_ts,
deprecate_ts);
if (merge_flag == MergeFlag::kNew) {
bool insert_ok = table_index_entry->index_by_segment().insert({segment_id, std::move(segment_index_entry)}).second;
if (!insert_ok) {
String error_message = fmt::format("Segment index {} is already in the catalog", segment_id);
UnrecoverableError(error_message);
}
} else if (merge_flag == MergeFlag::kUpdate) {
} else if (merge_flag == MergeFlag::kUpdate || merge_flag == MergeFlag::kDelete) {
auto iter = table_index_entry->index_by_segment().find(segment_id);
if (iter == table_index_entry->index_by_segment().end()) {
String error_message = fmt::format("Segment index {} is not found", segment_id);
Expand Down
13 changes: 10 additions & 3 deletions src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ BlockColumnEntry::BlockColumnEntry(const BlockColumnEntry &other)
std::shared_lock lock(other.mutex_);
outline_buffers_ = other.outline_buffers_;
last_chunk_offset_ = other.last_chunk_offset_;
}

UniquePtr<BlockColumnEntry> BlockColumnEntry::Clone(BlockEntry *block_entry) const {
auto ret = UniquePtr<BlockColumnEntry>(new BlockColumnEntry(*this));
buffer_->AddObjRc();
for (auto *outline_buffer : outline_buffers_) {
outline_buffer->AddObjRc();
}
}

UniquePtr<BlockColumnEntry> BlockColumnEntry::Clone(BlockEntry *block_entry) const {
auto ret = UniquePtr<BlockColumnEntry>(new BlockColumnEntry(*this));

ret->block_entry_ = block_entry;
return ret;
}
Expand Down Expand Up @@ -105,6 +107,7 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::NewBlockColumnEntry(const BlockEnt
buffer_mgr->persistence_manager());

block_column_entry->buffer_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
block_column_entry->buffer_->AddObjRc();
return block_column_entry;
}

Expand Down Expand Up @@ -132,6 +135,7 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::NewReplayBlockColumnEntry(const Bl
buffer_manager->persistence_manager());

column_entry->buffer_ = buffer_manager->GetBufferObject(std::move(file_worker), true /*restart*/);
column_entry->buffer_->AddObjRc();

if (next_outline_idx > 0) {
SizeT buffer_size = last_chunk_offset;
Expand All @@ -142,6 +146,7 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::NewReplayBlockColumnEntry(const Bl
buffer_size,
buffer_manager->persistence_manager());
auto *buffer_obj = buffer_manager->GetBufferObject(std::move(outline_buffer_file_worker), true /*restart*/);
buffer_obj->AddObjRc();
column_entry->outline_buffers_.push_back(buffer_obj);
}
column_entry->last_chunk_offset_ = last_chunk_offset;
Expand Down Expand Up @@ -171,6 +176,7 @@ ColumnVector BlockColumnEntry::GetColumnVectorInner(BufferManager *buffer_mgr, c
0,
buffer_mgr->persistence_manager());
this->buffer_ = buffer_mgr->GetBufferObject(std::move(file_worker));
buffer_->AddObjRc();
}

ColumnVector column_vector(column_type_);
Expand Down Expand Up @@ -293,6 +299,7 @@ void BlockColumnEntry::Cleanup(CleanupInfoTracer *info_tracer, [[maybe_unused]]
String file_path = outline_buffer->GetFilename();
info_tracer->AddCleanupInfo(std::move(file_path));
}
outline_buffer = nullptr;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/meta/entry/block_column_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public:
void AppendOutlineBuffer(BufferObj *buffer) {
std::unique_lock lock(mutex_);
outline_buffers_.emplace_back(buffer);
buffer->AddObjRc();
}

BufferObj *GetOutlineBuffer(SizeT idx) const {
Expand Down
5 changes: 4 additions & 1 deletion src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ BlockEntry::BlockEntry(const BlockEntry &other)
checkpoint_ts_ = other.checkpoint_ts_;
using_txn_id_ = other.using_txn_id_;
checkpoint_row_count_ = other.checkpoint_row_count_;

version_buffer_object_->AddObjRc();
}

UniquePtr<BlockEntry> BlockEntry::Clone(SegmentEntry *segment_entry) const {
auto ret = UniquePtr<BlockEntry>(new BlockEntry(*this));
version_buffer_object_->AddObjRc();
ret->segment_entry_ = segment_entry;
for (auto &column : columns_) {
ret->columns_.emplace_back(column->Clone(ret.get()));
Expand Down Expand Up @@ -112,6 +113,7 @@ BlockEntry::NewBlockEntry(const SegmentEntry *segment_entry, BlockID block_id, T
block_entry->row_capacity_,
buffer_mgr->persistence_manager());
block_entry->version_buffer_object_ = buffer_mgr->AllocateBufferObject(std::move(version_file_worker));
block_entry->version_buffer_object_->AddObjRc();
return block_entry;
}

Expand Down Expand Up @@ -147,6 +149,7 @@ UniquePtr<BlockEntry> BlockEntry::NewReplayBlockEntry(const SegmentEntry *segmen
row_capacity,
buffer_mgr->persistence_manager());
block_entry->version_buffer_object_ = buffer_mgr->GetBufferObject(std::move(version_file_worker));
block_entry->version_buffer_object_->AddObjRc();

block_entry->checkpoint_ts_ = check_point_ts;
block_entry->checkpoint_row_count_ = checkpoint_row_count;
Expand Down
18 changes: 14 additions & 4 deletions src/storage/meta/entry/chunk_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ ChunkIndexEntry::~ChunkIndexEntry() {}

ChunkIndexEntry::ChunkIndexEntry(const ChunkIndexEntry &other)
: BaseEntry(other), chunk_id_(other.chunk_id_), segment_index_entry_(other.segment_index_entry_), base_name_(other.base_name_),
base_rowid_(other.base_rowid_), row_count_(other.row_count_), deprecate_ts_(other.deprecate_ts_.load()), buffer_obj_(other.buffer_obj_) {}
base_rowid_(other.base_rowid_), row_count_(other.row_count_), deprecate_ts_(other.deprecate_ts_.load()), buffer_obj_(other.buffer_obj_) {
if (buffer_obj_) {
buffer_obj_->AddObjRc();
}
}

UniquePtr<ChunkIndexEntry> ChunkIndexEntry::Clone(SegmentIndexEntry *segment_index_entry) const {
auto ret = UniquePtr<ChunkIndexEntry>(new ChunkIndexEntry(*this));
if (buffer_obj_ != nullptr) {
buffer_obj_->AddObjRc();
}
ret->segment_index_entry_ = segment_index_entry;
return ret;
}
Expand Down Expand Up @@ -110,6 +111,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewHnswIndexChunkIndexEntry(ChunkID
buffer_mgr->persistence_manager(),
index_size);
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand All @@ -132,6 +134,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewFtChunkIndexEntry(SegmentIndexEnt
row_count * sizeof(u32),
buffer_mgr->persistence_manager());
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand Down Expand Up @@ -159,6 +162,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewSecondaryIndexChunkIndexEntry(Chu
row_count,
buffer_mgr->persistence_manager());
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand All @@ -185,6 +189,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewIVFIndexChunkIndexEntry(ChunkID c
column_def,
buffer_mgr->persistence_manager());
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand Down Expand Up @@ -213,6 +218,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewEMVBIndexChunkIndexEntry(ChunkID
segment_start_offset,
buffer_mgr->persistence_manager());
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand Down Expand Up @@ -241,6 +247,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewBMPIndexChunkIndexEntry(ChunkID c
buffer_mgr->persistence_manager(),
index_size);
chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker));
chunk_index_entry->buffer_obj_->AddObjRc();
}
return chunk_index_entry;
}
Expand Down Expand Up @@ -341,6 +348,9 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chu
UnrecoverableError(fmt::format("Unsupported index type: {}", index_base->ToString()));
}
}
if (chunk_index_entry->buffer_obj_) {
chunk_index_entry->buffer_obj_->AddObjRc();
}
chunk_index_entry->commit_ts_ = commit_ts;
chunk_index_entry->deprecate_ts_ = deprecate_ts;
return chunk_index_entry;
Expand Down
Loading

0 comments on commit 6deaa6a

Please sign in to comment.