diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index bf52d55b8a..07effc5dc7 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -503,7 +503,76 @@ Status Storage::WriterToReader() { current_storage_mode_ = StorageMode::kReadable; return Status::OK(); } -Status UnInitFromWriter() { return Status::OK(); } +Status Storage::UnInitFromWriter() { + + if (periodic_trigger_thread_ != nullptr) { + periodic_trigger_thread_->Stop(); + periodic_trigger_thread_.reset(); + } + + if (compact_processor_ != nullptr) { + compact_processor_->Stop(); // Different from Readable + compact_processor_.reset(); // Different from Readable + } + + if (bg_processor_ != nullptr) { + bg_processor_->Stop(); + bg_processor_.reset(); + } + + new_catalog_.reset(); + + memory_index_tracer_.reset(); + + if (wal_mgr_ != nullptr) { + wal_mgr_->Stop(); + wal_mgr_.reset(); + } + + switch (config_ptr_->StorageType()) { + case StorageType::kLocal: { + // Not init remote store + break; + } + case StorageType::kMinio: { + if (object_storage_processor_ != nullptr) { + object_storage_processor_->Stop(); + object_storage_processor_.reset(); + VirtualStore::UnInitRemoteStore(); + } + break; + } + default: { + UnrecoverableError(fmt::format("Unsupported storage type: {}.", ToString(config_ptr_->StorageType()))); + } + } + + if (txn_mgr_ != nullptr) { + txn_mgr_->Stop(); + txn_mgr_.reset(); + } + + if (buffer_mgr_ != nullptr) { + buffer_mgr_->Stop(); + buffer_mgr_.reset(); + } + + if (result_cache_manager_ != nullptr) { + result_cache_manager_.reset(); + } + + if (persistence_manager_ != nullptr) { + persistence_manager_.reset(); + } + + if (cleanup_info_tracer_ != nullptr) { + cleanup_info_tracer_.reset(); + } + + std::unique_lock lock(mutex_); + current_storage_mode_ = StorageMode::kUnInitialized; + return Status::OK(); +} ResultCacheManager *Storage::result_cache_manager() const noexcept { if (config_ptr_->ResultCache() != "on") { @@ -583,7 +652,7 @@ Status Storage::SetStorageMode(StorageMode target_mode) { case StorageMode::kWritable: { switch (target_mode) { case StorageMode::kUnInitialized: { - break; + return UnInitFromWriter(); } case StorageMode::kAdmin: { return WriterToAdmin(); @@ -595,117 +664,6 @@ Status Storage::SetStorageMode(StorageMode target_mode) { UnrecoverableError("Attempt to set storage mode from Writable to Writable"); } } - - if (target_mode == StorageMode::kWritable) { - UnrecoverableError("Attempt to set storage mode from Writable to Writable"); - } - - if (target_mode == StorageMode::kUnInitialized or target_mode == StorageMode::kAdmin) { - - if (periodic_trigger_thread_ != nullptr) { - periodic_trigger_thread_->Stop(); - periodic_trigger_thread_.reset(); - } - - if (compact_processor_ != nullptr) { - compact_processor_->Stop(); // Different from Readable - compact_processor_.reset(); // Different from Readable - } - - if (bg_processor_ != nullptr) { - bg_processor_->Stop(); - bg_processor_.reset(); - } - - new_catalog_.reset(); - - memory_index_tracer_.reset(); - - if (wal_mgr_ != nullptr) { - wal_mgr_->Stop(); - wal_mgr_.reset(); - } - - if (target_mode == StorageMode::kUnInitialized) { - switch (config_ptr_->StorageType()) { - case StorageType::kLocal: { - // Not init remote store - break; - } - case StorageType::kMinio: { - if (object_storage_processor_ != nullptr) { - object_storage_processor_->Stop(); - object_storage_processor_.reset(); - VirtualStore::UnInitRemoteStore(); - } - break; - } - default: { - UnrecoverableError(fmt::format("Unsupported storage type: {}.", ToString(config_ptr_->StorageType()))); - } - } - } - - if (txn_mgr_ != nullptr) { - txn_mgr_->Stop(); - txn_mgr_.reset(); - } - - if (buffer_mgr_ != nullptr) { - buffer_mgr_->Stop(); - buffer_mgr_.reset(); - } - - if (result_cache_manager_ != nullptr) { - result_cache_manager_.reset(); - } - - if (target_mode == StorageMode::kUnInitialized) { - if (persistence_manager_ != nullptr) { - persistence_manager_.reset(); - } - } - - if (cleanup_info_tracer_ != nullptr) { - cleanup_info_tracer_.reset(); - } - - if (target_mode == StorageMode::kAdmin) { - // wal_manager stop won't reset many member. We need to recreate the wal_manager object. - wal_mgr_ = MakeUnique(this, - config_ptr_->WALDir(), - config_ptr_->DataDir(), - config_ptr_->WALCompactThreshold(), - config_ptr_->DeltaCheckpointThreshold(), - config_ptr_->FlushMethodAtCommit()); - } - } - - if (target_mode == StorageMode::kReadable) { - if (periodic_trigger_thread_ != nullptr) { - periodic_trigger_thread_->Stop(); - periodic_trigger_thread_.reset(); - } - - if (compact_processor_ != nullptr) { - compact_processor_->Stop(); // Different from Readable - compact_processor_.reset(); // Different from Readable - } - - i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0; - - periodic_trigger_thread_ = MakeUnique(); - periodic_trigger_thread_->cleanup_trigger_ = - MakeShared(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get()); - bg_processor_->SetCleanupTrigger(periodic_trigger_thread_->cleanup_trigger_); - periodic_trigger_thread_->Start(); - } - - { - std::unique_lock lock(mutex_); - current_storage_mode_ = target_mode; - } - break; } }