Skip to content

Commit

Permalink
Remove ArrayWriter::finalize().
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Aug 30, 2024
1 parent c40a0c4 commit 0fc4799
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 65 deletions.
22 changes: 12 additions & 10 deletions src/internal/array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ zarr::ArrayWriter::ArrayWriter(
size_t
zarr::ArrayWriter::write_frame(const uint8_t* data, size_t nbytes)
{
if (bytes_of_frame(config_.dimensions, config_.dtype) != nbytes) {
const size_t nbytes_frame =
bytes_of_frame(config_.dimensions, config_.dtype);

if (nbytes_frame != nbytes) {
LOG_WARNING("Frame size mismatch: expected %zu, got %zu. Skipping",
nbytes_frame,
nbytes);
return 0;
}

Expand All @@ -98,6 +104,9 @@ zarr::ArrayWriter::write_frame(const uint8_t* data, size_t nbytes)
// split the incoming frame into tiles and write_frame them to the chunk
// buffers
const auto bytes_written = write_frame_to_chunks_(data, nbytes);
EXPECT(bytes_written == nbytes, "Failed to write_frame frame to chunks");

LOG_DEBUG("Wrote %zu bytes of frame %zu", bytes_written, frames_written_);
bytes_to_flush_ += bytes_written;
++frames_written_;

Expand All @@ -108,15 +117,6 @@ zarr::ArrayWriter::write_frame(const uint8_t* data, size_t nbytes)
return bytes_written;
}

void
zarr::ArrayWriter::finalize()
{
is_finalizing_ = true;
flush_();
close_sinks_();
is_finalizing_ = false;
}

bool
zarr::ArrayWriter::make_data_sinks_()
{
Expand Down Expand Up @@ -207,6 +207,8 @@ zarr::ArrayWriter::make_metadata_sink_()
void
zarr::ArrayWriter::make_buffers_() noexcept
{
LOG_DEBUG("Creating chunk buffers");

const size_t n_chunks = number_of_chunks_in_memory(config_.dimensions);
chunk_buffers_.resize(n_chunks); // no-op if already the correct size

Expand Down
13 changes: 9 additions & 4 deletions src/internal/array.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ class ArrayWriter
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool);

virtual ~ArrayWriter() noexcept = default;

virtual ~ArrayWriter() = default;

/**
* @brief Write a frame to the array.
* @param data The frame data.
* @param nbytes The number of bytes in the frame.
* @return The number of bytes written.
*/
[[nodiscard]] size_t write_frame(const uint8_t* data, size_t nbytes);
void finalize();

protected:
ArrayWriterConfig config_;
Expand Down Expand Up @@ -88,4 +93,4 @@ class ArrayWriter

void close_sinks_();
};
} // namespace acquire::sink::zarr
} // namespace zarr
2 changes: 1 addition & 1 deletion src/internal/s3.sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ class S3Sink final : public Sink
/// otherwise false.
[[nodiscard]] bool finalize_multipart_upload_();
};
} // namespace acquire::sink::zarr
} // namespace zarr
2 changes: 1 addition & 1 deletion src/internal/sink.creator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,4 @@ class SinkCreator final
std::vector<std::string>& object_keys,
std::unordered_map<std::string, std::unique_ptr<Sink>>& sinks);
};
} // namespace acquire::sink::zarr
} // namespace zarr
3 changes: 1 addition & 2 deletions src/internal/stream.settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ trim(const char* s, size_t bytes_of_s)

return trimmed;
}
} // namespace

} end ::{anonymous} namespace
/* Create and destroy */
ZarrStreamSettings*
ZarrStreamSettings_create()
Expand Down
7 changes: 2 additions & 5 deletions src/internal/zarr.stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ validate_settings(const struct ZarrStreamSettings_s* settings,

return true;
}
} // namespace

} end ::{anonymous} namespace
ZarrStream*
ZarrStream_create(struct ZarrStreamSettings_s* settings, ZarrVersion version)
{
Expand Down Expand Up @@ -401,9 +400,7 @@ ZarrStream_s::~ZarrStream_s()
write_group_metadata_();
metadata_sinks_.clear();

for (auto& writer : writers_) {
writer->finalize();
}
writers_.clear(); // flush before shutting down thread pool
thread_pool_->await_stop();
} catch (const std::exception& e) {
LOG_ERROR("Error finalizing Zarr stream: %s", e.what());
Expand Down
15 changes: 13 additions & 2 deletions src/internal/zarrv2.array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ sample_type_to_dtype(ZarrDataType t)
std::to_string(static_cast<int>(t)));
}
}
} // namespace

} end ::{anonymous} namespace
zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
const ArrayWriterConfig& config,
std::shared_ptr<ThreadPool> thread_pool,
Expand All @@ -54,6 +53,18 @@ zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
{
}

zarr::ZarrV2ArrayWriter::~ZarrV2ArrayWriter()
{
is_finalizing_ = true;
try {
flush_();
} catch (const std::exception& exc) {
LOG_ERROR("Failed to finalize array writer: %s", exc.what());
} catch (...) {
LOG_ERROR("Failed to finalize array writer: (unknown)");
}
}

ZarrVersion
zarr::ZarrV2ArrayWriter::version_() const
{
Expand Down
4 changes: 2 additions & 2 deletions src/internal/zarrv2.array.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ class ZarrV2ArrayWriter final : public ArrayWriter
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool);

~ZarrV2ArrayWriter() override = default;
~ZarrV2ArrayWriter() override;

private:
ZarrVersion version_() const override;
bool flush_impl_() override;
bool write_array_metadata_() override;
bool should_rollover_() const override;
};
} // namespace acquire::sink::zarr
} // namespace zarr
12 changes: 12 additions & 0 deletions src/internal/zarrv3.array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
}
}

zarr::ZarrV3ArrayWriter::~ZarrV3ArrayWriter()
{
is_finalizing_ = true;
try {
flush_();
} catch (const std::exception& exc) {
LOG_ERROR("Failed to finalize array writer: %s", exc.what());
} catch (...) {
LOG_ERROR("Failed to finalize array writer: (unknown)");
}
}

ZarrVersion
zarr::ZarrV3ArrayWriter::version_() const
{
Expand Down
4 changes: 2 additions & 2 deletions src/internal/zarrv3.array.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct ZarrV3ArrayWriter final : public ArrayWriter
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool);

~ZarrV3ArrayWriter() override = default;
~ZarrV3ArrayWriter() override;

private:
std::vector<size_t> shard_file_offsets_;
Expand All @@ -22,4 +22,4 @@ struct ZarrV3ArrayWriter final : public ArrayWriter
bool write_array_metadata_() override;
bool should_rollover_() const override;
};
} // namespace acquire::sink::zarr
} // namespace zarr
13 changes: 7 additions & 6 deletions tests/unit-tests/zarrv2-writer-write-even.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
}
}
writer.finalize();

check_json();

Expand Down
13 changes: 7 additions & 6 deletions tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
}
}
writer.finalize();

check_json();

Expand Down
13 changes: 7 additions & 6 deletions tests/unit-tests/zarrv2-writer-write-ragged-internal-dim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV2ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
for (auto i = 0; i < n_frames; ++i) { // 2 time points
CHECK(writer.write_frame(data.data(), frame_size));
}
}
writer.finalize();

check_json();

Expand Down
13 changes: 7 additions & 6 deletions tests/unit-tests/zarrv3-writer-write-even.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,16 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size));
for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size));
}
}
writer.finalize();

check_json();

Expand Down
14 changes: 8 additions & 6 deletions tests/unit-tests/zarrv3-writer-write-ragged-append-dim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,17 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size) == frame_size);
for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size) ==
frame_size);
}
}
writer.finalize();

check_json();

Expand Down
14 changes: 8 additions & 6 deletions tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,17 @@ main()
.compression_params = std::nullopt,
};

zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);
{
zarr::ZarrV3ArrayWriter writer(config, thread_pool, nullptr);

const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);
const size_t frame_size = array_width * array_height * nbytes_px;
std::vector<uint8_t> data(frame_size, 0);

for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size) == frame_size);
for (auto i = 0; i < n_frames; ++i) {
CHECK(writer.write_frame(data.data(), frame_size) ==
frame_size);
}
}
writer.finalize();

const auto chunk_size = chunk_width * chunk_height * chunk_planes *
chunk_timepoints * nbytes_px;
Expand Down

0 comments on commit 0fc4799

Please sign in to comment.