Skip to content

Commit

Permalink
Refactor storage: writer->un-init (#2257)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

refactor storage: writer->un-init

### Type of change

- [x] Refactoring

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Nov 17, 2024
1 parent d7e16c8 commit 3bfb954
Showing 1 changed file with 71 additions and 113 deletions.
184 changes: 71 additions & 113 deletions src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,76 @@ Status Storage::WriterToReader() {
current_storage_mode_ = StorageMode::kReadable;
return Status::OK();
}
Status UnInitFromWriter() { return Status::OK(); }
Status Storage::UnInitFromWriter() {

if (periodic_trigger_thread_ != nullptr) {
periodic_trigger_thread_->Stop();
periodic_trigger_thread_.reset();
}

if (compact_processor_ != nullptr) {
compact_processor_->Stop(); // Different from Readable
compact_processor_.reset(); // Different from Readable
}

if (bg_processor_ != nullptr) {
bg_processor_->Stop();
bg_processor_.reset();
}

new_catalog_.reset();

memory_index_tracer_.reset();

if (wal_mgr_ != nullptr) {
wal_mgr_->Stop();
wal_mgr_.reset();
}

switch (config_ptr_->StorageType()) {
case StorageType::kLocal: {
// Not init remote store
break;
}
case StorageType::kMinio: {
if (object_storage_processor_ != nullptr) {
object_storage_processor_->Stop();
object_storage_processor_.reset();
VirtualStore::UnInitRemoteStore();
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported storage type: {}.", ToString(config_ptr_->StorageType())));
}
}

if (txn_mgr_ != nullptr) {
txn_mgr_->Stop();
txn_mgr_.reset();
}

if (buffer_mgr_ != nullptr) {
buffer_mgr_->Stop();
buffer_mgr_.reset();
}

if (result_cache_manager_ != nullptr) {
result_cache_manager_.reset();
}

if (persistence_manager_ != nullptr) {
persistence_manager_.reset();
}

if (cleanup_info_tracer_ != nullptr) {
cleanup_info_tracer_.reset();
}

std::unique_lock<std::mutex> lock(mutex_);
current_storage_mode_ = StorageMode::kUnInitialized;
return Status::OK();
}

ResultCacheManager *Storage::result_cache_manager() const noexcept {
if (config_ptr_->ResultCache() != "on") {
Expand Down Expand Up @@ -583,7 +652,7 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
case StorageMode::kWritable: {
switch (target_mode) {
case StorageMode::kUnInitialized: {
break;
return UnInitFromWriter();
}
case StorageMode::kAdmin: {
return WriterToAdmin();
Expand All @@ -595,117 +664,6 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
UnrecoverableError("Attempt to set storage mode from Writable to Writable");
}
}

if (target_mode == StorageMode::kWritable) {
UnrecoverableError("Attempt to set storage mode from Writable to Writable");
}

if (target_mode == StorageMode::kUnInitialized or target_mode == StorageMode::kAdmin) {

if (periodic_trigger_thread_ != nullptr) {
periodic_trigger_thread_->Stop();
periodic_trigger_thread_.reset();
}

if (compact_processor_ != nullptr) {
compact_processor_->Stop(); // Different from Readable
compact_processor_.reset(); // Different from Readable
}

if (bg_processor_ != nullptr) {
bg_processor_->Stop();
bg_processor_.reset();
}

new_catalog_.reset();

memory_index_tracer_.reset();

if (wal_mgr_ != nullptr) {
wal_mgr_->Stop();
wal_mgr_.reset();
}

if (target_mode == StorageMode::kUnInitialized) {
switch (config_ptr_->StorageType()) {
case StorageType::kLocal: {
// Not init remote store
break;
}
case StorageType::kMinio: {
if (object_storage_processor_ != nullptr) {
object_storage_processor_->Stop();
object_storage_processor_.reset();
VirtualStore::UnInitRemoteStore();
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported storage type: {}.", ToString(config_ptr_->StorageType())));
}
}
}

if (txn_mgr_ != nullptr) {
txn_mgr_->Stop();
txn_mgr_.reset();
}

if (buffer_mgr_ != nullptr) {
buffer_mgr_->Stop();
buffer_mgr_.reset();
}

if (result_cache_manager_ != nullptr) {
result_cache_manager_.reset();
}

if (target_mode == StorageMode::kUnInitialized) {
if (persistence_manager_ != nullptr) {
persistence_manager_.reset();
}
}

if (cleanup_info_tracer_ != nullptr) {
cleanup_info_tracer_.reset();
}

if (target_mode == StorageMode::kAdmin) {
// wal_manager stop won't reset many member. We need to recreate the wal_manager object.
wal_mgr_ = MakeUnique<WalManager>(this,
config_ptr_->WALDir(),
config_ptr_->DataDir(),
config_ptr_->WALCompactThreshold(),
config_ptr_->DeltaCheckpointThreshold(),
config_ptr_->FlushMethodAtCommit());
}
}

if (target_mode == StorageMode::kReadable) {
if (periodic_trigger_thread_ != nullptr) {
periodic_trigger_thread_->Stop();
periodic_trigger_thread_.reset();
}

if (compact_processor_ != nullptr) {
compact_processor_->Stop(); // Different from Readable
compact_processor_.reset(); // Different from Readable
}

i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0;

periodic_trigger_thread_ = MakeUnique<PeriodicTriggerThread>();
periodic_trigger_thread_->cleanup_trigger_ =
MakeShared<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get());
bg_processor_->SetCleanupTrigger(periodic_trigger_thread_->cleanup_trigger_);
periodic_trigger_thread_->Start();
}

{
std::unique_lock<std::mutex> lock(mutex_);
current_storage_mode_ = target_mode;
}

break;
}
}
Expand Down

0 comments on commit 3bfb954

Please sign in to comment.