Skip to content

Commit

Permalink
Add S3ConnectionPool to Zarr and Writer classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Jul 6, 2024
1 parent 1a544dd commit 9fc300b
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 32 deletions.
9 changes: 6 additions & 3 deletions src/writers/sink.creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
namespace zarr = acquire::sink::zarr;
namespace common = zarr::common;

zarr::SinkCreator::SinkCreator(std::shared_ptr<common::ThreadPool> thread_pool_)
/// @brief Construct a sink creator that shares the given pools.
/// @param[in] thread_pool Thread pool retained by this creator.
/// @param[in] connection_pool S3 connection pool retained by this creator.
///            May be null (the unit tests in this file pass nullptr).
zarr::SinkCreator::SinkCreator(
  std::shared_ptr<common::ThreadPool> thread_pool,
  std::shared_ptr<common::S3ConnectionPool> connection_pool)
  // Parameter deliberately named without the trailing underscore so it does
  // not shadow the member it initializes.
  : thread_pool_{ thread_pool }
  , connection_pool_{ connection_pool }
{
}

Expand Down Expand Up @@ -291,7 +294,7 @@ extern "C"
auto thread_pool = std::make_shared<common::ThreadPool>(
std::thread::hardware_concurrency(),
[](const std::string& err) { LOGE("Error: %s\n", err.c_str()); });
zarr::SinkCreator creator{ thread_pool };
zarr::SinkCreator creator{ thread_pool, nullptr };

std::vector<zarr::Dimension> dims;
dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks
Expand Down Expand Up @@ -337,7 +340,7 @@ extern "C"
auto thread_pool = std::make_shared<common::ThreadPool>(
std::thread::hardware_concurrency(),
[](const std::string& err) { LOGE("Error: %s", err.c_str()); });
zarr::SinkCreator creator{ thread_pool };
zarr::SinkCreator creator{ thread_pool, nullptr };

std::vector<zarr::Dimension> dims;
dims.emplace_back(
Expand Down
5 changes: 4 additions & 1 deletion src/writers/sink.creator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "sink.hh"
#include "common/dimension.hh"
#include "common/thread.pool.hh"
#include "common/s3.connection.hh"

#include <optional>
#include <memory>
Expand All @@ -13,7 +14,8 @@ struct SinkCreator final
{
public:
SinkCreator() = delete;
explicit SinkCreator(std::shared_ptr<common::ThreadPool> thread_pool);
SinkCreator(std::shared_ptr<common::ThreadPool> thread_pool_,
std::shared_ptr<common::S3ConnectionPool> connection_pool);
~SinkCreator() noexcept = default;

/// @brief Create a collection of data sinks, either chunk or shard.
Expand Down Expand Up @@ -48,6 +50,7 @@ struct SinkCreator final

private:
std::shared_ptr<common::ThreadPool> thread_pool_;
std::shared_ptr<common::S3ConnectionPool> connection_pool_; // could be null

/// @brief Parallel create a collection of directories.
/// @param[in] dir_paths The directories to create.
Expand Down
8 changes: 5 additions & 3 deletions src/writers/writer.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#include <stdexcept>
#include "writer.hh"
#include "common/utilities.hh"

#include <cmath>
#include <functional>
#include <latch>
#include <stdexcept>

namespace zarr = acquire::sink::zarr;

Expand Down Expand Up @@ -177,9 +177,11 @@ zarr::downsample(const WriterConfig& config, WriterConfig& downsampled_config)

/// Writer
zarr::Writer::Writer(const WriterConfig& config,
std::shared_ptr<common::ThreadPool> thread_pool)
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool)
: config_{ config }
, thread_pool_{ thread_pool }
, connection_pool_{ connection_pool }
, bytes_to_flush_{ 0 }
, frames_written_{ 0 }
, append_chunk_index_{ 0 }
Expand Down Expand Up @@ -463,7 +465,7 @@ class TestWriter : public zarr::Writer
public:
TestWriter(const zarr::WriterConfig& array_spec,
std::shared_ptr<common::ThreadPool> thread_pool)
: zarr::Writer(array_spec, thread_pool)
: zarr::Writer(array_spec, thread_pool, nullptr)
{
}

Expand Down
6 changes: 5 additions & 1 deletion src/writers/writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "common/dimension.hh"
#include "common/thread.pool.hh"
#include "common/s3.connection.hh"
#include "blosc.compressor.hh"
#include "file.sink.hh"

Expand Down Expand Up @@ -40,7 +41,8 @@ struct Writer
public:
Writer() = delete;
Writer(const WriterConfig& config,
std::shared_ptr<common::ThreadPool> thread_pool);
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool);

virtual ~Writer() noexcept = default;

Expand Down Expand Up @@ -71,6 +73,8 @@ struct Writer
uint32_t append_chunk_index_;
bool is_finalizing_;

std::shared_ptr<common::S3ConnectionPool> connection_pool_;

void make_buffers_() noexcept;
void validate_frame_(const VideoFrame* frame);
size_t write_frame_to_chunks_(const uint8_t* buf, size_t buf_size);
Expand Down
16 changes: 10 additions & 6 deletions src/writers/zarrv2.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace zarr = acquire::sink::zarr;

zarr::ZarrV2Writer::ZarrV2Writer(
const WriterConfig& config,
std::shared_ptr<common::ThreadPool> thread_pool)
: Writer(config, thread_pool)
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool)
: Writer(config, thread_pool, connection_pool)
{
}

Expand All @@ -24,7 +25,7 @@ zarr::ZarrV2Writer::flush_impl_()
(fs::path(data_root_) / std::to_string(append_chunk_index_)).string();

{
SinkCreator creator(thread_pool_);
SinkCreator creator(thread_pool_, connection_pool_);
if (!creator.make_data_sinks(data_root,
config_.dimensions,
common::chunks_along_dimension,
Expand Down Expand Up @@ -124,7 +125,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV2Writer writer(config, thread_pool);
zarr::ZarrV2Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

const size_t frame_size = 64 * 48 * 2;

Expand Down Expand Up @@ -240,7 +242,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV2Writer writer(config, thread_pool);
zarr::ZarrV2Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48);
frame->bytes_of_frame =
Expand Down Expand Up @@ -339,7 +342,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV2Writer writer(config, thread_pool);
zarr::ZarrV2Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48);
frame->bytes_of_frame =
Expand Down
3 changes: 2 additions & 1 deletion src/writers/zarrv2.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ struct ZarrV2Writer final : public Writer
public:
ZarrV2Writer() = delete;
ZarrV2Writer(const WriterConfig& config,
std::shared_ptr<common::ThreadPool> thread_pool);
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool);

~ZarrV2Writer() override = default;

Expand Down
16 changes: 10 additions & 6 deletions src/writers/zarrv3.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ namespace zarr = acquire::sink::zarr;

zarr::ZarrV3Writer::ZarrV3Writer(
const WriterConfig& array_spec,
std::shared_ptr<common::ThreadPool> thread_pool)
: Writer(array_spec, thread_pool)
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool)
: Writer(array_spec, thread_pool, connection_pool)
, shard_file_offsets_(common::number_of_shards(array_spec.dimensions), 0)
, shard_tables_{ common::number_of_shards(array_spec.dimensions) }
{
Expand All @@ -33,7 +34,7 @@ zarr::ZarrV3Writer::flush_impl_()
.string();

{
SinkCreator creator(thread_pool_);
SinkCreator creator(thread_pool_, connection_pool_);
if (sinks_.empty() &&
!creator.make_data_sinks(data_root,
config_.dimensions,
Expand Down Expand Up @@ -208,7 +209,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV3Writer writer(config, thread_pool);
zarr::ZarrV3Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

const size_t frame_size = 64 * 48 * 2;

Expand Down Expand Up @@ -345,7 +347,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV3Writer writer(config, thread_pool);
zarr::ZarrV3Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48);
frame->bytes_of_frame =
Expand Down Expand Up @@ -466,7 +469,8 @@ extern "C"
.compression_params = std::nullopt,
};

zarr::ZarrV3Writer writer(config, thread_pool);
zarr::ZarrV3Writer writer(
config, thread_pool, std::shared_ptr<common::S3ConnectionPool>());

frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48);
frame->bytes_of_frame =
Expand Down
3 changes: 2 additions & 1 deletion src/writers/zarrv3.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ struct ZarrV3Writer final : public Writer
public:
ZarrV3Writer() = delete;
ZarrV3Writer(const WriterConfig& array_spec,
std::shared_ptr<common::ThreadPool> thread_pool);
std::shared_ptr<common::ThreadPool> thread_pool,
std::shared_ptr<common::S3ConnectionPool> connection_pool);

~ZarrV3Writer() override = default;

Expand Down
8 changes: 6 additions & 2 deletions src/zarr.hh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "common/utilities.hh"
#include "common/thread.pool.hh"
#include "common/s3.connection.hh"
#include "writers/writer.hh"
#include "writers/blosc.compressor.hh"

Expand Down Expand Up @@ -57,11 +58,14 @@ struct Zarr : public Storage
// scaled frames, keyed by level-of-detail
std::unordered_map<int, std::optional<VideoFrame*>> scaled_frames_;

// changes on flush
/// changes on start
std::shared_ptr<common::ThreadPool> thread_pool_;
std::shared_ptr<common::S3ConnectionPool> connection_pool_;

/// changes on flush
std::vector<std::unique_ptr<Sink>> metadata_sinks_;

/// Multithreading
std::shared_ptr<common::ThreadPool> thread_pool_;
mutable std::mutex mutex_; // for error_ / error_msg_

/// Error state
Expand Down
9 changes: 5 additions & 4 deletions src/zarr.v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ zarr::ZarrV2::allocate_writers_()
.compression_params = blosc_compression_params_,
};

writers_.push_back(std::make_shared<ZarrV2Writer>(config, thread_pool_));
writers_.push_back(
std::make_shared<ZarrV2Writer>(config, thread_pool_, connection_pool_));

if (enable_multiscale_) {
WriterConfig downsampled_config;
Expand All @@ -59,8 +60,8 @@ zarr::ZarrV2::allocate_writers_()
int level = 1;
while (do_downsample) {
do_downsample = downsample(config, downsampled_config);
writers_.push_back(
std::make_shared<ZarrV2Writer>(downsampled_config, thread_pool_));
writers_.push_back(std::make_shared<ZarrV2Writer>(
downsampled_config, thread_pool_, connection_pool_));
scaled_frames_.emplace(level++, std::nullopt);

config = std::move(downsampled_config);
Expand All @@ -72,7 +73,7 @@ zarr::ZarrV2::allocate_writers_()
void
zarr::ZarrV2::make_metadata_sinks_()
{
SinkCreator creator(thread_pool_);
SinkCreator creator(thread_pool_, connection_pool_);
CHECK(creator.create_v2_metadata_sinks(
dataset_root_, writers_.size(), metadata_sinks_));
}
Expand Down
9 changes: 5 additions & 4 deletions src/zarr.v3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ zarr::ZarrV3::allocate_writers_()
.compression_params = blosc_compression_params_,
};

writers_.push_back(std::make_shared<ZarrV3Writer>(config, thread_pool_));
writers_.push_back(
std::make_shared<ZarrV3Writer>(config, thread_pool_, connection_pool_));

if (enable_multiscale_) {
WriterConfig downsampled_config;
Expand All @@ -52,8 +53,8 @@ zarr::ZarrV3::allocate_writers_()
int level = 1;
while (do_downsample) {
do_downsample = downsample(config, downsampled_config);
writers_.push_back(
std::make_shared<ZarrV3Writer>(downsampled_config, thread_pool_));
writers_.push_back(std::make_shared<ZarrV3Writer>(
downsampled_config, thread_pool_, connection_pool_));
scaled_frames_.emplace(level++, std::nullopt);

config = std::move(downsampled_config);
Expand All @@ -73,7 +74,7 @@ zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const
void
zarr::ZarrV3::make_metadata_sinks_()
{
SinkCreator creator(thread_pool_);
SinkCreator creator(thread_pool_, connection_pool_);
CHECK(creator.create_v3_metadata_sinks(
dataset_root_, writers_.size(), metadata_sinks_));
}
Expand Down

0 comments on commit 9fc300b

Please sign in to comment.