Skip to content

Commit

Permalink
Merge branch '238-implement-s3-sinks-pt-4' into 238-implement-s3-sink…
Browse files Browse the repository at this point in the history
…s-pt-5
  • Loading branch information
aliddell committed Jul 11, 2024
2 parents b6f0cc5 + 01ed5ce commit 915f86e
Show file tree
Hide file tree
Showing 29 changed files with 220 additions and 101 deletions.
30 changes: 2 additions & 28 deletions .github/workflows/test_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- main
pull_request_target:
pull_request:
branches:
- main

Expand All @@ -15,7 +15,7 @@ jobs:
test:
name: ${{ matrix.platform }}
runs-on: ${{ matrix.platform }}
timeout-minutes: 10
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -61,29 +61,3 @@ jobs:
- name: Test
working-directory: ${{github.workspace}}/build
run: ctest -C ${{env.BUILD_TYPE}} -L acquire-driver-zarr --output-on-failure

merge:
name: Automerge
runs-on: "ubuntu-latest"
needs: test
if: ${{ github.actor == 'dependabot[bot]' }}
steps:
- name: Checkout PR
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.sha }}
repository: ${{ github.event.pull_request.head.repo.full_name }}

- name: Approve PR
run: gh pr review --approve "$PR_URL"
env:
PR_URL: ${{ github.event.pull_request.html_url }}
GH_TOKEN: ${{ secrets.PAT }}

# Don't auto-merge major version updates
- name: Merge PR
if: ${{ steps.dependabot-metadata.outputs.update-type != 'version-update:semver-major' }}
run: gh pr merge --auto --squash "$PR_URL"
env:
PR_URL: ${{ github.event.pull_request.html_url }}
GH_TOKEN: ${{ secrets.PAT }}
59 changes: 53 additions & 6 deletions src/common/dimension.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
#include "dimension.hh"

#include <cmath>
#include <thread>
#include "utilities.hh"
#include "zarr.hh"
#include "platform.h"
#include <algorithm>

namespace zarr = acquire::sink::zarr;

namespace {
inline std::string
trim(const std::string& s)
{
// trim left
std::string trimmed = s;
trimmed.erase(trimmed.begin(),
std::find_if(trimmed.begin(), trimmed.end(), [](char c) {
return !std::isspace(c);
}));

// trim right
trimmed.erase(std::find_if(trimmed.rbegin(),
trimmed.rend(),
[](char c) { return !std::isspace(c); })
.base(),
trimmed.end());

return trimmed;
}
} // namespace

zarr::Dimension::Dimension(const std::string& name,
DimensionType kind,
uint32_t array_size_px,
uint32_t chunk_size_px,
uint32_t shard_size_chunks)
: name{ name }
: name{ trim(name) }
, kind{ kind }
, array_size_px{ array_size_px }
, chunk_size_px{ chunk_size_px }
Expand All @@ -22,6 +40,7 @@ zarr::Dimension::Dimension(const std::string& name,
EXPECT(kind < DimensionTypeCount, "Invalid dimension type.");
EXPECT(!name.empty(), "Dimension name cannot be empty.");
}

zarr::Dimension::Dimension(const StorageDimension& dim)
: Dimension(dim.name.str,
dim.kind,
Expand All @@ -30,3 +49,31 @@ zarr::Dimension::Dimension(const StorageDimension& dim)
dim.shard_size_chunks)
{
}

#ifndef NO_UNIT_TESTS
#ifdef _WIN32
#define acquire_export __declspec(dllexport)
#else
#define acquire_export
#endif // _WIN32

extern "C" acquire_export int
unit_test__trim()
{
try {
EXPECT(trim(" foo") == "foo", "Failed to trim left.");
EXPECT(trim("foo ") == "foo", "Failed to trim right.");
EXPECT(trim(" foo ") == "foo", "Failed to trim both.");
EXPECT(trim("foo") == "foo", "Failed to trim either.");
EXPECT(trim("").empty(), "Failed to trim empty.");
return 1;
} catch (const std::exception& e) {
LOGE("Exception: %s", e.what());
} catch (...) {
LOGE("Unknown exception");
}

return 0;
}

#endif // NO_UNIT_TESTS
6 changes: 3 additions & 3 deletions src/common/dimension.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#ifndef H_ACQUIRE_STORAGE_ZARR_COMMON_DIMENSION_V0
#define H_ACQUIRE_STORAGE_ZARR_COMMON_DIMENSION_V0
#pragma once

#include "macros.hh"
#include "device/props/storage.h"

#include <string>
Expand All @@ -14,6 +14,7 @@ struct Dimension
unsigned int array_size_px,
unsigned int chunk_size_px,
unsigned int shard_size_chunks);

explicit Dimension(const StorageDimension& dim);

const std::string name;
Expand All @@ -23,4 +24,3 @@ struct Dimension
const unsigned int shard_size_chunks;
};
} // namespace acquire::sink::zarr
#endif // H_ACQUIRE_STORAGE_ZARR_COMMON_DIMENSION_V0
20 changes: 20 additions & 0 deletions src/common/macros.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include "logger.h"
#include <stdexcept>

#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define EXPECT(e, ...) \
do { \
if (!(e)) { \
LOGE(__VA_ARGS__); \
throw std::runtime_error("Expression was false: " #e); \
} \
} while (0)
#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e)

#define TRACE(...)

#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V)))
#define countof(e) (sizeof(e) / sizeof(*(e)))
18 changes: 10 additions & 8 deletions src/common/thread.pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
namespace zarr = acquire::sink::zarr;
namespace common = zarr::common;

common::ThreadPool::ThreadPool(size_t n_threads,
common::ThreadPool::ThreadPool(unsigned int n_threads,
std::function<void(const std::string&)> err)
: error_handler_{ err }
, is_accepting_jobs_{ true }
{
n_threads = std::clamp(
n_threads,
(size_t)1,
(size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1));
n_threads, 1u, std::max(std::thread::hardware_concurrency(), 1u));

for (auto i = 0; i < n_threads; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
}

common::ThreadPool::~ThreadPool() noexcept
{
{
Expand All @@ -29,26 +28,26 @@ common::ThreadPool::~ThreadPool() noexcept

await_stop();
}

void
common::ThreadPool::push_to_job_queue(JobT&& job)
{
std::unique_lock lock(jobs_mutex_);
CHECK(is_accepting_jobs_);

jobs_.push(std::move(job));
lock.unlock();

cv_.notify_one();
}

void
common::ThreadPool::await_stop() noexcept
{
{
std::scoped_lock lock(jobs_mutex_);
is_accepting_jobs_ = false;
}

cv_.notify_all();
cv_.notify_all();
}

// spin down threads
for (auto& thread : threads_) {
Expand All @@ -57,6 +56,7 @@ common::ThreadPool::await_stop() noexcept
}
}
}

std::optional<common::ThreadPool::JobT>
common::ThreadPool::pop_from_job_queue_() noexcept
{
Expand All @@ -68,11 +68,13 @@ common::ThreadPool::pop_from_job_queue_() noexcept
jobs_.pop();
return job;
}

bool
common::ThreadPool::should_stop_() const noexcept
{
return !is_accepting_jobs_ && jobs_.empty();
}

void
common::ThreadPool::thread_worker_()
{
Expand Down
2 changes: 1 addition & 1 deletion src/common/thread.pool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct ThreadPool final
// std::string& argument to the error handler is a diagnostic message from
// the failing job and is logged to the error stream by the Zarr driver when
// the next call to `append()` is made.
ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
ThreadPool(unsigned int n_threads, std::function<void(const std::string&)> err);
~ThreadPool() noexcept;

void push_to_job_queue(JobT&& job);
Expand Down
38 changes: 14 additions & 24 deletions src/common/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,28 +166,22 @@ common::bytes_per_chunk(const std::vector<Dimension>& dimensions,
return n_bytes;
}

const char*
common::sample_type_to_dtype(SampleType t)

{
static const char* table[] = { "u1", "u2", "i1", "i2",
"f4", "u2", "u2", "u2" };
if (t < countof(table)) {
return table[t];
} else {
throw std::runtime_error("Invalid sample type.");
}
}

const char*
common::sample_type_to_string(SampleType t) noexcept
{
static const char* table[] = { "u8", "u16", "i8", "i16",
"f32", "u16", "u16", "u16" };
if (t < countof(table)) {
return table[t];
} else {
return "unrecognized pixel type";
switch (t) {
case SampleType_u8:
return "u8";
case SampleType_u16:
return "u16";
case SampleType_i8:
return "i8";
case SampleType_i16:
return "i16";
case SampleType_f32:
return "f32";
default:
return "unrecognized pixel type";
}
}

Expand All @@ -207,15 +201,11 @@ common::split_uri(const std::string& uri)
auto end = uri.find_first_of(delim);

std::vector<std::string> out;
while (end <= std::string::npos) {
while (end != std::string::npos) {
if (end > begin) {
out.emplace_back(uri.substr(begin, end - begin));
}

if (end == std::string::npos) {
break;
}

begin = end + 1;
end = uri.find_first_of(delim, begin);
}
Expand Down
25 changes: 1 addition & 24 deletions src/common/utilities.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "logger.h"
#include "device/props/components.h"
#include "device/props/storage.h"
#include "macros.hh"
#include "dimension.hh"

#include <condition_variable>
Expand All @@ -15,23 +16,6 @@
#include <stdexcept>
#include <thread>

#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define EXPECT(e, ...) \
do { \
if (!(e)) { \
LOGE(__VA_ARGS__); \
throw std::runtime_error("Expression was false: " #e); \
} \
} while (0)
#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e)

// #define TRACE(...) LOG(__VA_ARGS__)
#define TRACE(...)

#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V)))
#define countof(e) (sizeof(e) / sizeof(*(e)))

namespace fs = std::filesystem;

namespace acquire::sink::zarr {
Expand Down Expand Up @@ -96,13 +80,6 @@ size_t
bytes_per_chunk(const std::vector<Dimension>& dimensions,
const SampleType& dtype);

/// @brief Get the Zarr dtype for a given SampleType.
/// @param t An enumerated sample type.
/// @throw std::runtime_error if @par t is not a valid SampleType.
/// @return A representation of the SampleType @par t expected by a Zarr reader.
const char*
sample_type_to_dtype(SampleType t);

/// @brief Get a string representation of the SampleType enum.
/// @param t An enumerated sample type.
/// @return A human-readable representation of the SampleType @par t.
Expand Down
Loading

0 comments on commit 915f86e

Please sign in to comment.