diff --git a/python/restart_test/test_alter.py b/python/restart_test/test_alter.py index 3810ee6149..507456d698 100644 --- a/python/restart_test/test_alter.py +++ b/python/restart_test/test_alter.py @@ -229,3 +229,48 @@ def part2(infinity_obj): db_obj.drop_table(table_name) part2() + + def test_restart_after_alter_and_checkpoint(self, infinity_runner: InfinityRunner): + table_name = "test_alter4" + config = "test/data/config/restart_test/test_alter/1.toml" + + infinity_runner.clear() + uri = common_values.TEST_LOCAL_HOST + data_dir = "/var/infinity/data" + + decorator = infinity_runner_decorator_factory(config, uri, infinity_runner) + + @decorator + def part1(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + db_obj.drop_table(table_name, ConflictType.Ignore) + table_obj = db_obj.create_table( + table_name, + { + "c1": {"type": "int"}, + "c2": {"type": "int"}, + "c3": {"type": "varchar"}, + }, + ) + table_obj.insert([{"c1": 1, "c2": 2, "c3": "test"}]) + + table_obj.add_columns({"c4": {"type": "varchar", "default": "tttt"}}) + table_obj.drop_columns(["c2"]) + + infinity_obj.flush_data() + + table_obj.drop_columns(["c3"]) + + infinity_obj.flush_delta() + + part1() + + @decorator + def part2(infinity_obj): + dropped_column_dirs = pathlib.Path(data_dir).rglob("1.col") + assert len(list(dropped_column_dirs)) == 0 + + dropped_column_dirs = pathlib.Path(data_dir).rglob("2.col") + assert len(list(dropped_column_dirs)) == 0 + + part2() diff --git a/python/restart_test/test_compact.py b/python/restart_test/test_compact.py index 154a64ddb8..186fbae3a8 100644 --- a/python/restart_test/test_compact.py +++ b/python/restart_test/test_compact.py @@ -37,3 +37,55 @@ def part2(infinity_obj): pass part2() + + def test_restart_compact_index(self, infinity_runner: InfinityRunner): + config = "test/data/config/restart_test/test_compact/1.toml" + uri = common_values.TEST_LOCAL_HOST + infinity_runner.clear() + + decorator = infinity_runner_decorator_factory(config, uri, infinity_runner) + + table_name = "test_compact1" + dataset_path = "test/data/csv/enwiki_9.csv" + import_options = {"delimiter": "\t", "file_type": "csv"} + + @decorator + def part1(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + db_obj.drop_table(table_name, ConflictType.Ignore) + table_obj = db_obj.create_table( + table_name, + { + "doctitle": {"type": "varchar"}, + "docdate": {"type": "varchar"}, + "body": {"type": "varchar"}, + }, + ) + table_obj.create_index( + "ft_index", index.IndexInfo("body", index.IndexType.FullText) + ) + table_obj.import_data(dataset_path, import_options) + table_obj.import_data(dataset_path, import_options) + table_obj.compact() + + infinity_obj.flush_data() + + table_obj.import_data(dataset_path, import_options) + table_obj.compact() + infinity_obj.flush_delta() + + table_obj.import_data(dataset_path, import_options) + table_obj.compact() + + + part1() + import_time = 4 + + @decorator + def part2(infinity_obj): + table_obj = infinity_obj.get_database("default_db").get_table(table_name) + data_dict, data_type_dict, _ = table_obj.output(["count(*)"]).to_result() + count_star = data_dict["count(star)"][0] + assert count_star == 9 * import_time + + part2() diff --git a/src/executor/operator/physical_compact_finish.cpp b/src/executor/operator/physical_compact_finish.cpp index 852dbae8fa..341352b57e 100644 --- a/src/executor/operator/physical_compact_finish.cpp +++ b/src/executor/operator/physical_compact_finish.cpp @@ -33,6 +33,8 @@ import internal_types; import infinity_context; import infinity_exception; import status; +import txn_store; +import segment_index_entry; namespace infinity { @@ -79,6 +81,29 @@ void PhysicalCompactFinish::SaveSegmentData(QueryContext *query_context, const C } LOG_DEBUG(ss.str()); + for (const auto &compact_segment_data : compact_state_data->segment_data_list_) { + TxnStore *txn_store = txn->txn_store(); + TxnTableStore *txn_table_store = txn_store->GetTxnTableStore(table_entry); + auto index_map = table_entry->IndexMetaMap(); + for (const auto &[index_name, index_meta] : *index_map) { + auto [table_index_entry, status] = index_meta->GetEntryNolock(txn->TxnID(), txn->BeginTS()); + if (!status.ok()) { + continue; + } + Vector segment_index_entries; + auto &segment_index_map = table_index_entry->index_by_segment(); + for (const auto *old_segment : compact_segment_data.old_segments_) { + auto iter = segment_index_map.find(old_segment->segment_id()); + if (iter == segment_index_map.end()) { + continue; + } + auto *segment_index_entry = iter->second.get(); + segment_index_entries.push_back(segment_index_entry); + } + txn_table_store->AddSegmentIndexesStore(table_index_entry, std::move(segment_index_entries)); + } + } + txn->Compact(table_entry, std::move(segment_data), compact_type_); } diff --git a/src/storage/buffer/buffer_obj.cppm b/src/storage/buffer/buffer_obj.cppm index 45c79a00de..909b1f2d9a 100644 --- a/src/storage/buffer/buffer_obj.cppm +++ b/src/storage/buffer/buffer_obj.cppm @@ -158,7 +158,7 @@ protected: private: u32 id_; - u32 obj_rc_ = 1; + u32 obj_rc_ = 0; }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 958f562f3c..71d7347897 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -910,6 +910,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe auto min_ts = add_segment_index_entry_op->min_ts_; auto max_ts = add_segment_index_entry_op->max_ts_; auto next_chunk_id = add_segment_index_entry_op->next_chunk_id_; + auto deprecate_ts = add_segment_index_entry_op->deprecate_ts_; auto *db_entry = this->GetDatabaseReplay(db_name, txn_id, begin_ts); auto *table_entry = db_entry->GetTableReplay(table_name, txn_id, begin_ts); @@ -917,7 +918,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe if (auto iter = table_entry->segment_map_.find(segment_id); iter != table_entry->segment_map_.end()) { auto *table_index_entry = table_entry->GetIndexReplay(index_name, txn_id, begin_ts); auto *segment_entry = iter->second.get(); - if (segment_entry->status() == SegmentStatus::kDeprecated) { + if (merge_flag != MergeFlag::kDelete && segment_entry->status() == SegmentStatus::kDeprecated) { String error_message = fmt::format("Segment {} is deprecated", segment_id); UnrecoverableError(error_message); } @@ -930,14 +931,15 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe next_chunk_id, txn_id, begin_ts, - commit_ts); + commit_ts, + deprecate_ts); if (merge_flag == MergeFlag::kNew) { bool insert_ok = table_index_entry->index_by_segment().insert({segment_id, std::move(segment_index_entry)}).second; if (!insert_ok) { String error_message = fmt::format("Segment index {} is already in the catalog", segment_id); UnrecoverableError(error_message); } - } else if (merge_flag == MergeFlag::kUpdate) { + } else if (merge_flag == MergeFlag::kUpdate || merge_flag == MergeFlag::kDelete) { auto iter = table_index_entry->index_by_segment().find(segment_id); if (iter == table_index_entry->index_by_segment().end()) { String error_message = fmt::format("Segment index {} is not found", segment_id); diff --git a/src/storage/meta/entry/block_column_entry.cpp b/src/storage/meta/entry/block_column_entry.cpp index 389630d766..cb00683947 100644 --- a/src/storage/meta/entry/block_column_entry.cpp +++ b/src/storage/meta/entry/block_column_entry.cpp @@ -67,14 +67,16 @@ BlockColumnEntry::BlockColumnEntry(const BlockColumnEntry &other) std::shared_lock lock(other.mutex_); outline_buffers_ = other.outline_buffers_; last_chunk_offset_ = other.last_chunk_offset_; -} -UniquePtr BlockColumnEntry::Clone(BlockEntry *block_entry) const { - auto ret = UniquePtr(new BlockColumnEntry(*this)); buffer_->AddObjRc(); for (auto *outline_buffer : outline_buffers_) { outline_buffer->AddObjRc(); } +} + +UniquePtr BlockColumnEntry::Clone(BlockEntry *block_entry) const { + auto ret = UniquePtr(new BlockColumnEntry(*this)); + ret->block_entry_ = block_entry; return ret; } @@ -105,6 +107,7 @@ UniquePtr BlockColumnEntry::NewBlockColumnEntry(const BlockEnt buffer_mgr->persistence_manager()); block_column_entry->buffer_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + block_column_entry->buffer_->AddObjRc(); return block_column_entry; } @@ -132,6 +135,7 @@ UniquePtr BlockColumnEntry::NewReplayBlockColumnEntry(const Bl buffer_manager->persistence_manager()); column_entry->buffer_ = buffer_manager->GetBufferObject(std::move(file_worker), true /*restart*/); + column_entry->buffer_->AddObjRc(); if (next_outline_idx > 0) { SizeT buffer_size = last_chunk_offset; @@ -142,6 +146,7 @@ UniquePtr BlockColumnEntry::NewReplayBlockColumnEntry(const Bl buffer_size, buffer_manager->persistence_manager()); auto *buffer_obj = buffer_manager->GetBufferObject(std::move(outline_buffer_file_worker), true /*restart*/); + buffer_obj->AddObjRc(); column_entry->outline_buffers_.push_back(buffer_obj); } column_entry->last_chunk_offset_ = last_chunk_offset; @@ -171,6 +176,7 @@ ColumnVector BlockColumnEntry::GetColumnVectorInner(BufferManager *buffer_mgr, c 0, buffer_mgr->persistence_manager()); this->buffer_ = buffer_mgr->GetBufferObject(std::move(file_worker)); + buffer_->AddObjRc(); } ColumnVector column_vector(column_type_); @@ -293,6 +299,7 @@ void BlockColumnEntry::Cleanup(CleanupInfoTracer *info_tracer, [[maybe_unused]] String file_path = outline_buffer->GetFilename(); info_tracer->AddCleanupInfo(std::move(file_path)); } + outline_buffer = nullptr; } } } diff --git a/src/storage/meta/entry/block_column_entry.cppm b/src/storage/meta/entry/block_column_entry.cppm index 6f03c4177f..a1d94d96ab 100644 --- a/src/storage/meta/entry/block_column_entry.cppm +++ b/src/storage/meta/entry/block_column_entry.cppm @@ -100,6 +100,7 @@ public: void AppendOutlineBuffer(BufferObj *buffer) { std::unique_lock lock(mutex_); outline_buffers_.emplace_back(buffer); + buffer->AddObjRc(); } BufferObj *GetOutlineBuffer(SizeT idx) const { diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp index cb07fc5dfe..4d44c9a68b 100644 --- a/src/storage/meta/entry/block_entry.cpp +++ b/src/storage/meta/entry/block_entry.cpp @@ -75,11 +75,12 @@ BlockEntry::BlockEntry(const BlockEntry &other) checkpoint_ts_ = other.checkpoint_ts_; using_txn_id_ = other.using_txn_id_; checkpoint_row_count_ = other.checkpoint_row_count_; + + version_buffer_object_->AddObjRc(); } UniquePtr BlockEntry::Clone(SegmentEntry *segment_entry) const { auto ret = UniquePtr(new BlockEntry(*this)); - version_buffer_object_->AddObjRc(); ret->segment_entry_ = segment_entry; for (auto &column : columns_) { ret->columns_.emplace_back(column->Clone(ret.get())); @@ -112,6 +113,7 @@ BlockEntry::NewBlockEntry(const SegmentEntry *segment_entry, BlockID block_id, T block_entry->row_capacity_, buffer_mgr->persistence_manager()); block_entry->version_buffer_object_ = buffer_mgr->AllocateBufferObject(std::move(version_file_worker)); + block_entry->version_buffer_object_->AddObjRc(); return block_entry; } @@ -147,6 +149,7 @@ UniquePtr BlockEntry::NewReplayBlockEntry(const SegmentEntry *segmen row_capacity, buffer_mgr->persistence_manager()); block_entry->version_buffer_object_ = buffer_mgr->GetBufferObject(std::move(version_file_worker)); + block_entry->version_buffer_object_->AddObjRc(); block_entry->checkpoint_ts_ = check_point_ts; block_entry->checkpoint_row_count_ = checkpoint_row_count; diff --git a/src/storage/meta/entry/chunk_index_entry.cpp b/src/storage/meta/entry/chunk_index_entry.cpp index 2af34b4647..6c12c170e5 100644 --- a/src/storage/meta/entry/chunk_index_entry.cpp +++ b/src/storage/meta/entry/chunk_index_entry.cpp @@ -73,13 +73,14 @@ ChunkIndexEntry::~ChunkIndexEntry() {} ChunkIndexEntry::ChunkIndexEntry(const ChunkIndexEntry &other) : BaseEntry(other), chunk_id_(other.chunk_id_), segment_index_entry_(other.segment_index_entry_), base_name_(other.base_name_), - base_rowid_(other.base_rowid_), row_count_(other.row_count_), deprecate_ts_(other.deprecate_ts_.load()), buffer_obj_(other.buffer_obj_) {} + base_rowid_(other.base_rowid_), row_count_(other.row_count_), deprecate_ts_(other.deprecate_ts_.load()), buffer_obj_(other.buffer_obj_) { + if (buffer_obj_) { + buffer_obj_->AddObjRc(); + } +} UniquePtr ChunkIndexEntry::Clone(SegmentIndexEntry *segment_index_entry) const { auto ret = UniquePtr(new ChunkIndexEntry(*this)); - if (buffer_obj_ != nullptr) { - buffer_obj_->AddObjRc(); - } ret->segment_index_entry_ = segment_index_entry; return ret; } @@ -110,6 +111,7 @@ SharedPtr ChunkIndexEntry::NewHnswIndexChunkIndexEntry(ChunkID buffer_mgr->persistence_manager(), index_size); chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -132,6 +134,7 @@ SharedPtr ChunkIndexEntry::NewFtChunkIndexEntry(SegmentIndexEnt row_count * sizeof(u32), buffer_mgr->persistence_manager()); chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -159,6 +162,7 @@ SharedPtr ChunkIndexEntry::NewSecondaryIndexChunkIndexEntry(Chu row_count, buffer_mgr->persistence_manager()); chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -185,6 +189,7 @@ SharedPtr ChunkIndexEntry::NewIVFIndexChunkIndexEntry(ChunkID c column_def, buffer_mgr->persistence_manager()); chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -213,6 +218,7 @@ SharedPtr ChunkIndexEntry::NewEMVBIndexChunkIndexEntry(ChunkID segment_start_offset, buffer_mgr->persistence_manager()); chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -241,6 +247,7 @@ SharedPtr ChunkIndexEntry::NewBMPIndexChunkIndexEntry(ChunkID c buffer_mgr->persistence_manager(), index_size); chunk_index_entry->buffer_obj_ = buffer_mgr->AllocateBufferObject(std::move(file_worker)); + chunk_index_entry->buffer_obj_->AddObjRc(); } return chunk_index_entry; } @@ -341,6 +348,9 @@ SharedPtr ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chu UnrecoverableError(fmt::format("Unsupported index type: {}", index_base->ToString())); } } + if (chunk_index_entry->buffer_obj_) { + chunk_index_entry->buffer_obj_->AddObjRc(); + } chunk_index_entry->commit_ts_ = commit_ts; chunk_index_entry->deprecate_ts_ = deprecate_ts; return chunk_index_entry; diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index ecd55660c2..e1603a74ad 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -154,7 +154,8 @@ SharedPtr SegmentIndexEntry::NewReplaySegmentIndexEntry(Table u32 next_chunk_id, TransactionID txn_id, TxnTimeStamp begin_ts, - TxnTimeStamp commit_ts) { + TxnTimeStamp commit_ts, + TxnTimeStamp deprecate_ts) { auto [segment_row_count, status] = table_entry->GetSegmentRowCountBySegmentID(segment_id); if (!status.ok()) { UnrecoverableError(status.message()); @@ -171,6 +172,7 @@ SharedPtr SegmentIndexEntry::NewReplaySegmentIndexEntry(Table segment_index_entry->commit_ts_.store(commit_ts); segment_index_entry->buffer_manager_ = buffer_manager; + segment_index_entry->deprecate_ts_.store(deprecate_ts); return segment_index_entry; } @@ -181,6 +183,7 @@ void SegmentIndexEntry::UpdateSegmentIndexReplay(SharedPtr ne min_ts_ = new_entry->min_ts_; max_ts_ = new_entry->max_ts_; next_chunk_id_ = new_entry->next_chunk_id_.load(); + deprecate_ts_ = new_entry->deprecate_ts_.load(); } // String SegmentIndexEntry::IndexFileName(SegmentID segment_id) { return fmt::format("seg{}.idx", segment_id); } @@ -1175,6 +1178,7 @@ nlohmann::json SegmentIndexEntry::Serialize(TxnTimeStamp max_commit_ts) { } index_entry_json["ft_column_len_sum"] = this->ft_column_len_sum_; index_entry_json["ft_column_len_cnt"] = this->ft_column_len_cnt_; + index_entry_json["deprecate_ts"] = this->deprecate_ts_.load(); } return index_entry_json; @@ -1210,6 +1214,13 @@ UniquePtr SegmentIndexEntry::Deserialize(const nlohmann::json segment_index_entry->ft_column_len_sum_ = index_entry_json["ft_column_len_sum"]; segment_index_entry->ft_column_len_cnt_ = index_entry_json["ft_column_len_cnt"]; + if (index_entry_json["deprecate_ts"].is_null() || index_entry_json["deprecate_ts"] == UNCOMMIT_TS) { + segment_index_entry->deleted_ = false; + } else { + segment_index_entry->deleted_ = true; + segment_index_entry->deprecate_ts_ = index_entry_json["deprecate_ts"]; + } + return segment_index_entry; } @@ -1233,4 +1244,13 @@ Pair SegmentIndexEntry::GetFulltextColumnLenInfo() { void SegmentIndexEntry::SetMemoryIndexer(UniquePtr &&memory_indexer) { memory_indexer_ = std::move(memory_indexer); } +void SegmentIndexEntry::SetDeprecated(TxnTimeStamp deprecate_ts) { + std::unique_lock lock(rw_locker_); + for (auto &chunk_index_entry : chunk_index_entries_) { + chunk_index_entry->DeprecateChunk(deprecate_ts); + } + this->deleted_ = true; + this->deprecate_ts_ = deprecate_ts; +} + } // namespace infinity diff --git a/src/storage/meta/entry/segment_index_entry.cppm b/src/storage/meta/entry/segment_index_entry.cppm index cd9170a992..7d3cd87894 100644 --- a/src/storage/meta/entry/segment_index_entry.cppm +++ b/src/storage/meta/entry/segment_index_entry.cppm @@ -73,7 +73,8 @@ public: u32 next_chunk_id, TransactionID txn_id, TxnTimeStamp begin_ts, - TxnTimeStamp commit_ts); + TxnTimeStamp commit_ts, + TxnTimeStamp deprecate_ts); void UpdateSegmentIndexReplay(SharedPtr new_entry); @@ -110,6 +111,7 @@ public: inline TxnTimeStamp min_ts() const { return min_ts_; } inline TxnTimeStamp max_ts() const { return max_ts_; } inline ChunkID next_chunk_id() const { return next_chunk_id_; } + inline TxnTimeStamp deprecate_ts() const { return deprecate_ts_; } SharedPtr index_dir() const { return index_dir_; } // MemIndexInsert is non-blocking. Caller must ensure there's no RowID gap between each call. @@ -230,6 +232,13 @@ public: ChunkID GetNextChunkID() { return next_chunk_id_++; } + void SetDeprecated(TxnTimeStamp deprecate_ts); + + bool CheckDeprecate(TxnTimeStamp ts) { + TxnTimeStamp deprecate_ts = deprecate_ts_.load(); + return ts >= deprecate_ts; + } + private: explicit SegmentIndexEntry(TableIndexEntry *table_index_entry, SegmentID segment_id); @@ -265,6 +274,7 @@ private: u64 ft_column_len_sum_{}; // increase only u32 ft_column_len_cnt_{}; // increase only + Atomic deprecate_ts_ = UNCOMMIT_TS; public: bool TrySetOptimizing(); diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 80b876149f..faddca5f30 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -573,6 +573,7 @@ Status TableEntry::CommitCompact(TransactionID txn_id, TxnTimeStamp commit_ts, T auto [table_index_entry, status] = table_index_meta->GetEntryNolock(txn_id, commit_ts); if (!status.ok()) continue; + table_index_entry->CommitCompact(txn_id, commit_ts, compact_store); const IndexBase *index_base = table_index_entry->index_base(); switch (index_base->index_type_) { case IndexType::kFullText: { @@ -828,7 +829,8 @@ void TableEntry::MemIndexRecover(BufferManager *buffer_manager, TxnTimeStamp ts) 0 /*next_chunk_id*/, 0 /*txn_id*/, ts /*begin_ts*/, - ts /*commit_ts*/); + ts /*commit_ts*/, + UNCOMMIT_TS /*deprecate_ts*/); table_index_entry->index_by_segment().emplace(segment_id, segment_index_entry); } else { segment_index_entry = iter->second; diff --git a/src/storage/meta/entry/table_index_entry.cpp b/src/storage/meta/entry/table_index_entry.cpp index 366668d74b..03891d8709 100644 --- a/src/storage/meta/entry/table_index_entry.cpp +++ b/src/storage/meta/entry/table_index_entry.cpp @@ -206,6 +206,29 @@ void TableIndexEntry::CommitCreateIndex(TxnIndexStore *txn_index_store, TxnTimeS // } // } +void TableIndexEntry::CommitCompact([[maybe_unused]] TransactionID txn_id, TxnTimeStamp commit_ts, TxnCompactStore &compact_store) { + std::unique_lock w_lock(rw_locker_); + for (const auto &[segment_store, old_segments] : compact_store.compact_data_) { + auto *segment_entry = segment_store.segment_entry_; + + auto iter = index_by_segment_.find(segment_entry->segment_id()); + if (iter == index_by_segment_.end()) { + continue; + } + [[maybe_unused]] auto *segment_index_entry = iter->second.get(); + + for (auto *old_segment : old_segments) { + auto iter = index_by_segment_.find(old_segment->segment_id()); + if (iter == index_by_segment_.end()) { + continue; + } + auto *old_segment_index_entry = iter->second.get(); + old_segment_index_entry->SetDeprecated(commit_ts); + } + } +} + + nlohmann::json TableIndexEntry::Serialize(TxnTimeStamp max_commit_ts) { nlohmann::json json; @@ -225,7 +248,7 @@ nlohmann::json TableIndexEntry::Serialize(TxnTimeStamp max_commit_ts) { std::shared_lock r_lock(rw_locker_); for (const auto &[segment_id, index_entry] : this->index_by_segment_) { - if (index_entry->commit_ts_ <= max_commit_ts) { + if (index_entry->commit_ts_ <= max_commit_ts && !index_entry->deleted_) { segment_index_entry_candidates.push_back(index_entry); } } @@ -455,9 +478,17 @@ Vector TableIndexEntry::GetFilePath(TransactionID txn_id, TxnTimeStamp b } void TableIndexEntry::PickCleanup(CleanupScanner *scanner) { + TxnTimeStamp visible_ts = scanner->visible_ts(); std::shared_lock r_lock(rw_locker_); - for (auto &[segment_id, segment_index_entry] : index_by_segment_) { - segment_index_entry->PickCleanup(scanner); + for (auto iter = index_by_segment_.begin(); iter != index_by_segment_.end();) { + auto &[segment_id, segment_index_entry] = *iter; + if (segment_index_entry->CheckDeprecate(visible_ts)) { + scanner->AddEntry(std::move(segment_index_entry)); + iter = index_by_segment_.erase(iter); + } else { + segment_index_entry->PickCleanup(scanner); + ++iter; + } } } diff --git a/src/storage/meta/entry/table_index_entry.cppm b/src/storage/meta/entry/table_index_entry.cppm index 729b001b8e..e3ae5e20e4 100644 --- a/src/storage/meta/entry/table_index_entry.cppm +++ b/src/storage/meta/entry/table_index_entry.cppm @@ -145,6 +145,8 @@ public: void CommitCreateIndex(TxnIndexStore *txn_index_store, TxnTimeStamp commit_ts, bool is_replay = false); + void CommitCompact(TransactionID txn_id, TxnTimeStamp commit_ts, TxnCompactStore &compact_store); + // void RollbackPopulateIndex(TxnIndexStore *txn_index_store, Txn *txn); // replay diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index b39a0a1180..2f3c3dc9e5 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -362,6 +362,24 @@ MergeFlag CatalogDeltaOperation::NextDeleteFlag(MergeFlag new_merge_flag) const return MergeFlag::kInvalid; }; +void CatalogDeltaOperation::CheckDelete() { + if (type_ == CatalogDeltaOpType::ADD_SEGMENT_ENTRY) { + auto *add_segment_op = static_cast(this); + if (add_segment_op->status_ == SegmentStatus::kDeprecated) { + add_segment_op->merge_flag_ = MergeFlag::kDelete; + } + } else if (type_ == CatalogDeltaOpType::ADD_CHUNK_INDEX_ENTRY) { + auto *add_chunk_index_op = static_cast(this); + if (add_chunk_index_op->deprecate_ts_ != UNCOMMIT_TS) { + add_chunk_index_op->merge_flag_ = MergeFlag::kDelete; + LOG_DEBUG(fmt::format("Delete chunk: {} at {}", *encode_, add_chunk_index_op->deprecate_ts_)); + } + } else if (type_ == CatalogDeltaOpType::ADD_SEGMENT_INDEX_ENTRY) { + [[maybe_unused]] auto *add_segment_index_op = static_cast(this); + } +} + + AddDBEntryOp::AddDBEntryOp(DBEntry *db_entry, TxnTimeStamp commit_ts) : CatalogDeltaOperation(CatalogDeltaOpType::ADD_DATABASE_ENTRY, db_entry, commit_ts), db_entry_dir_(db_entry->db_entry_dir()), comment_(db_entry->db_comment_ptr()) {} @@ -402,7 +420,8 @@ AddTableIndexEntryOp::AddTableIndexEntryOp(TableIndexEntry *table_index_entry, T AddSegmentIndexEntryOp::AddSegmentIndexEntryOp(SegmentIndexEntry *segment_index_entry, TxnTimeStamp commit_ts) : CatalogDeltaOperation(CatalogDeltaOpType::ADD_SEGMENT_INDEX_ENTRY, segment_index_entry, commit_ts), segment_index_entry_(segment_index_entry), - min_ts_(segment_index_entry->min_ts()), max_ts_(segment_index_entry->max_ts()), next_chunk_id_(segment_index_entry->next_chunk_id()) {} + min_ts_(segment_index_entry->min_ts()), max_ts_(segment_index_entry->max_ts()), next_chunk_id_(segment_index_entry->next_chunk_id()), + deprecate_ts_(segment_index_entry->deprecate_ts()) {} AddChunkIndexEntryOp::AddChunkIndexEntryOp(ChunkIndexEntry *chunk_index_entry, TxnTimeStamp commit_ts) : CatalogDeltaOperation(CatalogDeltaOpType::ADD_CHUNK_INDEX_ENTRY, chunk_index_entry, commit_ts), base_name_(chunk_index_entry->base_name_), @@ -539,6 +558,7 @@ UniquePtr AddSegmentIndexEntryOp::ReadAdv(const char *&p add_segment_index_op->min_ts_ = ReadBufAdv(ptr); add_segment_index_op->max_ts_ = ReadBufAdv(ptr); add_segment_index_op->next_chunk_id_ = ReadBufAdv(ptr); + add_segment_index_op->deprecate_ts_ = ReadBufAdv(ptr); return add_segment_index_op; } @@ -624,6 +644,7 @@ SizeT AddSegmentIndexEntryOp::GetSizeInBytes() const { auto total_size = sizeof(CatalogDeltaOpType) + GetBaseSizeInBytes(); total_size += sizeof(TxnTimeStamp) + sizeof(TxnTimeStamp); total_size += sizeof(ChunkID); + total_size += sizeof(TxnTimeStamp); return total_size; } @@ -719,6 +740,7 @@ void AddSegmentIndexEntryOp::WriteAdv(char *&buf) const { WriteBufAdv(buf, this->min_ts_); WriteBufAdv(buf, this->max_ts_); WriteBufAdv(buf, this->next_chunk_id_); + WriteBufAdv(buf, this->deprecate_ts_); } void AddChunkIndexEntryOp::WriteAdv(char *&buf) const { @@ -799,11 +821,12 @@ const String AddTableIndexEntryOp::ToString() const { } const String AddSegmentIndexEntryOp::ToString() const { - return fmt::format("AddSegmentIndexEntryOp {} min_ts: {} max_ts: {}, next_chunk_id: {}", + return fmt::format("AddSegmentIndexEntryOp {} min_ts: {} max_ts: {}, next_chunk_id: {}, deprecate_ts: {}", CatalogDeltaOperation::ToString(), min_ts_, max_ts_, - next_chunk_id_); + next_chunk_id_, + deprecate_ts_); } const String AddChunkIndexEntryOp::ToString() const { @@ -868,7 +891,7 @@ bool AddTableIndexEntryOp::operator==(const CatalogDeltaOperation &rhs) const { bool AddSegmentIndexEntryOp::operator==(const CatalogDeltaOperation &rhs) const { auto *rhs_op = dynamic_cast(&rhs); return rhs_op != nullptr && CatalogDeltaOperation::operator==(rhs) && min_ts_ == rhs_op->min_ts_ && max_ts_ == rhs_op->max_ts_ && - next_chunk_id_ == rhs_op->next_chunk_id_; + next_chunk_id_ == rhs_op->next_chunk_id_ && deprecate_ts_ == rhs_op->deprecate_ts_; } bool AddChunkIndexEntryOp::operator==(const CatalogDeltaOperation &rhs) const { @@ -1242,18 +1265,8 @@ void GlobalCatalogDeltaEntry::AddDeltaEntryInner(CatalogDeltaEntry *delta_entry) max_commit_ts_ = std::max(max_commit_ts_, max_commit_ts); for (auto &new_op : delta_entry->operations()) { - if (new_op->type_ == CatalogDeltaOpType::ADD_SEGMENT_ENTRY) { - auto *add_segment_op = static_cast(new_op.get()); - if (add_segment_op->status_ == SegmentStatus::kDeprecated) { - add_segment_op->merge_flag_ = MergeFlag::kDelete; - } - } else if (new_op->type_ == CatalogDeltaOpType::ADD_CHUNK_INDEX_ENTRY) { - auto *add_chunk_index_op = static_cast(new_op.get()); - if (add_chunk_index_op->deprecate_ts_ != UNCOMMIT_TS) { - add_chunk_index_op->merge_flag_ = MergeFlag::kDelete; - LOG_DEBUG(fmt::format("Delete chunk: {} at {}", *new_op->encode_, add_chunk_index_op->deprecate_ts_)); - } - } + new_op->CheckDelete(); + const String &encode = *new_op->encode_; if (encode.empty()) { String error_message = "encode is empty"; diff --git a/src/storage/wal/catalog_delta_entry.cppm b/src/storage/wal/catalog_delta_entry.cppm index dd06550778..798c5a5201 100644 --- a/src/storage/wal/catalog_delta_entry.cppm +++ b/src/storage/wal/catalog_delta_entry.cppm @@ -112,6 +112,8 @@ public: MergeFlag NextDeleteFlag(MergeFlag new_merge_flag) const; + void CheckDelete(); + public: TxnTimeStamp begin_ts_{0}; TransactionID txn_id_{0}; @@ -306,6 +308,7 @@ public: TxnTimeStamp min_ts_{0}; TxnTimeStamp max_ts_{0}; ChunkID next_chunk_id_{0}; + TxnTimeStamp deprecate_ts_{0}; }; /// class AddSegmentColumnEntryOperation diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index d49d8a1c56..59a52289fa 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -1330,7 +1330,8 @@ void WalManager::WalCmdDumpIndexReplay(WalCmdDumpIndex &cmd, TransactionID txn_i 0 /*next_chunk_id*/, txn_id /*txn_id*/, commit_ts /*begin_ts*/, - commit_ts); + commit_ts, + UNCOMMIT_TS /*deprecate_ts*/); index_by_segment.emplace(cmd.segment_id_, segment_index_entry_ptr); segment_index_entry = segment_index_entry_ptr.get(); } diff --git a/src/unit_test/storage/buffer/buffer_manager.cpp b/src/unit_test/storage/buffer/buffer_manager.cpp index 0809454274..a7d58b5b72 100644 --- a/src/unit_test/storage/buffer/buffer_manager.cpp +++ b/src/unit_test/storage/buffer/buffer_manager.cpp @@ -116,6 +116,7 @@ TEST_F(BufferManagerTest, cleanup_test) { auto file_name = MakeShared(fmt::format("file_{}", i)); auto file_worker = MakeUnique(data_dir_, temp_dir_, MakeShared(""), file_name, file_size, buffer_mgr.persistence_manager()); auto *buffer_obj = buffer_mgr.AllocateBufferObject(std::move(file_worker)); + buffer_obj->AddObjRc(); buffer_objs.push_back(buffer_obj); { auto buffer_handle = buffer_obj->Load(); @@ -374,6 +375,7 @@ class Test1Obj : public TestObj { file_info.file_size_ = file_size; auto file_worker = MakeUnique(data_dir_, temp_dir_, MakeShared(""), file_name, file_size, nullptr); file_info.buffer_obj_ = buffer_mgr_->AllocateBufferObject(std::move(file_worker)); + file_info.buffer_obj_->AddObjRc(); } else { auto file_worker = MakeUnique(data_dir_, temp_dir_, MakeShared(""), file_name, file_info.file_size_, nullptr); file_info.buffer_obj_ = buffer_mgr_->GetBufferObject(std::move(file_worker)); @@ -449,6 +451,7 @@ class Test2Obj : public TestObj { if (alloc_new) { auto file_worker = MakeUnique(data_dir_, temp_dir_, MakeShared(""), file_name, 0, nullptr); file_info.buffer_obj_ = buffer_mgr_->AllocateBufferObject(std::move(file_worker)); + file_info.buffer_obj_->AddObjRc(); } else { auto file_worker = MakeUnique(data_dir_, temp_dir_, MakeShared(""), file_name, file_info.file_size_, nullptr); file_info.buffer_obj_ = buffer_mgr_->GetBufferObject(std::move(file_worker));