Skip to content

Commit

Permalink
Standalone 13/N: Integrate components into ZarrStream_s (#305)
Browse files Browse the repository at this point in the history
Depends on #304.
  • Loading branch information
aliddell authored Oct 4, 2024
1 parent aa59079 commit dc674fc
Show file tree
Hide file tree
Showing 33 changed files with 3,906 additions and 111 deletions.
4 changes: 4 additions & 0 deletions src/streaming/acquire.zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ extern "C"

void ZarrStream_destroy(struct ZarrStream_s* stream)
{
if (!finalize_stream(stream)) {
return;
}

delete stream;
}

Expand Down
31 changes: 23 additions & 8 deletions src/streaming/array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ zarr::downsample(const ArrayWriterConfig& config,
shard_size_chunks };
}
}
downsampled_config.dimensions = std::make_unique<ArrayDimensions>(
downsampled_config.dimensions = std::make_shared<ArrayDimensions>(
std::move(downsampled_dims), config.dtype);

downsampled_config.level_of_detail = config.level_of_detail + 1;
Expand All @@ -76,17 +76,17 @@ zarr::downsample(const ArrayWriterConfig& config,
}

/// Writer
zarr::ArrayWriter::ArrayWriter(ArrayWriterConfig&& config,
zarr::ArrayWriter::ArrayWriter(const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool)
: ArrayWriter(std::move(config), thread_pool, nullptr)
{
}

zarr::ArrayWriter::ArrayWriter(
ArrayWriterConfig&& config,
const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool)
: config_{ std::move(config) }
: config_{ config }
, thread_pool_{ thread_pool }
, s3_connection_pool_{ s3_connection_pool }
, bytes_to_flush_{ 0 }
Expand All @@ -97,7 +97,7 @@ zarr::ArrayWriter::ArrayWriter(
}

size_t
zarr::ArrayWriter::write_frame(std::span<std::byte> data)
zarr::ArrayWriter::write_frame(std::span<const std::byte> data)
{
const auto nbytes_data = data.size();
const auto nbytes_frame =
Expand Down Expand Up @@ -239,12 +239,12 @@ zarr::ArrayWriter::make_buffers_() noexcept

for (auto& buf : chunk_buffers_) {
buf.resize(nbytes);
std::fill_n(buf.begin(), nbytes, std::byte(0));
std::fill(buf.begin(), buf.end(), std::byte(0));
}
}

size_t
zarr::ArrayWriter::write_frame_to_chunks_(std::span<std::byte> data)
zarr::ArrayWriter::write_frame_to_chunks_(std::span<const std::byte> data)
{
// break the frame into tiles and write them to the chunk buffers
const auto bytes_per_px = bytes_of_type(config_.dtype);
Expand Down Expand Up @@ -423,6 +423,11 @@ zarr::ArrayWriter::flush_()
void
zarr::ArrayWriter::close_sinks_()
{
for (auto i = 0; i < data_sinks_.size(); ++i) {
EXPECT(finalize_sink(std::move(data_sinks_[i])),
"Failed to finalize sink ",
i);
}
data_sinks_.clear();
}

Expand All @@ -438,14 +443,24 @@ zarr::ArrayWriter::rollover_()
bool
zarr::finalize_array(std::unique_ptr<ArrayWriter>&& writer)
{
if (writer == nullptr) {
LOG_INFO("Array writer is null. Nothing to finalize.");
return true;
}

writer->is_finalizing_ = true;
try {
writer->flush_();
writer->flush_(); // data sinks finalized here
} catch (const std::exception& exc) {
LOG_ERROR("Failed to finalize array writer: ", exc.what());
return false;
}

if (!finalize_sink(std::move(writer->metadata_sink_))) {
LOG_ERROR("Failed to finalize metadata sink");
return false;
}

writer.reset();
return true;
}
13 changes: 6 additions & 7 deletions src/streaming/array.writer.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "zarr.stream.hh"
#include "zarr.dimension.hh"
#include "thread.pool.hh"
#include "s3.connection.hh"
#include "blosc.compression.params.hh"
Expand All @@ -12,10 +12,9 @@
namespace fs = std::filesystem;

namespace zarr {

struct ArrayWriterConfig
{
std::unique_ptr<ArrayDimensions> dimensions;
std::shared_ptr<ArrayDimensions> dimensions;
ZarrDataType dtype;
int level_of_detail;
std::optional<std::string> bucket_name;
Expand All @@ -40,10 +39,10 @@ downsample(const ArrayWriterConfig& config,
class ArrayWriter
{
public:
ArrayWriter(ArrayWriterConfig&& config,
ArrayWriter(const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool);

ArrayWriter(ArrayWriterConfig&& config,
ArrayWriter(const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool);

Expand All @@ -54,7 +53,7 @@ class ArrayWriter
* @param data The frame data.
* @return The number of bytes written.
*/
[[nodiscard]] size_t write_frame(std::span<std::byte> data);
[[nodiscard]] size_t write_frame(std::span<const std::byte> data);

protected:
ArrayWriterConfig config_;
Expand Down Expand Up @@ -89,7 +88,7 @@ class ArrayWriter
bool should_flush_() const;
virtual bool should_rollover_() const = 0;

size_t write_frame_to_chunks_(std::span<std::byte> data);
size_t write_frame_to_chunks_(std::span<const std::byte> data);
void compress_buffers_();

void flush_();
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/file.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ zarr::FileSink::FileSink(std::string_view filename)
}

bool
zarr::FileSink::write(size_t offset, std::span<std::byte> data)
zarr::FileSink::write(size_t offset, std::span<const std::byte> data)
{
const auto bytes_of_buf = data.size();
if (data.data() == nullptr || bytes_of_buf == 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/file.sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class FileSink : public Sink
public:
explicit FileSink(std::string_view filename);

bool write(size_t offset, std::span<std::byte> data) override;
bool write(size_t offset, std::span<const std::byte> data) override;

protected:
bool flush_() override;
Expand Down
4 changes: 2 additions & 2 deletions src/streaming/s3.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ zarr::S3Sink::flush_()
}

bool
zarr::S3Sink::write(size_t offset, std::span<std::byte> data)
zarr::S3Sink::write(size_t offset, std::span<const std::byte> data)
{
if (data.data() == nullptr || data.empty()) {
return true;
Expand All @@ -66,7 +66,7 @@ zarr::S3Sink::write(size_t offset, std::span<std::byte> data)
nbytes_buffered_ = offset - nbytes_flushed_;

size_t bytes_of_data = data.size();
std::byte* data_ptr = data.data();
const std::byte* data_ptr = data.data();
while (bytes_of_data > 0) {
const auto bytes_to_write =
std::min(bytes_of_data, part_buffer_.size() - nbytes_buffered_);
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/s3.sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class S3Sink : public Sink
std::string_view object_key,
std::shared_ptr<S3ConnectionPool> connection_pool);

bool write(size_t offset, std::span<std::byte> data) override;
bool write(size_t offset, std::span<const std::byte> data) override;

protected:
bool flush_() override;
Expand Down
6 changes: 6 additions & 0 deletions src/streaming/sink.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
#include "sink.hh"
#include "macros.hh"

bool
zarr::finalize_sink(std::unique_ptr<zarr::Sink>&& sink)
{
if (sink == nullptr) {
LOG_INFO("Sink is null. Nothing to finalize.");
return true;
}

if (!sink->flush_()) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Sink
* @return True if the write was successful, false otherwise.
*/
[[nodiscard]] virtual bool write(size_t offset,
std::span<std::byte> buf) = 0;
std::span<const std::byte> buf) = 0;

protected:
[[nodiscard]] virtual bool flush_() = 0;
Expand Down
Loading

0 comments on commit dc674fc

Please sign in to comment.