Skip to content

Commit

Permalink
Fix: restart from compact and add test case. (#2363)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix the bug seen in
https://github.com/infiniflow/infinity/actions/runs/12276806461/job/34254792578
and add a test case.
Fix the bug seen in
https://github.com/infiniflow/infinity/actions/runs/12287163577/job/34288693113
by rewriting the test.
Revert the change to the CI test config file.

Fix the bug seen in
https://github.com/infiniflow/infinity/actions/runs/12291689168/job/34300899495
by changing the full checkpoint timestamp.
Fix the infinity shutdown bug.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Dec 13, 2024
1 parent ba8dc1c commit 861c142
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 29 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ jobs:
echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV
echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV
sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s
if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then
echo "Minio container is not running"
exit 1
fi
- name: Start infinity debug version with minio
if: ${{ !cancelled() && !failure() }}
Expand Down Expand Up @@ -291,6 +295,10 @@ jobs:
echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV
echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV
sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s
if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then
echo "Minio container is not running"
exit 1
fi
- name: Start infinity release version with minio
if: ${{ !cancelled() && !failure() }}
Expand Down
2 changes: 0 additions & 2 deletions conf/pytest_parallel_infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ log_level = "trace"

[storage]
persistence_dir = "/var/infinity/persistence"
compact_interval = "10s"
cleanup_interval = "0s"

[buffer]
buffer_manager_size = "8GB"
Expand Down
27 changes: 23 additions & 4 deletions python/restart_test/infinity_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,23 @@ def init(self, config_path: str | None = None):
self.process = subprocess.Popen(cmd, shell=True, env=my_env)
self.i += 1

def uninit(self):
def uninit(self, kill: bool = False, timeout: int = 60):
if self.process is None:
return
timeout = 60
pids = []
for child in psutil.Process(self.process.pid).children(recursive=True):
pids.append(child.pid)
if len(pids) == 0:
raise Exception("Cannot find infinity process.")

if kill:
os.system(f"kill -9 {' '.join(map(str, pids))}")
time.sleep(1)
while any(psutil.pid_exists(pid) for pid in pids):
time.sleep(1)
self.process = None
return

ret = os.system(f"bash {self.script_path} {timeout} {' '.join(map(str, pids))}")
if ret != 0:
raise Exception("An error occurred.")
Expand Down Expand Up @@ -122,7 +130,14 @@ def connect(self, uri: str):


def infinity_runner_decorator_factory(
config_path: str | None, uri: str, infinity_runner: InfinityRunner, shutdown_out: bool = False
config_path: str | None,
uri: str,
infinity_runner: InfinityRunner,
*,
shutdown_out: bool = False,
kill: bool = False,
terminate_timeout: int = 60,
check_kill: bool = True
):
def decorator(f):
def wrapper(*args, **kwargs):
Expand All @@ -136,7 +151,11 @@ def wrapper(*args, **kwargs):
except Exception:
if not shutdown_out:
raise
infinity_runner.uninit()
try:
infinity_runner.uninit(kill, terminate_timeout)
except Exception:
if check_kill:
raise

return wrapper

Expand Down
56 changes: 55 additions & 1 deletion python/restart_test/test_compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def part1(infinity_obj):
table_obj.import_data(dataset_path, import_options)
table_obj.compact()


part1()
import_time = 4

Expand All @@ -89,3 +88,58 @@ def part2(infinity_obj):
assert count_star == 9 * import_time

part2()

def test_compact_restart_repeatedly(self, infinity_runner: InfinityRunner):
    """Check that imported rows survive repeated runs under the second config.

    The test has three phases, each executed through a decorator built by
    ``infinity_runner_decorator_factory``:

    1. Under ``1.toml``: (re)create the table ``test_compact2``.
    2. Under ``2.toml``, ten times in a row: assert that ``count(*)`` equals
       the number of rows imported so far, then import the CSV 1000 more
       times.
    3. Under ``1.toml``: drop the table.

    NOTE(review): each decorated call presumably starts a fresh infinity
    process with the given config and shuts it down afterwards -- confirm
    against ``infinity_runner_decorator_factory``.
    """
    setup_config = "test/data/config/restart_test/test_compact/1.toml"
    compact_config = "test/data/config/restart_test/test_compact/2.toml"

    uri = common_values.TEST_LOCAL_HOST
    infinity_runner.clear()

    with_setup_config = infinity_runner_decorator_factory(
        setup_config, uri, infinity_runner
    )
    with_compact_config = infinity_runner_decorator_factory(
        compact_config, uri, infinity_runner
    )

    table_name = "test_compact2"
    csv_path = "test/data/csv/embedding_int_dim3.csv"
    csv_import_options = None
    imports_per_round = 1000
    round_count = 10
    # Each import of the CSV is expected to add this many rows.
    rows_per_import = 3
    table_schema = {
        "col1": {"type": "int"},
        "col2": {"type": "vector, 3, float"},
    }

    @with_setup_config
    def part1(infinity_obj):
        # Start from a clean table; ignore the drop if it does not exist yet.
        database = infinity_obj.get_database("default_db")
        database.drop_table(table_name, ConflictType.Ignore)
        table_obj = database.create_table(table_name, table_schema)

    part1()

    # Number of successful imports so far, shared across all rounds.
    finished_imports = 0

    @with_compact_config
    def part2(infinity_obj):
        nonlocal finished_imports
        database = infinity_obj.get_database("default_db")
        table = database.get_table(table_name)

        # Everything imported in earlier rounds must still be visible.
        result_columns, _, _ = table.output(["count(*)"]).to_result()
        row_count = result_columns["count(star)"][0]
        assert row_count == finished_imports * rows_per_import

        # Count each import as it completes so the tally stays exact.
        for _ in range(imports_per_round):
            table.import_data(csv_path, csv_import_options)
            finished_imports += 1

    for _ in range(round_count):
        part2()

    @with_setup_config
    def part3(infinity_obj):
        # Final cleanup: remove the table created in part1.
        infinity_obj.get_database("default_db").drop_table(table_name)

    part3()
2 changes: 1 addition & 1 deletion python/restart_test/test_fulltext.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_fulltext(self, infinity_runner: InfinityRunner, config: str):
infinity_runner.clear()

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator2 = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

@decorator
def part1(infinity_obj):
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def insert_inner(
shutdown = False
error = False

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

def insert_func(table_obj):
nonlocal cur_insert_n, shutdown, error
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_insert_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def insert_import_inner(
logger = infinity_runner.logger
write_i = 0

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

def insert_import_func(table_obj):
nonlocal cur_n, insert_finish, shutdown, error, write_i
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_shutdown_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_shutdown_pytest(self, infinity_runner: InfinityRunner, pytest_mark: str
gen = self.run_pytest_seperately(test_dir, pytest_mark=pytest_mark)

decorator = infinity_runner_decorator_factory(
config, uri, infinity_runner, True
config, uri, infinity_runner, shutdown_out=True
)

def shutdown_func():
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void FileWorker::ReadFromFile(bool from_spill) {
SizeT file_size = 0;
auto [file_handle, status] = VirtualStore::Open(read_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
UnrecoverableError(fmt::format("Read path: {}, error: {}", read_path, status.message()));
}
if (use_object_cache) {
file_handle->Seek(obj_addr_.part_offset_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto begin_ts = op->begin_ts_;
std::string_view encode = *op->encode_;
MergeFlag merge_flag = op->merge_flag_;
if (op->commit_ts_ < full_ckp_commit_ts_) {
if (op->commit_ts_ <= full_ckp_commit_ts_) {
// Ignore the old txn
continue;
}
Expand Down
18 changes: 17 additions & 1 deletion src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,22 @@ SharedPtr<BlockEntry> SegmentEntry::GetBlockEntryByID(BlockID block_id) const {
return block_entries_[block_id];
}

// Returns the status that should be persisted for this segment when it is
// saved as of timestamp `ts` (used by Serialize and AddSegmentEntryOp instead
// of the raw in-memory `status_`).
//
// Mapping:
//   - kUnsealed / kSealed    -> saved unchanged.
//   - kCompacting / kNoDelete -> saved as kSealed.
//     NOTE(review): presumably these are transient in-memory states that must
//     not survive a restart -- confirm.
//   - kDeprecated            -> saved as kDeprecated only when `ts` is at or
//     after `deprecate_ts_`; otherwise saved as kSealed.
SegmentStatus SegmentEntry::GetSaveStatus(TxnTimeStamp ts) const {
    switch (status_) {
        case SegmentStatus::kUnsealed:
        case SegmentStatus::kSealed: {
            return status_;
        }
        case SegmentStatus::kCompacting:
        case SegmentStatus::kNoDelete: {
            return SegmentStatus::kSealed;
        }
        case SegmentStatus::kDeprecated: {
            return ts >= deprecate_ts_ ? SegmentStatus::kDeprecated : SegmentStatus::kSealed;
        }
    }
    // No `default:` label above so the compiler can still warn when a new
    // enumerator is added. This fallback keeps the function from flowing off
    // its end (undefined behaviour) if `status_` ever holds a value outside
    // the handled cases; it saves the raw status unchanged.
    return status_;
}

nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
nlohmann::json json_res;

Expand All @@ -554,7 +570,7 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
json_res["commit_ts"] = TxnTimeStamp(this->commit_ts_);
json_res["begin_ts"] = TxnTimeStamp(this->begin_ts_);
json_res["txn_id"] = TransactionID(this->txn_id_);
json_res["status"] = static_cast<std::underlying_type_t<SegmentStatus>>(this->status_);
json_res["status"] = static_cast<std::underlying_type_t<SegmentStatus>>(this->GetSaveStatus(max_commit_ts));
if (status_ != SegmentStatus::kUnsealed) {
LOG_TRACE(fmt::format("SegmentEntry::Serialize: Begin try to save FastRoughFilter to json file"));
this->GetFastRoughFilter()->SaveToJsonFile(json_res);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public:

static SharedPtr<SegmentEntry> Deserialize(const nlohmann::json &table_entry_json, TableEntry *table_entry, BufferManager *buffer_mgr);

SegmentStatus GetSaveStatus(TxnTimeStamp ts) const;

public:
void AddBlockReplay(SharedPtr<BlockEntry> block_entry);

Expand Down
3 changes: 2 additions & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ Status Storage::AdminToWriter() {
UnrecoverableError("Memory index tracer was initialized before.");
}
memory_index_tracer_ = MakeUnique<BGMemIndexTracer>(config_ptr_->MemIndexMemoryQuota(), new_catalog_.get(), txn_mgr_.get());
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();

bg_processor_->Start();

Expand Down Expand Up @@ -634,7 +635,6 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
LOG_WARN(fmt::format("Set unchanged mode"));
return Status::OK();
}
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();
switch (current_mode) {
case StorageMode::kUnInitialized: {
if (target_mode != StorageMode::kAdmin) {
Expand Down Expand Up @@ -732,6 +732,7 @@ Status Storage::AdminToReaderBottom(TxnTimeStamp system_start_ts) {
UnrecoverableError("Memory index tracer was initialized before.");
}
memory_index_tracer_ = MakeUnique<BGMemIndexTracer>(config_ptr_->MemIndexMemoryQuota(), new_catalog_.get(), txn_mgr_.get());
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();

new_catalog_->StartMemoryIndexCommit();
new_catalog_->MemIndexRecover(buffer_mgr_.get(), system_start_ts);
Expand Down
14 changes: 9 additions & 5 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import global_resource_usage;
namespace infinity {

TxnManager::TxnManager(BufferManager *buffer_mgr, WalManager *wal_mgr, TxnTimeStamp start_ts)
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), is_running_(false) {
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), max_committed_ts_(start_ts), is_running_(false) {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("TxnManager");
#endif
Expand Down Expand Up @@ -131,7 +131,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
current_ts_ += 2;
TxnTimeStamp commit_ts = current_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
committing_txns_.emplace(txn);
committing_txns_.emplace(commit_ts, txn);
txn->SetTxnWrite();
return commit_ts;
}
Expand All @@ -143,8 +143,7 @@ bool TxnManager::CheckTxnConflict(Txn *txn) {
{
std::lock_guard guard(locker_);
// LOG_INFO(fmt::format("Txn {}(commit_ts:{}) check conflict", txn->TxnID(), txn->CommitTS()));
for (Txn *committing_txn : committing_txns_) {
TxnTimeStamp committing_ts = committing_txn->CommitTS();
for (auto &[committing_ts, committing_txn] : committing_txns_) {
if (commit_ts > committing_ts) {
candidate_txns.push_back(committing_txn->shared_from_this());
min_checking_ts = std::min(min_checking_ts, committing_txn->BeginTS());
Expand Down Expand Up @@ -345,8 +344,9 @@ void TxnManager::CleanupTxn(Txn *txn) {
TransactionID txn_id = txn->TxnID();
{
// cleanup the txn from committing_txn and txm_map
auto commit_ts = txn->CommitTS();
std::lock_guard guard(locker_);
SizeT remove_n = committing_txns_.erase(txn);
SizeT remove_n = committing_txns_.erase(commit_ts);
if (remove_n == 0) {
UnrecoverableError("Txn not found in committing_txns_");
}
Expand All @@ -355,6 +355,10 @@ void TxnManager::CleanupTxn(Txn *txn) {
String error_message = fmt::format("Txn: {} not found in txn map", txn_id);
UnrecoverableError(error_message);
}

if (committing_txns_.empty() || committing_txns_.begin()->first > commit_ts) {
max_committed_ts_ = commit_ts;
}
}
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public:
// Only used by follower and learner when received the replicated log from leader
void SetStartTS(TxnTimeStamp new_start_ts) { current_ts_ = new_start_ts; }

TxnTimeStamp max_committed_ts() { return max_committed_ts_; }

private:
mutable std::mutex locker_{};
BufferManager *buffer_mgr_{};
Expand All @@ -113,12 +115,13 @@ private:
WalManager *wal_mgr_;

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> committing_txns_; // the txns in committing stage, can use flat_map
Map<TxnTimeStamp, Txn *> committing_txns_; // the txns in committing stage
Set<TxnTimeStamp> checking_ts_{}; // the begin ts of txn that is used to check conflict

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> current_ts_{}; // The next txn ts
Atomic<TxnTimeStamp> max_committed_ts_{};
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // current ckp begin ts, UNCOMMIT_TS if no ckp is happening, UNCOMMIT_TS is a maximum u64 integer

// For stop the txn manager
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ AddTableEntryOp::AddTableEntryOp(TableEntry *table_entry, TxnTimeStamp commit_ts
table_comment_(table_entry->GetTableComment()) {}

AddSegmentEntryOp::AddSegmentEntryOp(SegmentEntry *segment_entry, TxnTimeStamp commit_ts, String segment_filter_binary_data)
: CatalogDeltaOperation(CatalogDeltaOpType::ADD_SEGMENT_ENTRY, segment_entry, commit_ts), status_(segment_entry->status()),
: CatalogDeltaOperation(CatalogDeltaOpType::ADD_SEGMENT_ENTRY, segment_entry, commit_ts), status_(segment_entry->GetSaveStatus(commit_ts)),
column_count_(segment_entry->column_count()), row_count_(segment_entry->row_count()), // FIXME: use append_state
actual_row_count_(segment_entry->actual_row_count()), // FIXME: use append_state
row_capacity_(segment_entry->row_capacity()), min_row_ts_(segment_entry->min_row_ts()), max_row_ts_(segment_entry->max_row_ts()),
Expand Down
4 changes: 3 additions & 1 deletion src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,9 @@ void WalManager::FullCheckpointInner(Txn *txn) {
TxnTimeStamp last_ckp_ts = last_ckp_ts_;
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
auto [max_commit_ts, wal_size] = GetCommitState();
max_commit_ts = std::min(max_commit_ts, txn->BeginTS()); // txn commit after txn->BeginTS() should be ignored
// max_commit_ts = std::min(max_commit_ts, txn->BeginTS()); // txn commit after txn->BeginTS() should be ignored
TxnManager *txn_mgr = storage_->txn_manager();
max_commit_ts = txn_mgr->max_committed_ts();
// wal_size may be larger than the actual size. but it's ok. it only makes the swap of wal file a little bit later.

if (max_commit_ts == last_full_ckp_ts) {
Expand Down
22 changes: 22 additions & 0 deletions test/data/config/restart_test/test_compact/2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[general]
version = "0.5.0"
time_zone = "utc-8"

[network]
[log]
log_to_stdout = true
log_level = "trace"

[storage]
data_dir = "/var/infinity/data"
optimize_interval = "0s"
cleanup_interval = "2s"
compact_interval = "1s"
persistence_dir = ""

[buffer]
[wal]
delta_checkpoint_interval = "1s"
full_checkpoint_interval = "3s"

[resource]
Loading

0 comments on commit 861c142

Please sign in to comment.