From 5f2795e582c4a2396263b234293b97534e1d3f0c Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 20 Nov 2024 11:28:46 -0500 Subject: [PATCH 01/12] Add benchmark to test different chunk/shard/compression/storage configurations --- CMakeLists.txt | 7 + benchmarks/CMakeLists.txt | 18 +++ benchmarks/benchmark.cpp | 233 +++++++++++++++++++++++++++++++++ src/streaming/array.writer.cpp | 1 - 4 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 benchmarks/CMakeLists.txt create mode 100644 benchmarks/benchmark.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d877cb..19e97d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) option(BUILD_PYTHON "Build Python bindings" OFF) +option(BUILD_BENCHMARK "Build benchmarks" OFF) if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) include(CTest) @@ -33,6 +34,12 @@ else () message(STATUS "Skipping test targets") endif () +if (BUILD_BENCHMARK) + add_subdirectory(benchmarks) +else () + message(STATUS "Skipping benchmarks") +endif () + if (${BUILD_PYTHON}) add_subdirectory(python) else () diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt new file mode 100644 index 0000000..80fda26 --- /dev/null +++ b/benchmarks/CMakeLists.txt @@ -0,0 +1,18 @@ +set(project acquire-zarr) + +set(tgt acquire-zarr-benchmark) +add_executable(${tgt} benchmark.cpp) +target_compile_definitions(${tgt} PUBLIC "TEST=\"${tgt}\"") +set_target_properties(${tgt} PROPERTIES + MSVC_RUNTIME_LIBRARY "MultiThreaded$<$:Debug>" +) +target_include_directories(${tgt} PRIVATE + ${PROJECT_SOURCE_DIR}/include + ${PROJECT_SOURCE_DIR}/src/logger +) +target_link_libraries(${tgt} PRIVATE + acquire-logger + acquire-zarr + nlohmann_json::nlohmann_json + miniocpp::miniocpp +) diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp new file mode 100644 index 0000000..93b1e7d --- /dev/null +++ b/benchmarks/benchmark.cpp @@ -0,0 +1,233 @@ +#include "acquire.zarr.h" 
+#include +#include +#include +#include +#include +#include +#include + +#define DIM(name_, type_, array_size, chunk_size, shard_size) \ + { .name = (name_), \ + .type = (type_), \ + .array_size_px = (array_size), \ + .chunk_size_px = (chunk_size), \ + .shard_size_chunks = (shard_size) } + +namespace fs = std::filesystem; + +struct ChunkConfig +{ + unsigned int t, c, z, y, x; +}; + +const std::vector CHUNK_CONFIGS = { { 1, 1, 64, 64, 64 }, + { 1, 1, 128, 128, 128 }, + { 1, 1, 256, 256, 256 } }; + +const unsigned int ARRAY_WIDTH = 1920, ARRAY_HEIGHT = 1080, ARRAY_PLANES = 6, + ARRAY_CHANNELS = 3, ARRAY_TIMEPOINTS = 10; + +const unsigned int NUM_RUNS = 1; + +struct BenchmarkConfig +{ + ChunkConfig chunk; + int zarr_version; + std::string compression; + std::string storage; + unsigned int chunks_per_shard_x; + unsigned int chunks_per_shard_y; + std::string s3_endpoint; + std::string s3_bucket; + std::string s3_access_key; + std::string s3_secret_key; +}; + +class Timer +{ + using Clock = std::chrono::high_resolution_clock; + Clock::time_point start; + + public: + Timer() + : start(Clock::now()) + { + } + double elapsed() + { + auto end = Clock::now(); + return std::chrono::duration(end - start).count(); + } +}; + +ZarrStream* +setup_stream(const BenchmarkConfig& config) +{ + ZarrStreamSettings settings = { .store_path = "benchmark.zarr", + .s3_settings = nullptr, + .compression_settings = nullptr, + .data_type = ZarrDataType_uint16, + .version = static_cast( + config.zarr_version) }; + + ZarrCompressionSettings comp_settings = {}; + if (config.compression != "none") { + comp_settings.compressor = ZarrCompressor_Blosc1; + comp_settings.codec = config.compression == "lz4" + ? 
ZarrCompressionCodec_BloscLZ4 + : ZarrCompressionCodec_BloscZstd; + comp_settings.level = 1; + comp_settings.shuffle = 1; + settings.compression_settings = &comp_settings; + } + + ZarrS3Settings s3_settings = {}; + if (config.storage == "s3") { + s3_settings = { + .endpoint = config.s3_endpoint.c_str(), + .bucket_name = config.s3_bucket.c_str(), + .access_key_id = config.s3_access_key.c_str(), + .secret_access_key = config.s3_secret_key.c_str(), + }; + settings.s3_settings = &s3_settings; + } + + ZarrStreamSettings_create_dimension_array(&settings, 5); + auto* dims = settings.dimensions; + + dims[0] = + DIM("t", ZarrDimensionType_Time, ARRAY_TIMEPOINTS, config.chunk.t, 1); + dims[1] = + DIM("c", ZarrDimensionType_Channel, ARRAY_CHANNELS, config.chunk.c, 1); + dims[2] = + DIM("z", ZarrDimensionType_Space, ARRAY_PLANES, config.chunk.z, 1); + dims[3] = DIM("y", + ZarrDimensionType_Space, + ARRAY_HEIGHT, + config.chunk.y, + config.chunks_per_shard_y); + dims[4] = DIM("x", + ZarrDimensionType_Space, + ARRAY_WIDTH, + config.chunk.x, + config.chunks_per_shard_x); + + return ZarrStream_create(&settings); +} + +double +run_benchmark(const BenchmarkConfig& config) +{ + auto* stream = setup_stream(config); + if (!stream) + return -1.0; + + const size_t frame_size = ARRAY_WIDTH * ARRAY_HEIGHT * sizeof(uint16_t); + std::vector frame(ARRAY_WIDTH * ARRAY_HEIGHT, 0); + const auto num_frames = ARRAY_PLANES * ARRAY_CHANNELS * ARRAY_TIMEPOINTS; + + Timer timer; + size_t bytes_out; + for (int i = 0; i < num_frames; ++i) { + if (ZarrStream_append(stream, frame.data(), frame_size, &bytes_out) != + ZarrStatusCode_Success) { + ZarrStream_destroy(stream); + return -1.0; + } + } + double elapsed = timer.elapsed(); + + ZarrStream_destroy(stream); + if (config.storage == "filesystem") { + fs::remove_all("benchmark.zarr"); + } + return elapsed; +} + +int +main() +{ + std::ofstream csv("zarr_benchmarks.csv"); + csv << "chunk_size,zarr_version,compression,storage,chunks_per_shard_y," + 
"chunks_per_shard_x,run,time_seconds\n"; + + std::vector configs; + for (const auto& chunk : CHUNK_CONFIGS) { + + // V2 configurations (no sharding) + for (const auto& compression : { "none", "lz4", "zstd" }) { + configs.push_back({ chunk, 2, compression, "filesystem", 1, 1 }); + + if (std::getenv("ZARR_S3_ENDPOINT")) { + configs.push_back({ chunk, + 2, + compression, + "s3", + 1, + 1, + std::getenv("ZARR_S3_ENDPOINT"), + std::getenv("ZARR_S3_BUCKET_NAME"), + std::getenv("ZARR_S3_ACCESS_KEY_ID"), + std::getenv("ZARR_S3_SECRET_ACCESS_KEY") }); + } + } + + unsigned int max_cps_y = (ARRAY_HEIGHT + chunk.y - 1) / chunk.y; + unsigned int max_cps_x = (ARRAY_WIDTH + chunk.x - 1) / chunk.x; + + // V3 configurations (with sharding) + for (unsigned int cps_y = 1; cps_y <= max_cps_y; cps_y *= 2) { + for (unsigned int cps_x = 1; cps_x <= max_cps_x; cps_x *= 2) { + for (const auto& compression : { "none", "lz4", "zstd" }) { + configs.push_back( + { chunk, 3, compression, "filesystem", cps_x, cps_y }); + + if (std::getenv("ZARR_S3_ENDPOINT")) { + configs.push_back( + { chunk, + 3, + compression, + "s3", + cps_x, + cps_y, + std::getenv("ZARR_S3_ENDPOINT"), + std::getenv("ZARR_S3_BUCKET_NAME"), + std::getenv("ZARR_S3_ACCESS_KEY_ID"), + std::getenv("ZARR_S3_SECRET_ACCESS_KEY") }); + } + } + } + } + } + + for (const auto& config : configs) { + std::string chunk_str = std::to_string(config.chunk.t) + "x" + + std::to_string(config.chunk.c) + "x" + + std::to_string(config.chunk.z) + "x" + + std::to_string(config.chunk.y) + "x" + + std::to_string(config.chunk.x); + + for (unsigned int run = 1; run <= NUM_RUNS; ++run) { + std::cout << "Benchmarking " << chunk_str << " Zarr V" + << config.zarr_version + << ", compression: " << config.compression + << ", storage: " << config.storage + << ", CPS (y): " << config.chunks_per_shard_y + << ", CPS (x): " << config.chunks_per_shard_x << ", (run " + << run << " / " << NUM_RUNS << ")..."; + double time = run_benchmark(config); + std::cout << " " << 
time << "s\n"; + if (time >= 0) { + csv << chunk_str << "," << config.zarr_version << "," + << config.compression << "," << config.storage << "," + << config.chunks_per_shard_y << "," + << config.chunks_per_shard_x << "," << run << "," + << std::fixed << std::setprecision(3) << time << "\n"; + } + csv.flush(); + } + } + + return 0; +} \ No newline at end of file diff --git a/src/streaming/array.writer.cpp b/src/streaming/array.writer.cpp index f1a3b55..6ca4671 100644 --- a/src/streaming/array.writer.cpp +++ b/src/streaming/array.writer.cpp @@ -190,7 +190,6 @@ bool zarr::ArrayWriter::make_metadata_sink_() { if (metadata_sink_) { - LOG_INFO("Metadata sink already exists"); return true; } From 1cf276e61e6c00e121f9038d8ca037a5542dd3b2 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 21 Nov 2024 09:41:43 -0500 Subject: [PATCH 02/12] Add ShardBuffer class to keep track of ready-to-write chunks. --- src/streaming/array.writer.cpp | 2 +- src/streaming/array.writer.hh | 3 +- src/streaming/zarrv3.array.writer.hh | 45 ++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/streaming/array.writer.cpp b/src/streaming/array.writer.cpp index 6ca4671..0c7fd21 100644 --- a/src/streaming/array.writer.cpp +++ b/src/streaming/array.writer.cpp @@ -360,7 +360,7 @@ zarr::ArrayWriter::compress_buffers_() try { const auto tmp_size = bytes_of_chunk + BLOSC_MAX_OVERHEAD; - std::vector tmp(tmp_size); + ChunkBuffer tmp(tmp_size); const auto nb = blosc_compress_ctx(params.clevel, params.shuffle, diff --git a/src/streaming/array.writer.hh b/src/streaming/array.writer.hh index cf934f4..076b114 100644 --- a/src/streaming/array.writer.hh +++ b/src/streaming/array.writer.hh @@ -56,10 +56,11 @@ class ArrayWriter [[nodiscard]] size_t write_frame(std::span data); protected: + using ChunkBuffer = std::vector; ArrayWriterConfig config_; /// Chunking - std::vector> chunk_buffers_; + std::vector chunk_buffers_; /// Filesystem std::vector> data_sinks_; diff 
--git a/src/streaming/zarrv3.array.writer.hh b/src/streaming/zarrv3.array.writer.hh index 3ad1b05..bbc7f9e 100644 --- a/src/streaming/zarrv3.array.writer.hh +++ b/src/streaming/zarrv3.array.writer.hh @@ -3,6 +3,49 @@ #include "array.writer.hh" namespace zarr { +struct ChunkIndex +{ + uint32_t buffer_idx; + uint64_t offset; + uint64_t size; +}; + +class ShardBuffer +{ + public: + ShardBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } + + bool try_add_chunk(uint32_t chunk_buffer_index, uint64_t chunk_size) + { + std::lock_guard lock(mutex_); + if (ready_chunks_.size() >= MAX_BUFFER_SIZE_) { + return false; + } + ready_chunks_.push_back( + { chunk_buffer_index, cumulative_size_, chunk_size }); + cumulative_size_ += chunk_size; + ready_count_++; + return true; + } + + std::vector take_chunks() + { + std::lock_guard lock(mutex_); + std::vector chunks = std::move(ready_chunks_); + ready_chunks_.clear(); + ready_count_ = 0; + return chunks; + } + + private: + static constexpr size_t MAX_BUFFER_SIZE_ = 16; + + std::mutex mutex_; + std::vector ready_chunks_; + uint64_t cumulative_size_{ 0 }; + uint32_t ready_count_{ 0 }; +}; + struct ZarrV3ArrayWriter : public ArrayWriter { public: @@ -14,6 +57,8 @@ struct ZarrV3ArrayWriter : public ArrayWriter std::shared_ptr s3_connection_pool); private: + std::vector shard_buffers_; + std::vector shard_file_offsets_; std::vector> shard_tables_; From 48fe1a6fbdd4c1a74326871d37b627430c0d3808 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 21 Nov 2024 10:34:27 -0500 Subject: [PATCH 03/12] Increase number of runs/config in benchmark. 
--- benchmarks/benchmark.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index 93b1e7d..69bf060 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -28,7 +28,7 @@ const std::vector CHUNK_CONFIGS = { { 1, 1, 64, 64, 64 }, const unsigned int ARRAY_WIDTH = 1920, ARRAY_HEIGHT = 1080, ARRAY_PLANES = 6, ARRAY_CHANNELS = 3, ARRAY_TIMEPOINTS = 10; -const unsigned int NUM_RUNS = 1; +const unsigned int NUM_RUNS = 5; struct BenchmarkConfig { From 1d8c6056113710d73c086b6982f5152cf6c44daa Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 21 Nov 2024 13:30:24 -0500 Subject: [PATCH 04/12] Add ChunkIndexBuffer to ZarrV2ArrayWriter. --- src/streaming/zarrv2.array.writer.hh | 35 ++++++++++++++++++++++++++++ src/streaming/zarrv3.array.writer.hh | 9 +++---- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/streaming/zarrv2.array.writer.hh b/src/streaming/zarrv2.array.writer.hh index 2a6039a..febe88f 100644 --- a/src/streaming/zarrv2.array.writer.hh +++ b/src/streaming/zarrv2.array.writer.hh @@ -3,6 +3,39 @@ #include "array.writer.hh" namespace zarr { +class ChunkIndexBuffer +{ + public: + ChunkIndexBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } + + bool try_add_chunk(uint32_t chunk_buffer_index) + { + std::lock_guard lock(mutex_); + if (ready_chunks_.size() >= MAX_BUFFER_SIZE_) { + return false; + } + + ready_chunks_.push_back(chunk_buffer_index); + ++ready_count_; + return true; + } + + std::vector take_chunks() + { + std::lock_guard lock(mutex_); + std::vector chunks = std::move(ready_chunks_); + ready_chunks_.clear(); + ready_count_ = 0; + return chunks; + } + + private: + static constexpr size_t MAX_BUFFER_SIZE_ = 16; + + std::mutex mutex_; + std::vector ready_chunks_; + uint32_t ready_count_{ 0 }; +}; class ZarrV2ArrayWriter final : public ArrayWriter { public: @@ -14,6 +47,8 @@ class ZarrV2ArrayWriter final : public ArrayWriter std::shared_ptr 
s3_connection_pool); private: + ChunkIndexBuffer ready_chunks_; + ZarrVersion version_() const override { return ZarrVersion_2; }; bool flush_impl_() override; bool write_array_metadata_() override; diff --git a/src/streaming/zarrv3.array.writer.hh b/src/streaming/zarrv3.array.writer.hh index bbc7f9e..1e9d5fa 100644 --- a/src/streaming/zarrv3.array.writer.hh +++ b/src/streaming/zarrv3.array.writer.hh @@ -10,10 +10,10 @@ struct ChunkIndex uint64_t size; }; -class ShardBuffer +class ShardIndexBuffer { public: - ShardBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } + ShardIndexBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } bool try_add_chunk(uint32_t chunk_buffer_index, uint64_t chunk_size) { @@ -21,10 +21,11 @@ class ShardBuffer if (ready_chunks_.size() >= MAX_BUFFER_SIZE_) { return false; } + ready_chunks_.push_back( { chunk_buffer_index, cumulative_size_, chunk_size }); cumulative_size_ += chunk_size; - ready_count_++; + ++ready_count_; return true; } @@ -57,7 +58,7 @@ struct ZarrV3ArrayWriter : public ArrayWriter std::shared_ptr s3_connection_pool); private: - std::vector shard_buffers_; + std::vector shards_ready_; std::vector shard_file_offsets_; std::vector> shard_tables_; From f0a97e8e2328b84afbb651df0e6059d780a77900 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 27 Nov 2024 15:02:47 -0500 Subject: [PATCH 05/12] wip: Vectorized file write (Windows). 
--- src/streaming/CMakeLists.txt | 2 + src/streaming/vectorized.file.writer.cpp | 221 +++++++++++++++++++++ src/streaming/vectorized.file.writer.hh | 42 ++++ tests/unit-tests/CMakeLists.txt | 1 + tests/unit-tests/vectorized-file-write.cpp | 90 +++++++++ 5 files changed, 356 insertions(+) create mode 100644 src/streaming/vectorized.file.writer.cpp create mode 100644 src/streaming/vectorized.file.writer.hh create mode 100644 tests/unit-tests/vectorized-file-write.cpp diff --git a/src/streaming/CMakeLists.txt b/src/streaming/CMakeLists.txt index cfa734c..c9b5fef 100644 --- a/src/streaming/CMakeLists.txt +++ b/src/streaming/CMakeLists.txt @@ -29,6 +29,8 @@ add_library(${tgt} zarrv2.array.writer.cpp zarrv3.array.writer.hh zarrv3.array.writer.cpp + vectorized.file.writer.hh + vectorized.file.writer.cpp ) target_include_directories(${tgt} diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp new file mode 100644 index 0000000..39ce693 --- /dev/null +++ b/src/streaming/vectorized.file.writer.cpp @@ -0,0 +1,221 @@ +#include "vectorized.file.writer.hh" +#include "macros.hh" + +namespace { +#ifdef _WIN32 +std::string +get_last_error_as_string() +{ + DWORD errorMessageID = ::GetLastError(); + if (errorMessageID == 0) { + return std::string(); // No error message has been recorded + } + + LPSTR messageBuffer = nullptr; + + size_t size = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + errorMessageID, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&messageBuffer, + 0, + NULL); + + std::string message(messageBuffer, size); + + LocalFree(messageBuffer); + + return message; +} + +size_t +get_sector_size(const std::string& path) +{ + // Get volume root path + char volume_path[MAX_PATH]; + if (!GetVolumePathNameA(path.c_str(), volume_path, MAX_PATH)) { + return 0; + } + + DWORD sectors_per_cluster; + DWORD bytes_per_sector; + DWORD number_of_free_clusters; + 
DWORD total_number_of_clusters; + + if (!GetDiskFreeSpaceA(volume_path, + &sectors_per_cluster, + &bytes_per_sector, + &number_of_free_clusters, + &total_number_of_clusters)) { + return 0; + } + + return bytes_per_sector; +} +#endif + +bool +is_aligned(const void* ptr, size_t alignment) +{ + return reinterpret_cast(ptr) % alignment == 0; +} +} // namespace + +zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path) +{ +#ifdef _WIN32 + SYSTEM_INFO si; + GetSystemInfo(&si); + page_size_ = si.dwPageSize; + + sector_size_ = get_sector_size(path); + if (sector_size_ == 0) { + throw std::runtime_error("Failed to get sector size"); + } + + handle_ = CreateFileA(path.c_str(), + GENERIC_WRITE, + 0, // No sharing + nullptr, + OPEN_ALWAYS, + FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING | + FILE_FLAG_SEQUENTIAL_SCAN, + nullptr); + if (handle_ == INVALID_HANDLE_VALUE) { + auto err = get_last_error_as_string(); + throw std::runtime_error("Failed to open file '" + path + "': " + err); + } +#else + page_size_ = sysconf(_SC_PAGESIZE); + fd_ = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0644); + if (fd_ < 0) { + throw std::runtime_error("Failed to open file: " + path); + } +#endif +} + +zarr::VectorizedFileWriter::~VectorizedFileWriter() +{ +#ifdef _WIN32 + if (handle_ != INVALID_HANDLE_VALUE) { + CloseHandle(handle_); + } +#else + if (fd_ >= 0) { + close(fd_); + } +#endif +} + +bool +zarr::VectorizedFileWriter::write_vectors( + const std::vector>& buffers, + const std::vector& offsets) +{ + std::lock_guard lock(mutex_); + +#ifdef _WIN32 + size_t total_bytes_to_write = 0; + for (const auto& buffer : buffers) { + total_bytes_to_write += buffer.size(); + } + + const size_t nbytes_aligned = align_size_(total_bytes_to_write); + CHECK(nbytes_aligned >= total_bytes_to_write); + + auto* aligned_ptr = (std::byte*)_aligned_malloc(nbytes_aligned, page_size_); + if (!aligned_ptr) { + return false; + } + + auto* cur = aligned_ptr; + for (const auto& buffer : 
buffers) { + std::copy(buffer.begin(), buffer.end(), cur); + cur += buffer.size(); + } + + std::vector segments(nbytes_aligned / page_size_); + + cur = aligned_ptr; + for (auto& segment : segments) { + memset(&segment, 0, sizeof(segment)); + segment.Buffer = PtrToPtr64(cur); + cur += page_size_; + } + + OVERLAPPED overlapped = { 0 }; + overlapped.Offset = static_cast(offsets[0] & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offsets[0] >> 32); + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + + DWORD bytes_written; + + bool retval{ true }; + if (!WriteFileGather( + handle_, segments.data(), nbytes_aligned, nullptr, &overlapped)) { + if (GetLastError() != ERROR_IO_PENDING) { + LOG_ERROR("Failed to write file: ", get_last_error_as_string()); + retval = false; + } + + // Wait for the operation to complete + if (!GetOverlappedResult(handle_, &overlapped, &bytes_written, TRUE)) { + LOG_ERROR("Failed to get overlapped result: ", + get_last_error_as_string()); + retval = false; + } + } + + _aligned_free(aligned_ptr); + return retval; +#else + std::vector iovecs; + iovecs.reserve(buffers.size()); + + for (const auto& buffer : buffers) { + if (!is_aligned(buffer.data(), page_size_)) { + return false; + } + struct iovec iov; + iov.iov_base = + const_cast(static_cast(buffer.data())); + iov.iov_len = buffer.size(); + iovecs.push_back(iov); + } + + if (lseek(fd_, offsets[0], SEEK_SET) == -1) { + return false; + } + + ssize_t total_bytes = 0; + for (const auto& buffer : buffers) { + total_bytes += buffer.size(); + } + + ssize_t bytes_written = writev(fd_, iovecs.data(), iovecs.size()); + if (bytes_written != total_bytes) { + return false; + } +#endif + return true; +} + +size_t +zarr::VectorizedFileWriter::align_size_(size_t size) const +{ + return align_to_sector_(align_to_page_(size)); +} + +size_t +zarr::VectorizedFileWriter::align_to_page_(size_t size) const +{ + return (size + page_size_ - 1) & ~(page_size_ - 1); +} + +size_t 
+zarr::VectorizedFileWriter::align_to_sector_(size_t size) const +{ + return (size + sector_size_ - 1) & ~(sector_size_ - 1); +} \ No newline at end of file diff --git a/src/streaming/vectorized.file.writer.hh b/src/streaming/vectorized.file.writer.hh new file mode 100644 index 0000000..2a15f62 --- /dev/null +++ b/src/streaming/vectorized.file.writer.hh @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#include +#include +#endif + +namespace zarr { +class VectorizedFileWriter +{ + public: + explicit VectorizedFileWriter(const std::string& path); + ~VectorizedFileWriter(); + + bool write_vectors(const std::vector>& buffers, + const std::vector& offsets); + + std::mutex& mutex() { return mutex_; } + + private: + std::mutex mutex_; + size_t page_size_; +#ifdef _WIN32 + HANDLE handle_; + size_t sector_size_; +#else + int fd_; +#endif + + size_t align_size_(size_t size) const; + size_t align_to_page_(size_t size) const; + size_t align_to_sector_(size_t size) const; +}; +} // namespace zarr \ No newline at end of file diff --git a/tests/unit-tests/CMakeLists.txt b/tests/unit-tests/CMakeLists.txt index 6069b73..fba62cc 100644 --- a/tests/unit-tests/CMakeLists.txt +++ b/tests/unit-tests/CMakeLists.txt @@ -25,6 +25,7 @@ set(tests zarrv3-writer-write-even zarrv3-writer-write-ragged-append-dim zarrv3-writer-write-ragged-internal-dim + vectorized-file-write ) foreach (name ${tests}) diff --git a/tests/unit-tests/vectorized-file-write.cpp b/tests/unit-tests/vectorized-file-write.cpp new file mode 100644 index 0000000..a24ff5a --- /dev/null +++ b/tests/unit-tests/vectorized-file-write.cpp @@ -0,0 +1,90 @@ +#include "vectorized.file.writer.hh" +#include "unit.test.macros.hh" + +#include +#include +#include + +namespace fs = std::filesystem; + +size_t +write_to_file(const std::string& filename) +{ + size_t file_size = 0; + zarr::VectorizedFileWriter writer(filename); + + std::vector> data(10); + std::vector 
offsets(10); + size_t offset = 0; + for (auto i = 0; i < data.size(); ++i) { + data[i].resize((i + 1) * 1024); + std::fill(data[i].begin(), data[i].end(), std::byte(i)); + + offsets[i] = offset; + offset += data[i].size(); + } + + file_size = offsets.back() + data.back().size(); + CHECK(writer.write_vectors(data, offsets)); + + return file_size; +} + +void +verify_file_data(const std::string& filename, size_t file_size) +{ + std::ifstream file(filename, std::ios::binary); + std::vector read_buffer(file_size); + + file.read(reinterpret_cast(read_buffer.data()), file_size); + CHECK(file.good() && file.gcount() == file_size); + + // Verify data pattern + size_t offset = 0; + for (size_t i = 0; i < 10; i++) { + size_t size = (i + 1) * 1024; + + for (size_t j = offset; j < offset + size; j++) { + auto byte = (int)read_buffer[j]; + EXPECT(byte == i, + "Data mismatch at offset ", + j, + ". Expected ", + i, + " got ", + byte, + "."); + } + offset += size; + } +} + +int +main() +{ + const auto base_dir = fs::temp_directory_path() / "vectorized-file-writer"; + if (!fs::exists(base_dir) && !fs::create_directories(base_dir)) { + std::cerr << "Failed to create directory: " << base_dir << std::endl; + return 1; + } + + int retval = 1; + const auto filename = (base_dir / "test.bin").string(); + + try { + const auto file_size = write_to_file(filename); + EXPECT(fs::exists(filename), "File not found: ", filename); + verify_file_data(filename, file_size); + + retval = 0; + } catch (const std::exception& exc) { + std::cerr << "Exception: " << exc.what() << std::endl; + } + + // cleanup + if (fs::exists(base_dir) && !fs::remove_all(base_dir)) { + std::cerr << "Failed to remove directory: " << base_dir << std::endl; + } + + return retval; +} \ No newline at end of file From 6aadf65a9e3335fa8823f8836851aeba8085780e Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 27 Nov 2024 16:21:07 -0500 Subject: [PATCH 06/12] Changes for Linux --- src/streaming/vectorized.file.writer.cpp | 53 
+++++++++++++----------- src/streaming/vectorized.file.writer.hh | 1 - 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp index 39ce693..6c32a4d 100644 --- a/src/streaming/vectorized.file.writer.cpp +++ b/src/streaming/vectorized.file.writer.cpp @@ -1,6 +1,8 @@ #include "vectorized.file.writer.hh" #include "macros.hh" +#include + namespace { #ifdef _WIN32 std::string @@ -54,6 +56,12 @@ get_sector_size(const std::string& path) return bytes_per_sector; } +#else +std::string +get_last_error_as_string() +{ + return strerror(errno); +} #endif bool @@ -171,31 +179,29 @@ zarr::VectorizedFileWriter::write_vectors( _aligned_free(aligned_ptr); return retval; #else - std::vector iovecs; - iovecs.reserve(buffers.size()); + std::vector iovecs(buffers.size()); - for (const auto& buffer : buffers) { - if (!is_aligned(buffer.data(), page_size_)) { - return false; - } - struct iovec iov; - iov.iov_base = - const_cast(static_cast(buffer.data())); - iov.iov_len = buffer.size(); - iovecs.push_back(iov); - } - - if (lseek(fd_, offsets[0], SEEK_SET) == -1) { - return false; + for (auto i = 0; i < buffers.size(); ++i) { + auto* iov = &iovecs[i]; + memset(iov, 0, sizeof(struct iovec)); + iov->iov_base = + const_cast(static_cast(buffers[i].data())); + iov->iov_len = buffers[i].size(); } ssize_t total_bytes = 0; for (const auto& buffer : buffers) { - total_bytes += buffer.size(); + total_bytes += static_cast(buffer.size()); } - ssize_t bytes_written = writev(fd_, iovecs.data(), iovecs.size()); + ssize_t bytes_written = pwritev(fd_, + iovecs.data(), + static_cast(iovecs.size()), + static_cast(offsets[0])); + if (bytes_written != total_bytes) { + auto error = get_last_error_as_string(); + LOG_ERROR("Failed to write file: ", error); return false; } #endif @@ -205,17 +211,16 @@ zarr::VectorizedFileWriter::write_vectors( size_t zarr::VectorizedFileWriter::align_size_(size_t size) const { - return 
align_to_sector_(align_to_page_(size)); + size = align_to_page_(size); +#ifdef _WIN32 + return (size + sector_size_ - 1) & ~(sector_size_ - 1); +#else + return size; +#endif } size_t zarr::VectorizedFileWriter::align_to_page_(size_t size) const { return (size + page_size_ - 1) & ~(page_size_ - 1); -} - -size_t -zarr::VectorizedFileWriter::align_to_sector_(size_t size) const -{ - return (size + sector_size_ - 1) & ~(sector_size_ - 1); } \ No newline at end of file diff --git a/src/streaming/vectorized.file.writer.hh b/src/streaming/vectorized.file.writer.hh index 2a15f62..01bef6b 100644 --- a/src/streaming/vectorized.file.writer.hh +++ b/src/streaming/vectorized.file.writer.hh @@ -37,6 +37,5 @@ class VectorizedFileWriter size_t align_size_(size_t size) const; size_t align_to_page_(size_t size) const; - size_t align_to_sector_(size_t size) const; }; } // namespace zarr \ No newline at end of file From 97e93c12eef416386dd921db917fa346b84bcb2b Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Mon, 2 Dec 2024 09:27:03 -0500 Subject: [PATCH 07/12] Vectorized file write on OSX --- src/streaming/vectorized.file.writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp index 6c32a4d..23126b0 100644 --- a/src/streaming/vectorized.file.writer.cpp +++ b/src/streaming/vectorized.file.writer.cpp @@ -97,7 +97,7 @@ zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path) } #else page_size_ = sysconf(_SC_PAGESIZE); - fd_ = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0644); + fd_ = open(path.c_str(), O_WRONLY | O_CREAT, 0644); if (fd_ < 0) { throw std::runtime_error("Failed to open file: " + path); } From d70ca8ef92cbe84fb444eb9e6204a4257ff88604 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Mon, 2 Dec 2024 10:25:18 -0500 Subject: [PATCH 08/12] Test vectorized file write with multiple writes to the same file. 
--- src/streaming/vectorized.file.writer.cpp | 8 ++-- src/streaming/vectorized.file.writer.hh | 2 +- tests/unit-tests/vectorized-file-write.cpp | 45 +++++++++++++++++----- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp index 23126b0..c5a8623 100644 --- a/src/streaming/vectorized.file.writer.cpp +++ b/src/streaming/vectorized.file.writer.cpp @@ -120,7 +120,7 @@ zarr::VectorizedFileWriter::~VectorizedFileWriter() bool zarr::VectorizedFileWriter::write_vectors( const std::vector>& buffers, - const std::vector& offsets) + size_t offset) { std::lock_guard lock(mutex_); @@ -154,8 +154,8 @@ zarr::VectorizedFileWriter::write_vectors( } OVERLAPPED overlapped = { 0 }; - overlapped.Offset = static_cast(offsets[0] & 0xFFFFFFFF); - overlapped.OffsetHigh = static_cast(offsets[0] >> 32); + overlapped.Offset = static_cast(offset & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offset >> 32); overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); DWORD bytes_written; @@ -197,7 +197,7 @@ zarr::VectorizedFileWriter::write_vectors( ssize_t bytes_written = pwritev(fd_, iovecs.data(), static_cast(iovecs.size()), - static_cast(offsets[0])); + static_cast(offset)); if (bytes_written != total_bytes) { auto error = get_last_error_as_string(); diff --git a/src/streaming/vectorized.file.writer.hh b/src/streaming/vectorized.file.writer.hh index 01bef6b..405bd5d 100644 --- a/src/streaming/vectorized.file.writer.hh +++ b/src/streaming/vectorized.file.writer.hh @@ -21,7 +21,7 @@ class VectorizedFileWriter ~VectorizedFileWriter(); bool write_vectors(const std::vector>& buffers, - const std::vector& offsets); + size_t offset); std::mutex& mutex() { return mutex_; } diff --git a/tests/unit-tests/vectorized-file-write.cpp b/tests/unit-tests/vectorized-file-write.cpp index a24ff5a..3b2c8c2 100644 --- a/tests/unit-tests/vectorized-file-write.cpp +++ 
b/tests/unit-tests/vectorized-file-write.cpp @@ -14,20 +14,21 @@ write_to_file(const std::string& filename) zarr::VectorizedFileWriter writer(filename); std::vector> data(10); - std::vector offsets(10); - size_t offset = 0; for (auto i = 0; i < data.size(); ++i) { data[i].resize((i + 1) * 1024); std::fill(data[i].begin(), data[i].end(), std::byte(i)); - - offsets[i] = offset; - offset += data[i].size(); + file_size += data[i].size(); } + CHECK(writer.write_vectors(data, 0)); - file_size = offsets.back() + data.back().size(); - CHECK(writer.write_vectors(data, offsets)); + // write more data + for (auto i = 0; i < 10; ++i) { + auto& vec = data[i]; + std::fill(vec.begin(), vec.end(), std::byte(i + 10)); + } + CHECK(writer.write_vectors(data, file_size)); - return file_size; + return 2 * file_size; } void @@ -41,10 +42,10 @@ verify_file_data(const std::string& filename, size_t file_size) // Verify data pattern size_t offset = 0; - for (size_t i = 0; i < 10; i++) { + for (size_t i = 0; i < 10; ++i) { size_t size = (i + 1) * 1024; - for (size_t j = offset; j < offset + size; j++) { + for (size_t j = offset; j < offset + size; ++j) { auto byte = (int)read_buffer[j]; EXPECT(byte == i, "Data mismatch at offset ", @@ -57,6 +58,23 @@ verify_file_data(const std::string& filename, size_t file_size) } offset += size; } + + for (size_t i = 0; i < 10; ++i) { + size_t size = (i + 1) * 1024; + + for (size_t j = offset; j < offset + size; ++j) { + auto byte = (int)read_buffer[j]; + EXPECT(byte == i + 10, + "Data mismatch at offset ", + j, + ". 
Expected ", + i + 10, + " got ", + byte, + "."); + } + offset += size; + } } int @@ -74,6 +92,13 @@ main() try { const auto file_size = write_to_file(filename); EXPECT(fs::exists(filename), "File not found: ", filename); + + auto file_size_on_disk = fs::file_size(filename); + EXPECT(file_size_on_disk >= file_size, // sum(1:10) * 1024 * 2 + "Expected file size of at least ", + file_size, + " bytes, got ", + file_size_on_disk); verify_file_data(filename, file_size); retval = 0; From 1f95b058e98511a80b4a182b195a8c298d286324 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Mon, 2 Dec 2024 11:31:03 -0500 Subject: [PATCH 09/12] Minor edits --- src/streaming/vectorized.file.writer.cpp | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp index c5a8623..e4ea039 100644 --- a/src/streaming/vectorized.file.writer.cpp +++ b/src/streaming/vectorized.file.writer.cpp @@ -63,12 +63,6 @@ get_last_error_as_string() return strerror(errno); } #endif - -bool -is_aligned(const void* ptr, size_t alignment) -{ - return reinterpret_cast(ptr) % alignment == 0; -} } // namespace zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path) @@ -123,6 +117,7 @@ zarr::VectorizedFileWriter::write_vectors( size_t offset) { std::lock_guard lock(mutex_); + bool retval{ true }; #ifdef _WIN32 size_t total_bytes_to_write = 0; @@ -133,7 +128,8 @@ zarr::VectorizedFileWriter::write_vectors( const size_t nbytes_aligned = align_size_(total_bytes_to_write); CHECK(nbytes_aligned >= total_bytes_to_write); - auto* aligned_ptr = (std::byte*)_aligned_malloc(nbytes_aligned, page_size_); + auto* aligned_ptr = + static_cast(_aligned_malloc(nbytes_aligned, page_size_)); if (!aligned_ptr) { return false; } @@ -160,7 +156,6 @@ zarr::VectorizedFileWriter::write_vectors( DWORD bytes_written; - bool retval{ true }; if (!WriteFileGather( handle_, segments.data(), nbytes_aligned, nullptr, 
&overlapped)) { if (GetLastError() != ERROR_IO_PENDING) { @@ -177,7 +172,6 @@ zarr::VectorizedFileWriter::write_vectors( } _aligned_free(aligned_ptr); - return retval; #else std::vector iovecs(buffers.size()); @@ -202,10 +196,10 @@ zarr::VectorizedFileWriter::write_vectors( if (bytes_written != total_bytes) { auto error = get_last_error_as_string(); LOG_ERROR("Failed to write file: ", error); - return false; + retval = false; } #endif - return true; + return retval; } size_t From 57178672447312415b6aa9ce2d5a342105b2ab5e Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 4 Dec 2024 15:42:20 -0500 Subject: [PATCH 10/12] Use a vector of spans to avoid copies. --- src/streaming/vectorized.file.writer.cpp | 2 +- src/streaming/vectorized.file.writer.hh | 2 +- tests/unit-tests/vectorized-file-write.cpp | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp index e4ea039..18b57f8 100644 --- a/src/streaming/vectorized.file.writer.cpp +++ b/src/streaming/vectorized.file.writer.cpp @@ -113,7 +113,7 @@ zarr::VectorizedFileWriter::~VectorizedFileWriter() bool zarr::VectorizedFileWriter::write_vectors( - const std::vector>& buffers, + const std::vector>& buffers, size_t offset) { std::lock_guard lock(mutex_); diff --git a/src/streaming/vectorized.file.writer.hh b/src/streaming/vectorized.file.writer.hh index 405bd5d..92350ea 100644 --- a/src/streaming/vectorized.file.writer.hh +++ b/src/streaming/vectorized.file.writer.hh @@ -20,7 +20,7 @@ class VectorizedFileWriter explicit VectorizedFileWriter(const std::string& path); ~VectorizedFileWriter(); - bool write_vectors(const std::vector>& buffers, + bool write_vectors(const std::vector>& buffers, size_t offset); std::mutex& mutex() { return mutex_; } diff --git a/tests/unit-tests/vectorized-file-write.cpp b/tests/unit-tests/vectorized-file-write.cpp index 3b2c8c2..79ebe6d 100644 --- 
a/tests/unit-tests/vectorized-file-write.cpp +++ b/tests/unit-tests/vectorized-file-write.cpp @@ -14,19 +14,23 @@ write_to_file(const std::string& filename) zarr::VectorizedFileWriter writer(filename); std::vector> data(10); + std::vector> spans(10); + for (auto i = 0; i < data.size(); ++i) { data[i].resize((i + 1) * 1024); std::fill(data[i].begin(), data[i].end(), std::byte(i)); file_size += data[i].size(); + spans[i] = data[i]; } - CHECK(writer.write_vectors(data, 0)); + CHECK(writer.write_vectors(spans, 0)); // write more data for (auto i = 0; i < 10; ++i) { auto& vec = data[i]; std::fill(vec.begin(), vec.end(), std::byte(i + 10)); + spans[i] = vec; } - CHECK(writer.write_vectors(data, file_size)); + CHECK(writer.write_vectors(spans, file_size)); return 2 * file_size; } From 0f71dd060b740358964391099a7daf1475370bfc Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 5 Dec 2024 13:07:47 -0500 Subject: [PATCH 11/12] Remove dead code. --- src/streaming/zarrv2.array.writer.hh | 35 --------------------- src/streaming/zarrv3.array.writer.hh | 46 ---------------------------- 2 files changed, 81 deletions(-) diff --git a/src/streaming/zarrv2.array.writer.hh b/src/streaming/zarrv2.array.writer.hh index febe88f..2a6039a 100644 --- a/src/streaming/zarrv2.array.writer.hh +++ b/src/streaming/zarrv2.array.writer.hh @@ -3,39 +3,6 @@ #include "array.writer.hh" namespace zarr { -class ChunkIndexBuffer -{ - public: - ChunkIndexBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } - - bool try_add_chunk(uint32_t chunk_buffer_index) - { - std::lock_guard lock(mutex_); - if (ready_chunks_.size() >= MAX_BUFFER_SIZE_) { - return false; - } - - ready_chunks_.push_back(chunk_buffer_index); - ++ready_count_; - return true; - } - - std::vector take_chunks() - { - std::lock_guard lock(mutex_); - std::vector chunks = std::move(ready_chunks_); - ready_chunks_.clear(); - ready_count_ = 0; - return chunks; - } - - private: - static constexpr size_t MAX_BUFFER_SIZE_ = 16; - - std::mutex 
mutex_; - std::vector ready_chunks_; - uint32_t ready_count_{ 0 }; -}; class ZarrV2ArrayWriter final : public ArrayWriter { public: @@ -47,8 +14,6 @@ class ZarrV2ArrayWriter final : public ArrayWriter std::shared_ptr s3_connection_pool); private: - ChunkIndexBuffer ready_chunks_; - ZarrVersion version_() const override { return ZarrVersion_2; }; bool flush_impl_() override; bool write_array_metadata_() override; diff --git a/src/streaming/zarrv3.array.writer.hh b/src/streaming/zarrv3.array.writer.hh index 1e9d5fa..3ad1b05 100644 --- a/src/streaming/zarrv3.array.writer.hh +++ b/src/streaming/zarrv3.array.writer.hh @@ -3,50 +3,6 @@ #include "array.writer.hh" namespace zarr { -struct ChunkIndex -{ - uint32_t buffer_idx; - uint64_t offset; - uint64_t size; -}; - -class ShardIndexBuffer -{ - public: - ShardIndexBuffer() { ready_chunks_.reserve(MAX_BUFFER_SIZE_); } - - bool try_add_chunk(uint32_t chunk_buffer_index, uint64_t chunk_size) - { - std::lock_guard lock(mutex_); - if (ready_chunks_.size() >= MAX_BUFFER_SIZE_) { - return false; - } - - ready_chunks_.push_back( - { chunk_buffer_index, cumulative_size_, chunk_size }); - cumulative_size_ += chunk_size; - ++ready_count_; - return true; - } - - std::vector take_chunks() - { - std::lock_guard lock(mutex_); - std::vector chunks = std::move(ready_chunks_); - ready_chunks_.clear(); - ready_count_ = 0; - return chunks; - } - - private: - static constexpr size_t MAX_BUFFER_SIZE_ = 16; - - std::mutex mutex_; - std::vector ready_chunks_; - uint64_t cumulative_size_{ 0 }; - uint32_t ready_count_{ 0 }; -}; - struct ZarrV3ArrayWriter : public ArrayWriter { public: @@ -58,8 +14,6 @@ struct ZarrV3ArrayWriter : public ArrayWriter std::shared_ptr s3_connection_pool); private: - std::vector shards_ready_; - std::vector shard_file_offsets_; std::vector> shard_tables_; From ed89a98c9f9aa02e2881342cfd951ab29d70df7b Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 5 Dec 2024 13:20:46 -0500 Subject: [PATCH 12/12] Remove benchmark. 
--- CMakeLists.txt | 7 -- benchmarks/CMakeLists.txt | 18 --- benchmarks/benchmark.cpp | 233 -------------------------------------- 3 files changed, 258 deletions(-) delete mode 100644 benchmarks/CMakeLists.txt delete mode 100644 benchmarks/benchmark.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 19e97d7..5d877cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) option(BUILD_PYTHON "Build Python bindings" OFF) -option(BUILD_BENCHMARK "Build benchmarks" OFF) if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) include(CTest) @@ -34,12 +33,6 @@ else () message(STATUS "Skipping test targets") endif () -if (BUILD_BENCHMARK) - add_subdirectory(benchmarks) -else () - message(STATUS "Skipping benchmarks") -endif () - if (${BUILD_PYTHON}) add_subdirectory(python) else () diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt deleted file mode 100644 index 80fda26..0000000 --- a/benchmarks/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -set(project acquire-zarr) - -set(tgt acquire-zarr-benchmark) -add_executable(${tgt} benchmark.cpp) -target_compile_definitions(${tgt} PUBLIC "TEST=\"${tgt}\"") -set_target_properties(${tgt} PROPERTIES - MSVC_RUNTIME_LIBRARY "MultiThreaded$<$:Debug>" -) -target_include_directories(${tgt} PRIVATE - ${PROJECT_SOURCE_DIR}/include - ${PROJECT_SOURCE_DIR}/src/logger -) -target_link_libraries(${tgt} PRIVATE - acquire-logger - acquire-zarr - nlohmann_json::nlohmann_json - miniocpp::miniocpp -) diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp deleted file mode 100644 index 69bf060..0000000 --- a/benchmarks/benchmark.cpp +++ /dev/null @@ -1,233 +0,0 @@ -#include "acquire.zarr.h" -#include -#include -#include -#include -#include -#include -#include - -#define DIM(name_, type_, array_size, chunk_size, shard_size) \ - { .name = (name_), \ - .type = (type_), \ - .array_size_px = (array_size), \ - .chunk_size_px = (chunk_size), \ - 
.shard_size_chunks = (shard_size) } - -namespace fs = std::filesystem; - -struct ChunkConfig -{ - unsigned int t, c, z, y, x; -}; - -const std::vector CHUNK_CONFIGS = { { 1, 1, 64, 64, 64 }, - { 1, 1, 128, 128, 128 }, - { 1, 1, 256, 256, 256 } }; - -const unsigned int ARRAY_WIDTH = 1920, ARRAY_HEIGHT = 1080, ARRAY_PLANES = 6, - ARRAY_CHANNELS = 3, ARRAY_TIMEPOINTS = 10; - -const unsigned int NUM_RUNS = 5; - -struct BenchmarkConfig -{ - ChunkConfig chunk; - int zarr_version; - std::string compression; - std::string storage; - unsigned int chunks_per_shard_x; - unsigned int chunks_per_shard_y; - std::string s3_endpoint; - std::string s3_bucket; - std::string s3_access_key; - std::string s3_secret_key; -}; - -class Timer -{ - using Clock = std::chrono::high_resolution_clock; - Clock::time_point start; - - public: - Timer() - : start(Clock::now()) - { - } - double elapsed() - { - auto end = Clock::now(); - return std::chrono::duration(end - start).count(); - } -}; - -ZarrStream* -setup_stream(const BenchmarkConfig& config) -{ - ZarrStreamSettings settings = { .store_path = "benchmark.zarr", - .s3_settings = nullptr, - .compression_settings = nullptr, - .data_type = ZarrDataType_uint16, - .version = static_cast( - config.zarr_version) }; - - ZarrCompressionSettings comp_settings = {}; - if (config.compression != "none") { - comp_settings.compressor = ZarrCompressor_Blosc1; - comp_settings.codec = config.compression == "lz4" - ? 
ZarrCompressionCodec_BloscLZ4 - : ZarrCompressionCodec_BloscZstd; - comp_settings.level = 1; - comp_settings.shuffle = 1; - settings.compression_settings = &comp_settings; - } - - ZarrS3Settings s3_settings = {}; - if (config.storage == "s3") { - s3_settings = { - .endpoint = config.s3_endpoint.c_str(), - .bucket_name = config.s3_bucket.c_str(), - .access_key_id = config.s3_access_key.c_str(), - .secret_access_key = config.s3_secret_key.c_str(), - }; - settings.s3_settings = &s3_settings; - } - - ZarrStreamSettings_create_dimension_array(&settings, 5); - auto* dims = settings.dimensions; - - dims[0] = - DIM("t", ZarrDimensionType_Time, ARRAY_TIMEPOINTS, config.chunk.t, 1); - dims[1] = - DIM("c", ZarrDimensionType_Channel, ARRAY_CHANNELS, config.chunk.c, 1); - dims[2] = - DIM("z", ZarrDimensionType_Space, ARRAY_PLANES, config.chunk.z, 1); - dims[3] = DIM("y", - ZarrDimensionType_Space, - ARRAY_HEIGHT, - config.chunk.y, - config.chunks_per_shard_y); - dims[4] = DIM("x", - ZarrDimensionType_Space, - ARRAY_WIDTH, - config.chunk.x, - config.chunks_per_shard_x); - - return ZarrStream_create(&settings); -} - -double -run_benchmark(const BenchmarkConfig& config) -{ - auto* stream = setup_stream(config); - if (!stream) - return -1.0; - - const size_t frame_size = ARRAY_WIDTH * ARRAY_HEIGHT * sizeof(uint16_t); - std::vector frame(ARRAY_WIDTH * ARRAY_HEIGHT, 0); - const auto num_frames = ARRAY_PLANES * ARRAY_CHANNELS * ARRAY_TIMEPOINTS; - - Timer timer; - size_t bytes_out; - for (int i = 0; i < num_frames; ++i) { - if (ZarrStream_append(stream, frame.data(), frame_size, &bytes_out) != - ZarrStatusCode_Success) { - ZarrStream_destroy(stream); - return -1.0; - } - } - double elapsed = timer.elapsed(); - - ZarrStream_destroy(stream); - if (config.storage == "filesystem") { - fs::remove_all("benchmark.zarr"); - } - return elapsed; -} - -int -main() -{ - std::ofstream csv("zarr_benchmarks.csv"); - csv << "chunk_size,zarr_version,compression,storage,chunks_per_shard_y," - 
"chunks_per_shard_x,run,time_seconds\n"; - - std::vector configs; - for (const auto& chunk : CHUNK_CONFIGS) { - - // V2 configurations (no sharding) - for (const auto& compression : { "none", "lz4", "zstd" }) { - configs.push_back({ chunk, 2, compression, "filesystem", 1, 1 }); - - if (std::getenv("ZARR_S3_ENDPOINT")) { - configs.push_back({ chunk, - 2, - compression, - "s3", - 1, - 1, - std::getenv("ZARR_S3_ENDPOINT"), - std::getenv("ZARR_S3_BUCKET_NAME"), - std::getenv("ZARR_S3_ACCESS_KEY_ID"), - std::getenv("ZARR_S3_SECRET_ACCESS_KEY") }); - } - } - - unsigned int max_cps_y = (ARRAY_HEIGHT + chunk.y - 1) / chunk.y; - unsigned int max_cps_x = (ARRAY_WIDTH + chunk.x - 1) / chunk.x; - - // V3 configurations (with sharding) - for (unsigned int cps_y = 1; cps_y <= max_cps_y; cps_y *= 2) { - for (unsigned int cps_x = 1; cps_x <= max_cps_x; cps_x *= 2) { - for (const auto& compression : { "none", "lz4", "zstd" }) { - configs.push_back( - { chunk, 3, compression, "filesystem", cps_x, cps_y }); - - if (std::getenv("ZARR_S3_ENDPOINT")) { - configs.push_back( - { chunk, - 3, - compression, - "s3", - cps_x, - cps_y, - std::getenv("ZARR_S3_ENDPOINT"), - std::getenv("ZARR_S3_BUCKET_NAME"), - std::getenv("ZARR_S3_ACCESS_KEY_ID"), - std::getenv("ZARR_S3_SECRET_ACCESS_KEY") }); - } - } - } - } - } - - for (const auto& config : configs) { - std::string chunk_str = std::to_string(config.chunk.t) + "x" + - std::to_string(config.chunk.c) + "x" + - std::to_string(config.chunk.z) + "x" + - std::to_string(config.chunk.y) + "x" + - std::to_string(config.chunk.x); - - for (unsigned int run = 1; run <= NUM_RUNS; ++run) { - std::cout << "Benchmarking " << chunk_str << " Zarr V" - << config.zarr_version - << ", compression: " << config.compression - << ", storage: " << config.storage - << ", CPS (y): " << config.chunks_per_shard_y - << ", CPS (x): " << config.chunks_per_shard_x << ", (run " - << run << " / " << NUM_RUNS << ")..."; - double time = run_benchmark(config); - std::cout << " " << 
time << "s\n"; - if (time >= 0) { - csv << chunk_str << "," << config.zarr_version << "," - << config.compression << "," << config.storage << "," - << config.chunks_per_shard_y << "," - << config.chunks_per_shard_x << "," << run << "," - << std::fixed << std::setprecision(3) << time << "\n"; - } - csv.flush(); - } - } - - return 0; -} \ No newline at end of file