Skip to content

Commit

Permalink
Remove StorageManager::vfs() (#4982)
Browse files Browse the repository at this point in the history
Remove `StorageManager::vfs()`

[sc-47537]

---
TYPE: NO_HISTORY
DESC: Remove `StorageManager::vfs()`.
  • Loading branch information
bekadavis9 authored May 16, 2024
1 parent 93dab0a commit 372507c
Show file tree
Hide file tree
Showing 20 changed files with 148 additions and 167 deletions.
5 changes: 3 additions & 2 deletions tiledb/api/c_api/context/context_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2022-2023 TileDB, Inc.
* @copyright Copyright (c) 2022-2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -108,7 +108,8 @@ capi_return_t tiledb_ctx_get_last_error(
capi_return_t tiledb_ctx_is_supported_fs(
tiledb_ctx_t* ctx, tiledb_filesystem_t fs, int32_t* is_supported) {
ensure_output_pointer_is_valid(is_supported);
*is_supported = (int32_t)ctx->storage_manager()->vfs()->supports_fs(

*is_supported = (int32_t)ctx->context().resources().vfs().supports_fs(
static_cast<tiledb::sm::Filesystem>(fs));
return TILEDB_OK;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class StorageManagerStub {
inline common::ThreadPool* io_tp() {
return &resources_.io_tp();
}
inline VFS* vfs() {
throw std::logic_error("StorageManagerStub does not instantiate a VFS");
}
inline Status cancel_all_tasks() {
return Status{};
};
Expand Down
8 changes: 4 additions & 4 deletions tiledb/sm/array/test/unit_consistency.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2022 TileDB, Inc.
* @copyright Copyright (c) 2022-2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -186,7 +186,7 @@ TEST_CASE(
REQUIRE(x.is_open(uri) == false);

// Clean up
REQUIRE(sm.vfs()->remove_dir(uri).ok());
REQUIRE(resources.vfs().remove_dir(uri).ok());
}

TEST_CASE(
Expand Down Expand Up @@ -227,7 +227,7 @@ TEST_CASE(

// Clean up
for (auto uri : uris) {
REQUIRE(sm.vfs()->remove_dir(uri).ok());
REQUIRE(resources.vfs().remove_dir(uri).ok());
}
}

Expand Down Expand Up @@ -283,5 +283,5 @@ TEST_CASE(
REQUIRE(array.get()->close().ok());
REQUIRE(x.registry_size() == 0);
REQUIRE(x.is_open(uri) == false);
REQUIRE(sm.vfs()->remove_dir(uri).ok());
REQUIRE(resources.vfs().remove_dir(uri).ok());
}
16 changes: 6 additions & 10 deletions tiledb/sm/consolidator/array_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,8 @@ Status ArrayMetaConsolidator::consolidate(
throw_if_not_ok(array_for_writes.close());

// Write vacuum file
RETURN_NOT_OK(
storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size()));
throw_if_not_ok(resources_.vfs().close_file(vac_uri));

return Status::Ok();
}
Expand All @@ -126,18 +125,15 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) {
}

// Get the array metadata URIs and vacuum file URIs to be vacuum
auto vfs = storage_manager_->vfs();
auto& vfs = resources_.vfs();
auto compute_tp = storage_manager_->compute_tp();

auto array_dir = ArrayDirectory(
storage_manager_->resources(),
URI(array_name),
0,
std::numeric_limits<uint64_t>::max());
resources_, URI(array_name), 0, std::numeric_limits<uint64_t>::max());

// Delete the array metadata and vacuum files
vfs->remove_files(compute_tp, array_dir.array_meta_uris_to_vacuum());
vfs->remove_files(compute_tp, array_dir.array_meta_vac_uris_to_vacuum());
vfs.remove_files(compute_tp, array_dir.array_meta_uris_to_vacuum());
vfs.remove_files(compute_tp, array_dir.array_meta_vac_uris_to_vacuum());
}

/* ****************************** */
Expand Down
15 changes: 7 additions & 8 deletions tiledb/sm/consolidator/commits_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2022 TileDB, Inc.
* @copyright Copyright (c) 2022-2024 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -82,7 +82,7 @@ Status CommitsConsolidator::consolidate(

// Get the array uri to consolidate from the array directory.
auto array_dir = ArrayDirectory(
storage_manager_->resources(),
resources_,
URI(array_name),
0,
utils::time::timestamp_now_ms(),
Expand All @@ -96,7 +96,7 @@ Status CommitsConsolidator::consolidate(
// Get the file name.
auto& to_consolidate = array_dir.commit_uris_to_consolidate();
Consolidator::write_consolidated_commits_file(
write_version, array_dir, to_consolidate, storage_manager_);
write_version, array_dir, to_consolidate, resources_);

return Status::Ok();
}
Expand All @@ -109,18 +109,17 @@ void CommitsConsolidator::vacuum(const char* array_name) {

// Get the array metadata URIs and vacuum file URIs to be vacuum
ArrayDirectory array_dir(
storage_manager_->resources(),
resources_,
URI(array_name),
0,
utils::time::timestamp_now_ms(),
ArrayDirectoryMode::COMMITS);

// Delete the commits and vacuum files
auto vfs = storage_manager_->vfs();
auto& vfs = resources_.vfs();
auto compute_tp = storage_manager_->compute_tp();
vfs->remove_files(compute_tp, array_dir.commit_uris_to_vacuum());
vfs->remove_files(
compute_tp, array_dir.consolidated_commits_uris_to_vacuum());
vfs.remove_files(compute_tp, array_dir.commit_uris_to_vacuum());
vfs.remove_files(compute_tp, array_dir.consolidated_commits_uris_to_vacuum());
}

} // namespace tiledb::sm
12 changes: 6 additions & 6 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void Consolidator::write_consolidated_commits_file(
format_version_t write_version,
ArrayDirectory array_dir,
const std::vector<URI>& commit_uris,
StorageManager* storage_manager) {
ContextResources& resources) {
// Compute the file name.
auto name = storage_format::generate_consolidated_fragment_name(
commit_uris.front(), commit_uris.back(), write_version);
Expand All @@ -274,7 +274,7 @@ void Consolidator::write_consolidated_commits_file(
// the size variable.
if (stdx::string::ends_with(
uri.to_string(), constants::delete_file_suffix)) {
throw_if_not_ok(storage_manager->vfs()->file_size(uri, &file_sizes[i]));
throw_if_not_ok(resources.vfs().file_size(uri, &file_sizes[i]));
total_size += file_sizes[i];
total_size += sizeof(storage_size_t);
}
Expand All @@ -295,8 +295,8 @@ void Consolidator::write_consolidated_commits_file(
uri.to_string(), constants::delete_file_suffix)) {
memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t));
file_index += sizeof(storage_size_t);
throw_if_not_ok(storage_manager->vfs()->read(
uri, 0, &data[file_index], file_sizes[i]));
throw_if_not_ok(
resources.vfs().read(uri, 0, &data[file_index], file_sizes[i]));
file_index += file_sizes[i];
}
}
Expand All @@ -305,9 +305,9 @@ void Consolidator::write_consolidated_commits_file(
URI consolidated_commits_uri =
array_dir.get_commits_dir(write_version)
.join_path(name + constants::con_commits_file_suffix);
throw_if_not_ok(storage_manager->vfs()->write(
throw_if_not_ok(resources.vfs().write(
consolidated_commits_uri, data.data(), data.size()));
throw_if_not_ok(storage_manager->vfs()->close_file(consolidated_commits_uri));
throw_if_not_ok(resources.vfs().close_file(consolidated_commits_uri));
}

void Consolidator::array_vacuum(
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/consolidator/consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ class Consolidator {
* @param write_version Write version.
* @param array_dir ArrayDirectory where the data is stored.
* @param commit_uris Commit files to include.
* @param storage_manager The storage manager.
* @param resources The context resources.
*/
static void write_consolidated_commits_file(
format_version_t write_version,
ArrayDirectory array_dir,
const std::vector<URI>& commit_uris,
StorageManager* storage_manager);
ContextResources& resources);

/**
* Cleans up the array, such as its consolidated fragments and array
Expand Down
27 changes: 12 additions & 15 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ void FragmentConsolidator::vacuum(const char* array_name) {
array_dir.write_commit_ignore_file(commit_uris_to_ignore);
}

auto vfs = storage_manager_->vfs();
auto& vfs = resources_.vfs();
auto compute_tp = storage_manager_->compute_tp();

// Delete fragment directories
Expand All @@ -455,22 +455,22 @@ void FragmentConsolidator::vacuum(const char* array_name) {
// Remove the commit file, if present.
auto commit_uri = array_dir.get_commit_uri(fragment_uris_to_vacuum[i]);
bool is_file = false;
RETURN_NOT_OK(vfs->is_file(commit_uri, &is_file));
throw_if_not_ok(vfs.is_file(commit_uri, &is_file));
if (is_file) {
RETURN_NOT_OK(vfs->remove_file(commit_uri));
throw_if_not_ok(vfs.remove_file(commit_uri));
}

bool is_dir = false;
RETURN_NOT_OK(vfs->is_dir(fragment_uris_to_vacuum[i], &is_dir));
throw_if_not_ok(vfs.is_dir(fragment_uris_to_vacuum[i], &is_dir));
if (is_dir) {
RETURN_NOT_OK(vfs->remove_dir(fragment_uris_to_vacuum[i]));
throw_if_not_ok(vfs.remove_dir(fragment_uris_to_vacuum[i]));
}

return Status::Ok();
}));

// Delete the vacuum files.
vfs->remove_files(
vfs.remove_files(
compute_tp, filtered_fragment_uris.fragment_vac_uris_to_vacuum());
}

Expand Down Expand Up @@ -585,10 +585,9 @@ Status FragmentConsolidator::consolidate_internal(
auto st = query_w->finalize();
if (!st.ok()) {
bool is_dir = false;
throw_if_not_ok(
storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir));
throw_if_not_ok(resources_.vfs().is_dir(*new_fragment_uri, &is_dir));
if (is_dir)
throw_if_not_ok(storage_manager_->vfs()->remove_dir(*new_fragment_uri));
throw_if_not_ok(resources_.vfs().remove_dir(*new_fragment_uri));
return st;
}

Expand All @@ -600,10 +599,9 @@ Status FragmentConsolidator::consolidate_internal(
to_consolidate);
if (!st.ok()) {
bool is_dir = false;
throw_if_not_ok(
storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir));
throw_if_not_ok(resources_.vfs().is_dir(*new_fragment_uri, &is_dir));
if (is_dir)
throw_if_not_ok(storage_manager_->vfs()->remove_dir(*new_fragment_uri));
throw_if_not_ok(resources_.vfs().remove_dir(*new_fragment_uri));
return st;
}

Expand Down Expand Up @@ -1016,9 +1014,8 @@ Status FragmentConsolidator::write_vacuum_file(
}

auto data = ss.str();
RETURN_NOT_OK(
storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size()));
throw_if_not_ok(resources_.vfs().close_file(vac_uri));

return Status::Ok();
}
Expand Down
13 changes: 7 additions & 6 deletions tiledb/sm/consolidator/fragment_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status FragmentMetaConsolidator::consolidate(
first, last, write_version);

auto frag_md_uri = array_dir.get_fragment_metadata_dir(write_version);
RETURN_NOT_OK(storage_manager_->vfs()->create_dir(frag_md_uri));
throw_if_not_ok(resources_.vfs().create_dir(frag_md_uri));
uri = URI(frag_md_uri.to_string() + name + constants::meta_file_suffix);

// Get the consolidated fragment metadata version
Expand Down Expand Up @@ -171,10 +171,10 @@ Status FragmentMetaConsolidator::consolidate(
EncryptionKey enc_key;
RETURN_NOT_OK(enc_key.set_key(encryption_type, encryption_key, key_length));

GenericTileIO tile_io(storage_manager_->resources(), uri);
GenericTileIO tile_io(resources_, uri);
[[maybe_unused]] uint64_t nbytes = 0;
tile_io.write_generic(tile, enc_key, &nbytes);
RETURN_NOT_OK(storage_manager_->vfs()->close_file(uri));
throw_if_not_ok(resources_.vfs().close_file(uri));

return Status::Ok();
}
Expand Down Expand Up @@ -202,7 +202,7 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) {
}
}

auto vfs = storage_manager_->vfs();
auto& vfs = resources_.vfs();
auto compute_tp = storage_manager_->compute_tp();

// Vacuum
Expand All @@ -211,8 +211,9 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) {
auto& uri = fragment_meta_uris[i];
FragmentID fragment_id{uri};
auto timestamp_range{fragment_id.timestamp_range()};
if (timestamp_range.second != t_latest)
RETURN_NOT_OK(vfs->remove_file(uri));
if (timestamp_range.second != t_latest) {
throw_if_not_ok(vfs.remove_file(uri));
}
return Status::Ok();
}));
}
Expand Down
13 changes: 6 additions & 7 deletions tiledb/sm/consolidator/group_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ Status GroupMetaConsolidator::consolidate(
throw_if_not_ok(group_for_writes.close());

// Write vacuum file
RETURN_NOT_OK(
storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
throw_if_not_ok(resources_.vfs().write(vac_uri, data.c_str(), data.size()));
throw_if_not_ok(resources_.vfs().close_file(vac_uri));

return Status::Ok();
}
Expand All @@ -113,12 +112,12 @@ void GroupMetaConsolidator::vacuum(const char* group_name) {
}

// Get the group metadata URIs and vacuum file URIs to be vacuumed
auto vfs = storage_manager_->vfs();
auto& vfs = resources_.vfs();
auto compute_tp = storage_manager_->compute_tp();
GroupDirectory group_dir;
try {
group_dir = GroupDirectory(
vfs,
&vfs,
compute_tp,
URI(group_name),
0,
Expand All @@ -128,8 +127,8 @@ void GroupMetaConsolidator::vacuum(const char* group_name) {
}

// Delete the group metadata and vacuum files
vfs->remove_files(compute_tp, group_dir.group_meta_uris_to_vacuum());
vfs->remove_files(compute_tp, group_dir.group_meta_vac_uris_to_vacuum());
vfs.remove_files(compute_tp, group_dir.group_meta_uris_to_vacuum());
vfs.remove_files(compute_tp, group_dir.group_meta_vac_uris_to_vacuum());
}

/* ****************************** */
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/group/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Status Group::open(
try {
group_dir_ = make_shared<GroupDirectory>(
HERE(),
storage_manager_->vfs(),
&resources_.vfs(),
storage_manager_->compute_tp(),
group_uri_,
timestamp_start,
Expand All @@ -177,7 +177,7 @@ Status Group::open(
try {
group_dir_ = make_shared<GroupDirectory>(
HERE(),
storage_manager_->vfs(),
&resources_.vfs(),
storage_manager_->compute_tp(),
group_uri_,
timestamp_start,
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ Status DeletesAndUpdates::dowork() {
// Create the commit URI if needed.
auto& array_dir = array_->array_directory();
auto commit_uri = array_dir.get_commits_dir(write_version);
RETURN_NOT_OK(storage_manager_->vfs()->create_dir(commit_uri));
throw_if_not_ok(resources_.vfs().create_dir(commit_uri));

// Serialize the negated condition (aud update values if they are not empty)
// and write to disk.
Expand Down
Loading

0 comments on commit 372507c

Please sign in to comment.