From 372507c4f6da0053efcdcdcd3768fff98b1b79d2 Mon Sep 17 00:00:00 2001 From: Beka Davis <31743465+bekadavis9@users.noreply.github.com> Date: Thu, 16 May 2024 16:16:46 -0400 Subject: [PATCH] Remove StorageManager::vfs() (#4982) Remove `StorageManager::vfs()` [sc-47537] --- TYPE: NO_HISTORY DESC: Remove `StorageManager::vfs()`. --- tiledb/api/c_api/context/context_api.cc | 5 +- .../storage_manager_override.h | 3 - tiledb/sm/array/test/unit_consistency.cc | 8 +- .../consolidator/array_meta_consolidator.cc | 16 ++-- .../sm/consolidator/commits_consolidator.cc | 15 ++- tiledb/sm/consolidator/consolidator.cc | 12 +-- tiledb/sm/consolidator/consolidator.h | 4 +- .../sm/consolidator/fragment_consolidator.cc | 27 +++--- .../fragment_meta_consolidator.cc | 13 +-- .../consolidator/group_meta_consolidator.cc | 13 ++- tiledb/sm/group/group.cc | 4 +- .../deletes_and_updates.cc | 2 +- tiledb/sm/query/readers/filtered_data.h | 10 +- tiledb/sm/query/strategy_base.h | 18 ++-- .../sm/query/writers/global_order_writer.cc | 25 ++--- tiledb/sm/query/writers/ordered_writer.cc | 6 +- tiledb/sm/query/writers/unordered_writer.cc | 5 +- tiledb/sm/query/writers/writer_base.cc | 31 +++---- tiledb/sm/storage_manager/storage_manager.cc | 93 ++++++++++--------- .../storage_manager_canonical.h | 5 - 20 files changed, 148 insertions(+), 167 deletions(-) diff --git a/tiledb/api/c_api/context/context_api.cc b/tiledb/api/c_api/context/context_api.cc index 9ff7b8de04f..4ea72796c6d 100644 --- a/tiledb/api/c_api/context/context_api.cc +++ b/tiledb/api/c_api/context/context_api.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022-2023 TileDB, Inc. + * @copyright Copyright (c) 2022-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -108,7 +108,8 @@ capi_return_t tiledb_ctx_get_last_error( capi_return_t tiledb_ctx_is_supported_fs( tiledb_ctx_t* ctx, tiledb_filesystem_t fs, int32_t* is_supported) { ensure_output_pointer_is_valid(is_supported); - *is_supported = (int32_t)ctx->storage_manager()->vfs()->supports_fs( + + *is_supported = (int32_t)ctx->context().resources().vfs().supports_fs( static_cast(fs)); return TILEDB_OK; } diff --git a/tiledb/api/c_api_test_support/storage_manager_stub/storage_manager_override.h b/tiledb/api/c_api_test_support/storage_manager_stub/storage_manager_override.h index 351b91f5522..8d123cd21fb 100644 --- a/tiledb/api/c_api_test_support/storage_manager_stub/storage_manager_override.h +++ b/tiledb/api/c_api_test_support/storage_manager_stub/storage_manager_override.h @@ -68,9 +68,6 @@ class StorageManagerStub { inline common::ThreadPool* io_tp() { return &resources_.io_tp(); } - inline VFS* vfs() { - throw std::logic_error("StorageManagerStub does not instantiate a VFS"); - } inline Status cancel_all_tasks() { return Status{}; }; diff --git a/tiledb/sm/array/test/unit_consistency.cc b/tiledb/sm/array/test/unit_consistency.cc index c2e2b7cf228..212a24dfbc4 100644 --- a/tiledb/sm/array/test/unit_consistency.cc +++ b/tiledb/sm/array/test/unit_consistency.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022 TileDB, Inc. + * @copyright Copyright (c) 2022-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -186,7 +186,7 @@ TEST_CASE( REQUIRE(x.is_open(uri) == false); // Clean up - REQUIRE(sm.vfs()->remove_dir(uri).ok()); + REQUIRE(resources.vfs().remove_dir(uri).ok()); } TEST_CASE( @@ -227,7 +227,7 @@ TEST_CASE( // Clean up for (auto uri : uris) { - REQUIRE(sm.vfs()->remove_dir(uri).ok()); + REQUIRE(resources.vfs().remove_dir(uri).ok()); } } @@ -283,5 +283,5 @@ TEST_CASE( REQUIRE(array.get()->close().ok()); REQUIRE(x.registry_size() == 0); REQUIRE(x.is_open(uri) == false); - REQUIRE(sm.vfs()->remove_dir(uri).ok()); + REQUIRE(resources.vfs().remove_dir(uri).ok()); } diff --git a/tiledb/sm/consolidator/array_meta_consolidator.cc b/tiledb/sm/consolidator/array_meta_consolidator.cc index 0efb104da0b..11a77d2f16f 100644 --- a/tiledb/sm/consolidator/array_meta_consolidator.cc +++ b/tiledb/sm/consolidator/array_meta_consolidator.cc @@ -112,9 +112,8 @@ Status ArrayMetaConsolidator::consolidate( throw_if_not_ok(array_for_writes.close()); // Write vacuum file - RETURN_NOT_OK( - storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size())); - RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri)); + throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size())); + throw_if_not_ok(resources_.vfs().close_file(vac_uri)); return Status::Ok(); } @@ -126,18 +125,15 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) { } // Get the array metadata URIs and vacuum file URIs to be vacuum - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_.vfs(); auto compute_tp = storage_manager_->compute_tp(); auto array_dir = ArrayDirectory( - storage_manager_->resources(), - URI(array_name), - 0, - std::numeric_limits::max()); + resources_, URI(array_name), 0, std::numeric_limits::max()); // Delete the array metadata and vacuum files - vfs->remove_files(compute_tp, array_dir.array_meta_uris_to_vacuum()); - vfs->remove_files(compute_tp, array_dir.array_meta_vac_uris_to_vacuum()); + vfs.remove_files(compute_tp, array_dir.array_meta_uris_to_vacuum()); + vfs.remove_files(compute_tp, array_dir.array_meta_vac_uris_to_vacuum()); } /* ****************************** */ diff --git a/tiledb/sm/consolidator/commits_consolidator.cc b/tiledb/sm/consolidator/commits_consolidator.cc index b01167d9348..8bc3c6b9a17 100644 --- a/tiledb/sm/consolidator/commits_consolidator.cc +++ b/tiledb/sm/consolidator/commits_consolidator.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022 TileDB, Inc. + * @copyright Copyright (c) 2022-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -82,7 +82,7 @@ Status CommitsConsolidator::consolidate( // Get the array uri to consolidate from the array directory. auto array_dir = ArrayDirectory( - storage_manager_->resources(), + resources_, URI(array_name), 0, utils::time::timestamp_now_ms(), @@ -96,7 +96,7 @@ Status CommitsConsolidator::consolidate( // Get the file name. auto& to_consolidate = array_dir.commit_uris_to_consolidate(); Consolidator::write_consolidated_commits_file( - write_version, array_dir, to_consolidate, storage_manager_); + write_version, array_dir, to_consolidate, resources_); return Status::Ok(); } @@ -109,18 +109,17 @@ void CommitsConsolidator::vacuum(const char* array_name) { // Get the array metadata URIs and vacuum file URIs to be vacuum ArrayDirectory array_dir( - storage_manager_->resources(), + resources_, URI(array_name), 0, utils::time::timestamp_now_ms(), ArrayDirectoryMode::COMMITS); // Delete the commits and vacuum files - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_.vfs(); auto compute_tp = storage_manager_->compute_tp(); - vfs->remove_files(compute_tp, array_dir.commit_uris_to_vacuum()); - vfs->remove_files( - compute_tp, array_dir.consolidated_commits_uris_to_vacuum()); + vfs.remove_files(compute_tp, array_dir.commit_uris_to_vacuum()); + vfs.remove_files(compute_tp, array_dir.consolidated_commits_uris_to_vacuum()); } } // namespace tiledb::sm diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index a29d493263a..1a6fe605016 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -256,7 +256,7 @@ void Consolidator::write_consolidated_commits_file( format_version_t write_version, ArrayDirectory array_dir, const std::vector& commit_uris, - StorageManager* storage_manager) { + ContextResources& resources) { // Compute the file name. auto name = storage_format::generate_consolidated_fragment_name( commit_uris.front(), commit_uris.back(), write_version); @@ -274,7 +274,7 @@ void Consolidator::write_consolidated_commits_file( // the size variable. if (stdx::string::ends_with( uri.to_string(), constants::delete_file_suffix)) { - throw_if_not_ok(storage_manager->vfs()->file_size(uri, &file_sizes[i])); + throw_if_not_ok(resources.vfs().file_size(uri, &file_sizes[i])); total_size += file_sizes[i]; total_size += sizeof(storage_size_t); } @@ -295,8 +295,8 @@ void Consolidator::write_consolidated_commits_file( uri.to_string(), constants::delete_file_suffix)) { memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t)); file_index += sizeof(storage_size_t); - throw_if_not_ok(storage_manager->vfs()->read( - uri, 0, &data[file_index], file_sizes[i])); + throw_if_not_ok( + resources.vfs().read(uri, 0, &data[file_index], file_sizes[i])); file_index += file_sizes[i]; } } @@ -305,9 +305,9 @@ void Consolidator::write_consolidated_commits_file( URI consolidated_commits_uri = array_dir.get_commits_dir(write_version) .join_path(name + constants::con_commits_file_suffix); - throw_if_not_ok(storage_manager->vfs()->write( + throw_if_not_ok(resources.vfs().write( consolidated_commits_uri, data.data(), data.size())); - throw_if_not_ok(storage_manager->vfs()->close_file(consolidated_commits_uri)); + throw_if_not_ok(resources.vfs().close_file(consolidated_commits_uri)); } void Consolidator::array_vacuum( diff --git a/tiledb/sm/consolidator/consolidator.h b/tiledb/sm/consolidator/consolidator.h index 0b4f0ef4424..1c7d10ae238 100644 --- a/tiledb/sm/consolidator/consolidator.h +++ b/tiledb/sm/consolidator/consolidator.h @@ -181,13 +181,13 @@ class Consolidator { * @param write_version Write version. * @param array_dir ArrayDirectory where the data is stored. * @param commit_uris Commit files to include. - * @param storage_manager The storage manager. + * @param resources The context resources. */ static void write_consolidated_commits_file( format_version_t write_version, ArrayDirectory array_dir, const std::vector& commit_uris, - StorageManager* storage_manager); + ContextResources& resources); /** * Cleans up the array, such as its consolidated fragments and array diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 4b4f55cc5a9..374597efdcb 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -446,7 +446,7 @@ void FragmentConsolidator::vacuum(const char* array_name) { array_dir.write_commit_ignore_file(commit_uris_to_ignore); } - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_.vfs(); auto compute_tp = storage_manager_->compute_tp(); // Delete fragment directories @@ -455,22 +455,22 @@ void FragmentConsolidator::vacuum(const char* array_name) { // Remove the commit file, if present. auto commit_uri = array_dir.get_commit_uri(fragment_uris_to_vacuum[i]); bool is_file = false; - RETURN_NOT_OK(vfs->is_file(commit_uri, &is_file)); + throw_if_not_ok(vfs.is_file(commit_uri, &is_file)); if (is_file) { - RETURN_NOT_OK(vfs->remove_file(commit_uri)); + throw_if_not_ok(vfs.remove_file(commit_uri)); } bool is_dir = false; - RETURN_NOT_OK(vfs->is_dir(fragment_uris_to_vacuum[i], &is_dir)); + throw_if_not_ok(vfs.is_dir(fragment_uris_to_vacuum[i], &is_dir)); if (is_dir) { - RETURN_NOT_OK(vfs->remove_dir(fragment_uris_to_vacuum[i])); + throw_if_not_ok(vfs.remove_dir(fragment_uris_to_vacuum[i])); } return Status::Ok(); })); // Delete the vacuum files. - vfs->remove_files( + vfs.remove_files( compute_tp, filtered_fragment_uris.fragment_vac_uris_to_vacuum()); } @@ -585,10 +585,9 @@ Status FragmentConsolidator::consolidate_internal( auto st = query_w->finalize(); if (!st.ok()) { bool is_dir = false; - throw_if_not_ok( - storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir)); + throw_if_not_ok(resources_.vfs().is_dir(*new_fragment_uri, &is_dir)); if (is_dir) - throw_if_not_ok(storage_manager_->vfs()->remove_dir(*new_fragment_uri)); + throw_if_not_ok(resources_.vfs().remove_dir(*new_fragment_uri)); return st; } @@ -600,10 +599,9 @@ Status FragmentConsolidator::consolidate_internal( to_consolidate); if (!st.ok()) { bool is_dir = false; - throw_if_not_ok( - storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir)); + throw_if_not_ok(resources_.vfs().is_dir(*new_fragment_uri, &is_dir)); if (is_dir) - throw_if_not_ok(storage_manager_->vfs()->remove_dir(*new_fragment_uri)); + throw_if_not_ok(resources_.vfs().remove_dir(*new_fragment_uri)); return st; } @@ -1016,9 +1014,8 @@ Status FragmentConsolidator::write_vacuum_file( } auto data = ss.str(); - RETURN_NOT_OK( - storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size())); - RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri)); + throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size())); + throw_if_not_ok(resources_.vfs().close_file(vac_uri)); return Status::Ok(); } diff --git a/tiledb/sm/consolidator/fragment_meta_consolidator.cc b/tiledb/sm/consolidator/fragment_meta_consolidator.cc index 4afa82423b2..47cf67ed2ca 100644 --- a/tiledb/sm/consolidator/fragment_meta_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_meta_consolidator.cc @@ -97,7 +97,7 @@ Status FragmentMetaConsolidator::consolidate( first, last, write_version); auto frag_md_uri = array_dir.get_fragment_metadata_dir(write_version); - RETURN_NOT_OK(storage_manager_->vfs()->create_dir(frag_md_uri)); + throw_if_not_ok(resources_.vfs().create_dir(frag_md_uri)); uri = URI(frag_md_uri.to_string() + name + constants::meta_file_suffix); // Get the consolidated fragment metadata version @@ -171,10 +171,10 @@ Status FragmentMetaConsolidator::consolidate( EncryptionKey enc_key; RETURN_NOT_OK(enc_key.set_key(encryption_type, encryption_key, key_length)); - GenericTileIO tile_io(storage_manager_->resources(), uri); + GenericTileIO tile_io(resources_, uri); [[maybe_unused]] uint64_t nbytes = 0; tile_io.write_generic(tile, enc_key, &nbytes); - RETURN_NOT_OK(storage_manager_->vfs()->close_file(uri)); + throw_if_not_ok(resources_.vfs().close_file(uri)); return Status::Ok(); } @@ -202,7 +202,7 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) { } } - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_.vfs(); auto compute_tp = storage_manager_->compute_tp(); // Vacuum @@ -211,8 +211,9 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) { auto& uri = fragment_meta_uris[i]; FragmentID fragment_id{uri}; auto timestamp_range{fragment_id.timestamp_range()}; - if (timestamp_range.second != t_latest) - RETURN_NOT_OK(vfs->remove_file(uri)); + if (timestamp_range.second != t_latest) { + throw_if_not_ok(vfs.remove_file(uri)); + } return Status::Ok(); })); } diff --git a/tiledb/sm/consolidator/group_meta_consolidator.cc b/tiledb/sm/consolidator/group_meta_consolidator.cc index a8f3c8e9988..2849110112f 100644 --- a/tiledb/sm/consolidator/group_meta_consolidator.cc +++ b/tiledb/sm/consolidator/group_meta_consolidator.cc @@ -99,9 +99,8 @@ Status GroupMetaConsolidator::consolidate( throw_if_not_ok(group_for_writes.close()); // Write vacuum file - RETURN_NOT_OK( - storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size())); - RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri)); + throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size())); + throw_if_not_ok(resources_.vfs().close_file(vac_uri)); return Status::Ok(); } @@ -113,12 +112,12 @@ void GroupMetaConsolidator::vacuum(const char* group_name) { } // Get the group metadata URIs and vacuum file URIs to be vacuumed - auto vfs = storage_manager_->vfs(); + auto& vfs = resources_.vfs(); auto compute_tp = storage_manager_->compute_tp(); GroupDirectory group_dir; try { group_dir = GroupDirectory( - vfs, + &vfs, compute_tp, URI(group_name), 0, @@ -128,8 +127,8 @@ void GroupMetaConsolidator::vacuum(const char* group_name) { } // Delete the group metadata and vacuum files - vfs->remove_files(compute_tp, group_dir.group_meta_uris_to_vacuum()); - vfs->remove_files(compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); + vfs.remove_files(compute_tp, group_dir.group_meta_uris_to_vacuum()); + vfs.remove_files(compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); } /* ****************************** */ diff --git a/tiledb/sm/group/group.cc b/tiledb/sm/group/group.cc index c6389d4e1d1..81b592b3e21 100644 --- a/tiledb/sm/group/group.cc +++ b/tiledb/sm/group/group.cc @@ -163,7 +163,7 @@ Status Group::open( try { group_dir_ = make_shared( HERE(), - storage_manager_->vfs(), + &resources_.vfs(), storage_manager_->compute_tp(), group_uri_, timestamp_start, @@ -177,7 +177,7 @@ Status Group::open( try { group_dir_ = make_shared( HERE(), - storage_manager_->vfs(), + &resources_.vfs(), storage_manager_->compute_tp(), group_uri_, timestamp_start, diff --git a/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc b/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc index df73c3e95ea..e57b67d1ca9 100644 --- a/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc +++ b/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc @@ -147,7 +147,7 @@ Status DeletesAndUpdates::dowork() { // Create the commit URI if needed. auto& array_dir = array_->array_directory(); auto commit_uri = array_dir.get_commits_dir(write_version); - RETURN_NOT_OK(storage_manager_->vfs()->create_dir(commit_uri)); + throw_if_not_ok(resources_.vfs().create_dir(commit_uri)); // Serialize the negated condition (aud update values if they are not empty) // and write to disk. diff --git a/tiledb/sm/query/readers/filtered_data.h b/tiledb/sm/query/readers/filtered_data.h index da4b5c9c9ef..7642e87ce54 100644 --- a/tiledb/sm/query/readers/filtered_data.h +++ b/tiledb/sm/query/readers/filtered_data.h @@ -189,7 +189,8 @@ class FilteredData { StorageManager* storage_manager, std::vector& read_tasks, shared_ptr memory_tracker) - : memory_tracker_(memory_tracker) + : resources_(storage_manager->resources()) + , memory_tracker_(memory_tracker) , fixed_data_blocks_( memory_tracker_->get_resource(MemoryType::FILTERED_DATA)) , var_data_blocks_( @@ -396,8 +397,8 @@ class FilteredData { URI uri{file_uri(fragment_metadata_[block.frag_idx()].get(), type)}; auto task = storage_manager_->io_tp()->execute([this, offset, data, size, uri]() { - RETURN_NOT_OK( - storage_manager_->vfs()->read(uri, offset, data, size, false)); + throw_if_not_ok( + resources_.vfs().read(uri, offset, data, size, false)); return Status::Ok(); }); read_tasks_.push_back(std::move(task)); @@ -601,6 +602,9 @@ class FilteredData { /* PRIVATE ATTRIBUTES */ /* ********************************* */ + /** Resources used to perform operations. */ + ContextResources& resources_; + /** Memory tracker for the filtered data. */ shared_ptr memory_tracker_; diff --git a/tiledb/sm/query/strategy_base.h b/tiledb/sm/query/strategy_base.h index 122ad3864d6..1a62e36980b 100644 --- a/tiledb/sm/query/strategy_base.h +++ b/tiledb/sm/query/strategy_base.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2022 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -40,8 +40,7 @@ #include "tiledb/sm/storage_manager/context_resources.h" #include "tiledb/sm/storage_manager/storage_manager.h" -namespace tiledb { -namespace sm { +namespace tiledb::sm { class OpenedArray; class ArraySchema; @@ -100,7 +99,7 @@ class StrategyParams { /* ********************************* */ /** - * Accessor for the resources + * Accessor for the resources. */ inline ContextResources& resources() { return resources_; @@ -175,9 +174,7 @@ class StrategyParams { /* PRIVATE ATTRIBUTES */ /* ********************************* */ - /** - * Resources used to perform operations - */ + /** Resources used to perform operations. */ ContextResources& resources_; /** Array Memory tracker. */ @@ -277,9 +274,7 @@ class StrategyBase { /* PROTECTED ATTRIBUTES */ /* ********************************* */ - /** - * Resources used for operations - */ + /** Resources used for operations. */ ContextResources& resources_; /** The array memory tracker. */ @@ -345,7 +340,6 @@ class StrategyBase { void get_dim_attr_stats() const; }; -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm #endif // TILEDB_STRATEGY_BASE_H diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index 86bb48ba503..08b6aa358db 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2023 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -246,7 +246,7 @@ GlobalOrderWriter::multipart_upload_state(bool client) { for (const auto& name : buffer_names()) { auto uri = meta->uri(name); - auto&& [st2, state] = storage_manager_->vfs()->multipart_upload_state(uri); + auto&& [st2, state] = resources_.vfs().multipart_upload_state(uri); RETURN_NOT_OK_TUPLE(st2, {}); // If there is no entry for this uri, probably multipart upload is disabled // or no write was issued so far @@ -257,8 +257,7 @@ GlobalOrderWriter::multipart_upload_state(bool client) { if (array_schema_.var_size(name)) { auto var_uri = meta->var_uri(name); - auto&& [st, var_state] = - storage_manager_->vfs()->multipart_upload_state(var_uri); + auto&& [st, var_state] = resources_.vfs().multipart_upload_state(var_uri); RETURN_NOT_OK_TUPLE(st, {}); result[var_uri.remove_trailing_slash().last_path_part()] = std::move(*var_state); @@ -266,7 +265,7 @@ GlobalOrderWriter::multipart_upload_state(bool client) { if (array_schema_.is_nullable(name)) { auto validity_uri = meta->validity_uri(name); auto&& [st, val_state] = - storage_manager_->vfs()->multipart_upload_state(validity_uri); + resources_.vfs().multipart_upload_state(validity_uri); RETURN_NOT_OK_TUPLE(st, {}); result[validity_uri.remove_trailing_slash().last_path_part()] = std::move(*val_state); @@ -288,8 +287,7 @@ Status GlobalOrderWriter::set_multipart_upload_state( // uri in this case holds only the buffer name auto absolute_uri = global_write_state_->frag_meta_->fragment_uri().join_path(uri); - return storage_manager_->vfs()->set_multipart_upload_state( - absolute_uri, state); + return resources_.vfs().set_multipart_upload_state(absolute_uri, state); } /* ****************************** */ @@ -509,13 +507,13 @@ void GlobalOrderWriter::clean_up() { // Cleanup the fragment we are currently writing. There is a chance that the // URI is empty if creating the first fragment had failed. if (!uri.empty()) { - throw_if_not_ok(storage_manager_->vfs()->remove_dir(uri)); + throw_if_not_ok(resources_.vfs().remove_dir(uri)); } global_write_state_.reset(nullptr); // Cleanup all fragments pending commit. for (auto& uri : frag_uris_to_commit_) { - throw_if_not_ok(storage_manager_->vfs()->remove_dir(uri)); + throw_if_not_ok(resources_.vfs().remove_dir(uri)); } frag_uris_to_commit_.clear(); } @@ -709,7 +707,7 @@ Status GlobalOrderWriter::finalize_global_write_state() { // Write either one commit file or a consolidated commit file if multiple // fragments were written. if (frag_uris_to_commit_.size() == 0) { - RETURN_NOT_OK(storage_manager_->vfs()->touch(commit_uri)); + throw_if_not_ok(resources_.vfs().touch(commit_uri)); } else { std::vector commit_uris; commit_uris.reserve(frag_uris_to_commit_.size() + 1); @@ -720,10 +718,7 @@ Status GlobalOrderWriter::finalize_global_write_state() { auto write_version = array_->array_schema_latest().write_version(); Consolidator::write_consolidated_commits_file( - write_version, - array_->array_directory(), - commit_uris, - storage_manager_); + write_version, array_->array_directory(), commit_uris, resources_); } // Delete global write state @@ -861,7 +856,7 @@ Status GlobalOrderWriter::global_write_handle_last_tile() { void GlobalOrderWriter::nuke_global_write_state() { auto meta = global_write_state_->frag_meta_; throw_if_not_ok(close_files(meta)); - throw_if_not_ok(storage_manager_->vfs()->remove_dir(meta->fragment_uri())); + throw_if_not_ok(resources_.vfs().remove_dir(meta->fragment_uri())); global_write_state_.reset(nullptr); } diff --git a/tiledb/sm/query/writers/ordered_writer.cc b/tiledb/sm/query/writers/ordered_writer.cc index 1cce7f7c959..657c9db6674 100644 --- a/tiledb/sm/query/writers/ordered_writer.cc +++ b/tiledb/sm/query/writers/ordered_writer.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2023 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -150,7 +150,7 @@ std::string OrderedWriter::name() { void OrderedWriter::clean_up() { if (frag_uri_.has_value()) { - throw_if_not_ok(storage_manager_->vfs()->remove_dir(frag_uri_.value())); + throw_if_not_ok(resources_.vfs().remove_dir(frag_uri_.value())); } } @@ -279,7 +279,7 @@ Status OrderedWriter::ordered_write() { // The following will make the fragment visible URI commit_uri = array_->array_directory().get_commit_uri(frag_uri_.value()); - RETURN_NOT_OK(storage_manager_->vfs()->touch(commit_uri)); + throw_if_not_ok(resources_.vfs().touch(commit_uri)); return Status::Ok(); } diff --git a/tiledb/sm/query/writers/unordered_writer.cc b/tiledb/sm/query/writers/unordered_writer.cc index ee8ecb0d1e7..509fdd1dcaa 100644 --- a/tiledb/sm/query/writers/unordered_writer.cc +++ b/tiledb/sm/query/writers/unordered_writer.cc @@ -181,7 +181,7 @@ Status UnorderedWriter::alloc_frag_meta() { void UnorderedWriter::clean_up() { if (frag_uri_.has_value()) { - throw_if_not_ok(storage_manager_->vfs()->remove_dir(frag_uri_.value())); + throw_if_not_ok(resources_.vfs().remove_dir(frag_uri_.value())); } } @@ -732,8 +732,7 @@ Status UnorderedWriter::unordered_write() { // The following will make the fragment visible URI commit_uri = array_->array_directory().get_commit_uri(frag_uri_.value()); - - RETURN_NOT_OK(storage_manager_->vfs()->touch(commit_uri)); + throw_if_not_ok(resources_.vfs().touch(commit_uri)); // Clear some data to prevent it from being serialized. cell_pos_.clear(); diff --git a/tiledb/sm/query/writers/writer_base.cc b/tiledb/sm/query/writers/writer_base.cc index d297410969c..2f3a2a5fa76 100644 --- a/tiledb/sm/query/writers/writer_base.cc +++ b/tiledb/sm/query/writers/writer_base.cc @@ -312,7 +312,7 @@ void WriterBase::refresh_config() { shared_ptr WriterBase::create_fragment_metadata() { return make_shared( HERE(), - &storage_manager_->resources(), + &resources_, query_memory_tracker_, array_->array_schema_latest().write_version()); } @@ -613,14 +613,14 @@ Status WriterBase::close_files(shared_ptr meta) const { storage_manager_->io_tp(), 0, file_uris.size(), [&](uint64_t i) { const auto& file_uri = file_uris[i]; if (layout_ == Layout::GLOBAL_ORDER && remote_query()) { - storage_manager_->vfs()->finalize_and_close_file(file_uri); + resources_.vfs().finalize_and_close_file(file_uri); } else { - RETURN_NOT_OK(storage_manager_->vfs()->close_file(file_uri)); + throw_if_not_ok(resources_.vfs().close_file(file_uri)); } return Status::Ok(); }); - RETURN_NOT_OK(status); + throw_if_not_ok(status); return Status::Ok(); } @@ -782,11 +782,11 @@ Status WriterBase::create_fragment( // Create the directories. // Create the fragment directory, the directory for the new fragment // URI, and the commit directory. - throw_if_not_ok(storage_manager_->vfs()->create_dir( - array_dir.get_fragments_dir(write_version))); - throw_if_not_ok(storage_manager_->vfs()->create_dir(fragment_uri_)); - throw_if_not_ok(storage_manager_->vfs()->create_dir( - array_dir.get_commits_dir(write_version))); + throw_if_not_ok( + resources_.vfs().create_dir(array_dir.get_fragments_dir(write_version))); + throw_if_not_ok(resources_.vfs().create_dir(fragment_uri_)); + throw_if_not_ok( + resources_.vfs().create_dir(array_dir.get_commits_dir(write_version))); // Create fragment metadata. auto timestamp_range = std::pair(timestamp, timestamp); @@ -795,7 +795,7 @@ Status WriterBase::create_fragment( buffers_.count(constants::delete_timestamps) != 0; frag_meta = make_shared( HERE(), - &storage_manager_->resources(), + &resources_, array_->array_schema_latest_ptr(), fragment_uri_, timestamp_range, @@ -1125,7 +1125,7 @@ Status WriterBase::write_tiles( ++i, ++tile_id) { auto& tile = (*tiles)[i]; auto& t = var_size ? tile.offset_tile() : tile.fixed_tile(); - RETURN_NOT_OK(storage_manager_->vfs()->write( + throw_if_not_ok(resources_.vfs().write( uri, t.filtered_buffer().data(), t.filtered_buffer().size(), @@ -1135,7 +1135,7 @@ Status WriterBase::write_tiles( if (var_size) { auto& t_var = tile.var_tile(); - RETURN_NOT_OK(storage_manager_->vfs()->write( + throw_if_not_ok(resources_.vfs().write( var_uri, t_var.filtered_buffer().data(), t_var.filtered_buffer().size(), @@ -1160,7 +1160,7 @@ Status WriterBase::write_tiles( if (nullable) { auto& t_val = tile.validity_tile(); - RETURN_NOT_OK(storage_manager_->vfs()->write( + throw_if_not_ok(resources_.vfs().write( validity_uri, t_val.filtered_buffer().data(), t_val.filtered_buffer().size(), @@ -1189,11 +1189,10 @@ Status WriterBase::write_tiles( // requirement of remote global order writes, it should only be // done if this code is executed as a result of a remote query if (remote_query()) { - RETURN_NOT_OK( - storage_manager_->vfs()->flush_multipart_file_buffer(u)); + throw_if_not_ok(resources_.vfs().flush_multipart_file_buffer(u)); } } else { - RETURN_NOT_OK(storage_manager_->vfs()->close_file(u)); + throw_if_not_ok(resources_.vfs().close_file(u)); } } } diff --git a/tiledb/sm/storage_manager/storage_manager.cc b/tiledb/sm/storage_manager/storage_manager.cc index 801b26f3c1d..4201aac467d 100644 --- a/tiledb/sm/storage_manager/storage_manager.cc +++ b/tiledb/sm/storage_manager/storage_manager.cc @@ -174,41 +174,41 @@ Status StorageManagerCanonical::array_create( array_schema->check(config_); // Create array directory - RETURN_NOT_OK(vfs()->create_dir(array_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_uri)); // Create array schema directory URI array_schema_dir_uri = array_uri.join_path(constants::array_schema_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_schema_dir_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_schema_dir_uri)); // Create the enumerations directory inside the array schema directory URI array_enumerations_uri = array_schema_dir_uri.join_path(constants::array_enumerations_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_enumerations_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_enumerations_uri)); // Create commit directory URI array_commit_uri = array_uri.join_path(constants::array_commits_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_commit_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_commit_uri)); // Create fragments directory URI array_fragments_uri = array_uri.join_path(constants::array_fragments_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_fragments_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_fragments_uri)); // Create array metadata directory URI array_metadata_uri = array_uri.join_path(constants::array_metadata_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_metadata_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_metadata_uri)); // Create fragment metadata directory URI array_fragment_metadata_uri = array_uri.join_path(constants::array_fragment_meta_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_fragment_metadata_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_fragment_metadata_uri)); // Create dimension label directory URI array_dimension_labels_uri = array_uri.join_path(constants::array_dimension_labels_dir_name); - RETURN_NOT_OK(vfs()->create_dir(array_dimension_labels_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_dimension_labels_uri)); // Get encryption key from config Status st; @@ -241,7 +241,7 @@ Status StorageManagerCanonical::array_create( // Store array schema if (!st.ok()) { - throw_if_not_ok(vfs()->remove_dir(array_uri)); + throw_if_not_ok(resources_.vfs().remove_dir(array_uri)); return st; } @@ -362,7 +362,7 @@ Status StorageManagerCanonical::array_upgrade_version( // Create array schema directory if necessary URI array_schema_dir_uri = array_uri.join_path(constants::array_schema_dir_name); - auto st = vfs()->create_dir(array_schema_dir_uri); + auto st = resources_.vfs().create_dir(array_schema_dir_uri); RETURN_NOT_OK_ELSE(st, logger_->status_no_return_value(st)); // Store array schema @@ -373,20 +373,22 @@ Status StorageManagerCanonical::array_upgrade_version( URI array_commit_uri = array_uri.join_path(constants::array_commits_dir_name); RETURN_NOT_OK_ELSE( - vfs()->create_dir(array_commit_uri), + resources_.vfs().create_dir(array_commit_uri), logger_->status_no_return_value(st)); // Create fragments directory if necessary URI array_fragments_uri = array_uri.join_path(constants::array_fragments_dir_name); - st = vfs()->create_dir(array_fragments_uri); - RETURN_NOT_OK_ELSE(st, logger_->status_no_return_value(st)); + RETURN_NOT_OK_ELSE( + resources_.vfs().create_dir(array_fragments_uri), + logger_->status_no_return_value(st)); // Create fragment metadata directory if necessary URI array_fragment_metadata_uri = array_uri.join_path(constants::array_fragment_meta_dir_name); - st = vfs()->create_dir(array_fragment_metadata_uri); - RETURN_NOT_OK_ELSE(st, logger_->status_no_return_value(st)); + RETURN_NOT_OK_ELSE( + resources_.vfs().create_dir(array_fragment_metadata_uri), + logger_->status_no_return_value(st)); } return Status::Ok(); @@ -427,7 +429,7 @@ Status StorageManagerCanonical::cancel_all_tasks() { if (handle_cancel) { // Cancel any queued tasks. cancelable_tasks_.cancel_all_tasks(); - throw_if_not_ok(resources().vfs().cancel_all_tasks()); + throw_if_not_ok(resources_.vfs().cancel_all_tasks()); // Wait for in-progress queries to finish. wait_for_zero_in_progress(); @@ -464,7 +466,7 @@ Status StorageManagerCanonical::object_remove(const char* path) const { std::string("Cannot remove object '") + path + "'; Invalid TileDB object")); - return vfs()->remove_dir(uri); + return resources_.vfs().remove_dir(uri); } Status StorageManagerCanonical::object_move( @@ -486,7 +488,7 @@ Status StorageManagerCanonical::object_move( std::string("Cannot move object '") + old_path + "'; Invalid TileDB object")); - return vfs()->move_dir(old_uri, new_uri); + return resources_.vfs().move_dir(old_uri, new_uri); } const std::unordered_map& @@ -518,19 +520,19 @@ Status StorageManagerCanonical::group_create(const std::string& group_uri) { } // Create group directory - RETURN_NOT_OK(vfs()->create_dir(uri)); + throw_if_not_ok(resources_.vfs().create_dir(uri)); // Create group file URI group_filename = uri.join_path(constants::group_filename); - RETURN_NOT_OK(vfs()->touch(group_filename)); + throw_if_not_ok(resources_.vfs().touch(group_filename)); // Create metadata folder - RETURN_NOT_OK( - vfs()->create_dir(uri.join_path(constants::group_metadata_dir_name))); + throw_if_not_ok(resources_.vfs().create_dir( + uri.join_path(constants::group_metadata_dir_name))); // Create group detail folder - RETURN_NOT_OK( - vfs()->create_dir(uri.join_path(constants::group_detail_dir_name))); + throw_if_not_ok(resources_.vfs().create_dir( + uri.join_path(constants::group_detail_dir_name))); return Status::Ok(); } @@ -550,16 +552,16 @@ bool StorageManagerCanonical::is_array(const URI& uri) const { } else { // Check if the schema directory exists or not bool dir_exists = false; - throw_if_not_ok(vfs()->is_dir( + throw_if_not_ok(resources_.vfs().is_dir( uri.join_path(constants::array_schema_dir_name), &dir_exists)); if (dir_exists) { return true; } - bool schema_exists = false; // If there is no schema directory, we check schema file - throw_if_not_ok(vfs()->is_file( + bool schema_exists = false; + throw_if_not_ok(resources_.vfs().is_file( uri.join_path(constants::array_schema_filename), &schema_exists)); return schema_exists; } @@ -575,7 +577,7 @@ Status StorageManagerCanonical::is_group(const URI& uri, bool* is_group) const { *is_group = *exists; } else { // Check for new group details directory - RETURN_NOT_OK(vfs()->is_dir( + throw_if_not_ok(resources_.vfs().is_dir( uri.join_path(constants::group_detail_dir_name), is_group)); if (*is_group) { @@ -583,8 +585,8 @@ Status StorageManagerCanonical::is_group(const URI& uri, bool* is_group) const { } // Fall back to older group file to check for legacy (pre-format 12) groups - RETURN_NOT_OK( - vfs()->is_file(uri.join_path(constants::group_filename), is_group)); + throw_if_not_ok(resources_.vfs().is_file( + uri.join_path(constants::group_filename), is_group)); } return Status::Ok(); } @@ -651,7 +653,7 @@ Status StorageManagerCanonical::object_type( } else if (!uri.is_tiledb()) { // For non public cloud backends, listing a non-directory is an error. bool is_dir = false; - RETURN_NOT_OK(vfs()->is_dir(uri, &is_dir)); + throw_if_not_ok(resources_.vfs().is_dir(uri, &is_dir)); if (!is_dir) { *type = ObjectType::INVALID; return Status::Ok(); @@ -684,7 +686,7 @@ Status StorageManagerCanonical::object_iter_begin( // Get all contents of path std::vector uris; - RETURN_NOT_OK(vfs()->ls(path_uri, &uris)); + throw_if_not_ok(resources_.vfs().ls(path_uri, &uris)); // Create a new object iterator *obj_iter = tdb_new(ObjectIter); @@ -716,7 +718,7 @@ Status StorageManagerCanonical::object_iter_begin( // Get all contents of path std::vector uris; - RETURN_NOT_OK(vfs()->ls(path_uri, &uris)); + throw_if_not_ok(resources_.vfs().ls(path_uri, &uris)); // Create a new object iterator *obj_iter = tdb_new(ObjectIter); @@ -768,7 +770,7 @@ Status StorageManagerCanonical::object_iter_next_postorder( do { obj_num = obj_iter->objs_.size(); std::vector uris; - RETURN_NOT_OK(vfs()->ls(obj_iter->objs_.front(), &uris)); + throw_if_not_ok(resources_.vfs().ls(obj_iter->objs_.front(), &uris)); obj_iter->expanded_.front() = true; // Push the new TileDB objects in the front of the iterator's list @@ -815,7 +817,7 @@ Status StorageManagerCanonical::object_iter_next_preorder( // Get all contents of the next URI std::vector uris; - RETURN_NOT_OK(vfs()->ls(front_uri, &uris)); + throw_if_not_ok(resources_.vfs().ls(front_uri, &uris)); // Push the new TileDB objects in the front of the iterator's list ObjectType obj_type; @@ -870,19 +872,22 @@ Status StorageManagerCanonical::store_array_schema( // Delete file if it exists already bool exists; - RETURN_NOT_OK(vfs()->is_file(schema_uri, &exists)); - if (exists) - RETURN_NOT_OK(vfs()->remove_file(schema_uri)); + throw_if_not_ok(resources_.vfs().is_file(schema_uri, &exists)); + if (exists) { + throw_if_not_ok(resources_.vfs().remove_file(schema_uri)); + } // Check if the array schema directory exists // If not create it, this is caused by a pre-v10 array bool schema_dir_exists = false; URI array_schema_dir_uri = array_schema->array_uri().join_path(constants::array_schema_dir_name); - RETURN_NOT_OK(vfs()->is_dir(array_schema_dir_uri, &schema_dir_exists)); + throw_if_not_ok( + resources_.vfs().is_dir(array_schema_dir_uri, &schema_dir_exists)); - if (!schema_dir_exists) - RETURN_NOT_OK(vfs()->create_dir(array_schema_dir_uri)); + if (!schema_dir_exists) { + throw_if_not_ok(resources_.vfs().create_dir(array_schema_dir_uri)); + } GenericTileIO::store_data(resources_, schema_uri, tile, encryption_key); @@ -892,11 +897,11 @@ Status StorageManagerCanonical::store_array_schema( bool enumerations_dir_exists = false; URI array_enumerations_dir_uri = array_schema_dir_uri.join_path(constants::array_enumerations_dir_name); - RETURN_NOT_OK( - vfs()->is_dir(array_enumerations_dir_uri, &enumerations_dir_exists)); + throw_if_not_ok(resources_.vfs().is_dir( + array_enumerations_dir_uri, &enumerations_dir_exists)); if (!enumerations_dir_exists) { - RETURN_NOT_OK(vfs()->create_dir(array_enumerations_dir_uri)); + throw_if_not_ok(resources_.vfs().create_dir(array_enumerations_dir_uri)); } // Serialize all enumerations into the `__enumerations` directory diff --git a/tiledb/sm/storage_manager/storage_manager_canonical.h b/tiledb/sm/storage_manager/storage_manager_canonical.h index 5b949d08908..0dad1293b5e 100644 --- a/tiledb/sm/storage_manager/storage_manager_canonical.h +++ b/tiledb/sm/storage_manager/storage_manager_canonical.h @@ -407,11 +407,6 @@ class StorageManagerCanonical { return resources_; } - /** Returns the virtual filesystem object. */ - [[nodiscard]] inline VFS* vfs() const { - return &(resources_.vfs()); - } - /** Returns the internal logger object. */ shared_ptr logger() const;