Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shaunrd0 committed Nov 22, 2023
1 parent 5a34f35 commit 8da19b7
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 101 deletions.
65 changes: 65 additions & 0 deletions test/src/unit-s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include <test/support/tdb_catch.h>
#include "test/support/src/helpers.h"
#include "test/support/src/vfs_helpers.h"
#include "tiledb/common/thread_pool.h"
#include "tiledb/sm/config/config.h"
#include "tiledb/sm/filesystem/s3.h"
Expand Down Expand Up @@ -554,4 +555,68 @@ TEST_CASE_METHOD(S3Fx, "Test S3 use Bucket/Object CannedACL", "[s3]") {
try_with_bucket_object_canned_acl("public_read_write", "public_read_write");
try_with_bucket_object_canned_acl("authenticated_read", "authenticated_read");
}

TEST_CASE(
"S3: S3Scanner iterator to populate vector", "[s3][ls-scan-iterator]") {
S3Test s3_test({10, 50});
bool recursive = true;
// 1000 is the default max_keys for S3. This is the same default used by
// S3Scanner. Testing with small max_keys validates the iterator handles batch
// collection and filtering appropriately.
int max_keys = GENERATE(1000, 10, 7);

DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") {
FileFilter file_filter;

SECTION("Reject all objects") {
file_filter = [](const std::string_view&, uint64_t) { return false; };
}

SECTION("Filter objects including 'test_file_1' in key") {
file_filter = [](const std::string_view& path, uint64_t) {
if (path.find("test_file_1") != std::string::npos) {
return true;
}
return false;
};
}

SECTION("Scan for a single object") {
file_filter = [](const std::string_view& path, uint64_t) {
if (path.find("test_file_50") != std::string::npos) {
return true;
}
return false;
};
}

// Filter expected results to apply file_filter.
LsObjects expected;
std::copy_if(
s3_test.expected_results_.begin(),
s3_test.expected_results_.end(),
std::back_inserter(expected),
[&file_filter](const auto& a) {
return file_filter(a.first, a.second);
});

auto scan = s3_test.get_s3().scanner(
s3_test.temp_dir_, file_filter, accept_all_dirs, recursive);
scan.set_max_keys(max_keys);
auto iter = scan.iterator();
std::vector<Aws::S3::Model::Object> results_vector(
iter.begin(), iter.end());
for (const auto& result : results_vector) {
CHECK(file_filter(result.GetKey(), result.GetSize()));
}

CHECK(results_vector.size() == expected.size());
for (size_t i = 0; i < expected.size(); i++) {
auto s3_object = results_vector[i];
auto full_uri = s3_test.temp_dir_.to_string() + "/" + s3_object.GetKey();
CHECK(full_uri == expected[i].first);
CHECK(static_cast<uint64_t>(s3_object.GetSize()) == expected[i].second);
}
}
}
#endif
77 changes: 30 additions & 47 deletions test/src/unit-vfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,31 +353,26 @@ TEST_CASE("VFS: test ls_with_sizes", "[vfs][ls-with-sizes]") {
REQUIRE(vfs_ls.remove_dir(URI(path)).ok());
}

// Currently only S3 is supported for VFS::ls_cb.
// Currently only S3 is supported for VFS::ls_recursive.
using TestBackends = std::tuple<S3Test>;
TEMPLATE_LIST_TEST_CASE(
"VFS: Test internal ls_filtered recursion argument",
"[vfs][ls_filtered]",
"[vfs][ls_filtered][recursion]",
TestBackends) {
TestType fs({10, 50});
if (!fs.is_supported()) {
return;
}

// TODO: Fails when recursive is false, because AWS prefix includes subdir_N/
bool recursive = GENERATE(true, false);
DYNAMIC_SECTION(
fs.temp_dir_.backend_name()
<< " ls_filtered with recursion: " << (recursive ? "true" : "false")) {
/*
* Temporary stub while common base class is added. The s3 accessor won't be
* needed, and can be converted to a virtual function call.
*/
#ifdef HAVE_S3
// If testing with recursion use the root directory, otherwise use a subdir.
auto path = recursive ? fs.temp_dir_ : fs.temp_dir_.join_path("subdir_1");
auto ls_objects = fs.s3().ls_filtered(
path, VFSTestBase::no_file_filter, accept_all_dirs, recursive);
auto ls_objects = fs.get_s3().ls_filtered(
path, VFSTestBase::accept_all_files, accept_all_dirs, recursive);

if (!recursive) {
// If non-recursive, all objects in the first directory should be
Expand All @@ -392,45 +387,10 @@ TEMPLATE_LIST_TEST_CASE(
}
}

TEST_CASE("VFS: S3ScanIterator to populate vector", "[vfs][ls_filtered]") {
S3Test s3_test({10, 50});
bool recursive = GENERATE(true, false);

DYNAMIC_SECTION(
"S3ScanIterator with recursion: " << (recursive ? "true" : "false")) {
#ifdef HAVE_S3
auto file_filter = [](const std::string_view& path, uint64_t) {
if (path.find("test_file_1") != std::string::npos) {
return true;
}
return false;
};

// If testing with recursion use the root directory, otherwise use a subdir.
auto path =
recursive ? s3_test.temp_dir_ : s3_test.temp_dir_.join_path("subdir_1");
auto scan =
s3_test.s3().scanner(path, file_filter, accept_all_dirs, recursive);
auto iter = scan.iterator();
std::vector<Aws::S3::Model::Object> results_vector(
iter.begin(), iter.end());
for (const auto& result : results_vector) {
CHECK(file_filter(result.GetKey(), 0));
}
if (recursive) {
CHECK(results_vector.size() == 13);
} else {
CHECK(results_vector.size() == 2);
}
#endif
}
}

TEST_CASE(
"VFS: ls_recursive throws for unsupported backends",
"[vfs][ls_recursive]") {
// Local and mem fs tests are in
// tiledb/sm/filesystem/test/unit_ls_filtered.cc
// Local and mem fs tests are in tiledb/sm/filesystem/test/unit_ls_filtered.cc
std::string prefix = GENERATE("s3://", "hdfs://", "azure://", "gcs://");
VFSTest vfs_test({1}, prefix);
if (!vfs_test.is_supported()) {
Expand All @@ -442,15 +402,38 @@ TEST_CASE(
if (vfs_test.temp_dir_.is_s3()) {
DYNAMIC_SECTION(backend << " supported backend should not throw") {
CHECK_NOTHROW(vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, VFSTestBase::no_file_filter));
vfs_test.temp_dir_, VFSTestBase::accept_all_files));
}
} else {
DYNAMIC_SECTION(backend << " unsupported backend should throw") {
CHECK_THROWS_WITH(
vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, VFSTestBase::no_file_filter),
vfs_test.temp_dir_, VFSTestBase::accept_all_files),
Catch::Matchers::ContainsSubstring(
"storage backend is not supported"));
}
}
}

TEST_CASE(
"VFS: Throwing FileFilter for ls_recursive",
"[vfs][ls_recursive][file-filter]") {
std::string prefix = "s3://";
VFSTest vfs_test({0}, prefix);
auto file_filter = [](const std::string_view&, uint64_t) -> bool {
throw std::logic_error("Throwing FileFilter");
};
SECTION("Throwing FileFilter with 0 objects should not throw") {
CHECK_NOTHROW(vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, file_filter, tiledb::sm::accept_all_dirs));
}
SECTION("Throwing FileFilter with N objects should throw") {
vfs_test.vfs_.touch(vfs_test.temp_dir_.join_path("file")).ok();
CHECK_THROWS_AS(
vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter),
std::logic_error);
CHECK_THROWS_WITH(
vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter),
Catch::Matchers::ContainsSubstring("Throwing FileFilter"));
}
}
17 changes: 15 additions & 2 deletions test/support/src/vfs_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ class VFSTestBase {
*/
static tiledb::sm::Config create_test_config();

static bool no_file_filter(const std::string_view&, uint64_t) {
/** FilePredicate for passing to ls_filtered that accepts all files. */
static bool accept_all_files(const std::string_view&, uint64_t) {
return true;
}

Expand Down Expand Up @@ -798,7 +799,7 @@ class VFSTest : public VFSTestBase {
};

/** Test object for tiledb::sm::S3 functionality. */
class S3Test : public VFSTestBase, public tiledb::sm::S3_within_VFS {
class S3Test : public VFSTestBase, protected tiledb::sm::S3_within_VFS {
public:
explicit S3Test(const std::vector<size_t>& test_tree)
: VFSTestBase(test_tree, "s3://")
Expand All @@ -819,6 +820,18 @@ class S3Test : public VFSTestBase, public tiledb::sm::S3_within_VFS {
}
#endif
}

#ifdef HAVE_S3
/** Expose protected accessor from S3_within_VFS. */
tiledb::sm::S3& get_s3() {
return s3();
}

/** Expose protected const accessor from S3_within_VFS. */
const tiledb::sm::S3& get_s3() const {
return s3();
}
#endif
};

/** Stub test object for tiledb::sm::Win and Posix functionality. */
Expand Down
11 changes: 1 addition & 10 deletions tiledb/sm/filesystem/filesystem_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,7 @@

namespace tiledb::sm {

class TempFilesystemBase {
public:
template <FilePredicate F, DirectoryPredicate D>
LsObjects ls_filtered(
const URI&, F, D = tiledb::sm::accept_all_dirs, bool = false) const {
return {};
}
};

class FilesystemBase : TempFilesystemBase {
class FilesystemBase {
public:
FilesystemBase() = default;

Expand Down
8 changes: 5 additions & 3 deletions tiledb/sm/filesystem/ls_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,19 @@ namespace tiledb::sm {
using FileFilter = std::function<bool(const std::string_view&, uint64_t)>;

using DirectoryFilter = std::function<bool(const std::string_view&)>;
/** Static DirectoryFilter used as default argument. */
[[maybe_unused]] static bool accept_all_dirs(const std::string_view&) {
return true;
}

/** Type defintion for objects returned from ls_recursive. */
using LsObjects = std::vector<std::pair<std::string, uint64_t>>;

/**
* LsScanIterator iterates over the results of a ListObjectsV2 request wrapped
* by deriving classes of LsScanner. See S3Scanner as an example.
*
* @tparam T The S3Scanner type that created this iterator.
* @tparam T The LsScanner type that created this iterator.
*/
template <class T, class U>
class LsScanIterator {
Expand Down Expand Up @@ -116,7 +118,7 @@ class LsScanIterator {
/**
* Dereference operator.
*
* @return The current S3 object from AWS ListObjects request.
* @return The current object being visited.
*/
reference operator*() {
return *ptr_;
Expand Down Expand Up @@ -162,7 +164,7 @@ class LsScanIterator {
/** Pointer to the scanner that created this iterator. */
T* scanner_;

/** Pointer to the current S3 object. */
/** Pointer to the current object. */
pointer ptr_;
};

Expand Down
49 changes: 37 additions & 12 deletions tiledb/sm/filesystem/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,22 @@ class S3Scanner : public LsScanner<F, D> {
return Iterator(this);
}

/**
* Set the maximum number of keys to fetch for each S3 ListObjects request.
*
* @param max_keys The maximum number of keys to fetch per request.
*/
inline void set_max_keys(int max_keys) {
list_objects_request_.SetMaxKeys(max_keys);
}

private:
/** Pointer to the S3 client initialized by VFS. */
shared_ptr<TileDBS3Client> client_;
/** Delimiter used for ListObjects request. */
std::string delimiter_;
/** Iterators for the current objects fetched from S3. */
std::vector<Aws::S3::Model::Object>::const_iterator begin_, end_;
typename Iterator::pointer begin_, end_;

/** The current request being scanned. */
Aws::S3::Model::ListObjectsV2Request list_objects_request_;
Expand All @@ -507,7 +516,7 @@ class S3Scanner : public LsScanner<F, D> {
* This class implements the various S3 filesystem functions. It also
* maintains buffer caches for writing into the various attribute files.
*/
class S3 : public TempFilesystemBase {
class S3 {
private:
/** Forward declaration */
struct MultiPartUploadState;
Expand Down Expand Up @@ -688,6 +697,18 @@ class S3 : public TempFilesystemBase {
return objects;
}

/**
* Constructs a scanner for listing S3 objects. The scanner can be used to
* retrieve an InputIterator for passing to algorithms such as `std::copy_if`
* or STL constructors supporting initialization via input iterators.
*
* @param parent The parent prefix to list sub-paths.
* @param f The FilePredicate to invoke on each object for filtering.
* @param d The DirectoryPredicate to invoke on each common prefix for
* pruning. This is currently unused, but is kept here for future support.
* @param recursive Whether to recursively list subdirectories.
* @return Fully constructed S3Scanner object.
*/
template <FilePredicate F, DirectoryPredicate D>
S3Scanner<F, D> scanner(
const URI& parent,
Expand Down Expand Up @@ -1508,28 +1529,32 @@ void S3Scanner<F, D>::next(typename Iterator::pointer& ptr) {
if (found_) {
found_ = false;
ptr++;
}

// If results were truncated, fetch more when we reach the end of this batch.
if (more_to_fetch() && ptr == end_) {
ptr = fetch_results();
// If results were truncated, fetch more when we reach the end of this
// batch.
if (more_to_fetch() && ptr == end_) {
ptr = fetch_results();
}
}

while (ptr != end_) {
auto object = *ptr;
uint64_t size = object.GetSize();
std::string path = "s3://" + list_objects_request_.GetBucket() +
S3::add_front_slash(object.GetKey());
if (!this->file_filter_(path, size)) {
ptr++;
} else {
if (this->file_filter_(path, size)) {
// iterator is at the next object within results accepted by the filters.
found_ = true;
return;
} else {
// Object was rejected by the FilePredicate, do not include it in results.
ptr++;

if (more_to_fetch() && ptr == end_) {
ptr = fetch_results();
}
}
if (more_to_fetch() && ptr == end_) {
ptr = fetch_results();
}

// TODO: Add support for directory pruning.
}
}
Expand Down
Loading

0 comments on commit 8da19b7

Please sign in to comment.