Skip to content

Commit

Permalink
Add S3 support (#199)
Browse files Browse the repository at this point in the history
aliddell authored Aug 5, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent f85e978 commit da0a68a
Showing 13 changed files with 343 additions and 109 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/test_s3.yml
Original file line number Diff line number Diff line change
@@ -17,11 +17,16 @@ jobs:
MINIO_ROOT_PASSWORD: password
MINIO_URL: http://localhost:9000
MINIO_ALIAS: myminio
MINIO_BUCKET: acquire-test
MINIO_ACCESS_KEY: acquire
MINIO_SECRET_KEY: 12345678

steps:
- uses: actions/checkout@v3
with:
submodules: true
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
@@ -42,3 +47,20 @@ jobs:
sleep 5
mcli alias set $MINIO_ALIAS $MINIO_URL $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD
mcli admin user svcacct add --access-key $MINIO_ACCESS_KEY --secret-key $MINIO_SECRET_KEY $MINIO_ALIAS $MINIO_ROOT_USER
- name: Create a bucket
run: |
mcli mb $MINIO_ALIAS/$MINIO_BUCKET
- name: Install
run: |
pip install --upgrade pip
pip install '.[testing]'
- name: Test
run: |
echo "ZARR_S3_ENDPOINT=$MINIO_URL" > .env
echo "ZARR_S3_BUCKET_NAME=$MINIO_BUCKET" >> .env
echo "ZARR_S3_ACCESS_KEY_ID=$MINIO_ACCESS_KEY" >> .env
echo "ZARR_S3_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY" >> .env
python -m pytest -k test_write_zarr_to_s3
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "acquire-imaging"
authors = ["Nathan Clack <[email protected]>"]
version = "0.3.0"
version = "0.3.1"
edition = "2021"

[lib]
@@ -28,7 +28,7 @@ bindgen = "0.69.1"
cmake = "0.1"
http = "1.0"
json = "0.12"
reqwest = { version = "0.11", features = ["blocking", "json"] }
reqwest = { version = "0.12", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
zip-extract = "0.1"
1 change: 0 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ fn main() {

let dst = cmake::Config::new("acquire-common")
.profile("RelWithDebInfo")
.static_crt(true)
.define("NOTEST", "TRUE")
.define("NO_UNIT_TESTS", "TRUE")
.define("NO_EXAMPLES", "TRUE")
2 changes: 1 addition & 1 deletion drivers.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"acquire-driver-zarr": "0.1.11",
"acquire-driver-zarr": "0.1.12",
"acquire-driver-egrabber": "0.1.5",
"acquire-driver-hdcam": "0.1.9",
"acquire-driver-spinnaker": "0.1.1",
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ testing = [
"dask",
"ome-zarr",
"ruff",
"python-dotenv"
]

[project.entry-points."napari.manifest"]
10 changes: 4 additions & 6 deletions python/acquire/__init__.py
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@
if TYPE_CHECKING:
import napari # type: ignore


g_runtime: Optional[Runtime] = None
"""The global acquire runtime."""

@@ -68,7 +67,7 @@ def normalize_fallback_arg(arg: Union[str, List[str]]) -> List[str]:
p.video[0].storage.identifier = dm.select_one_of(
DeviceKind.Storage, storage
)
p.video[0].storage.settings.filename = output_filename
p.video[0].storage.settings.uri = output_filename
p.video[0].max_frame_count = 100
p.video[0].frame_average_count = 0 # disables

@@ -91,7 +90,6 @@ def setup_one_streams(runtime: Runtime, frame_count: int) -> Properties:

p.video[0].camera.identifier = dm.select(DeviceKind.Camera, cameras[0])
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Trash")
# p.video[0].storage.settings.filename = output_filename
p.video[0].camera.settings.binning = 1
p.video[0].camera.settings.shape = (64, 64)
p.video[0].camera.settings.pixel_type = SampleType.U16
@@ -116,7 +114,6 @@ def setup_two_streams(runtime: Runtime, frame_count: int) -> Properties:

p.video[0].camera.identifier = dm.select(DeviceKind.Camera, cameras[0])
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Trash")
# p.video[0].storage.settings.filename = output_filename
p.video[0].camera.settings.binning = 1
p.video[0].camera.settings.shape = (2304, 2304)
p.video[0].camera.settings.pixel_type = SampleType.U16
@@ -125,7 +122,6 @@ def setup_two_streams(runtime: Runtime, frame_count: int) -> Properties:

p.video[1].camera.identifier = dm.select(DeviceKind.Camera, cameras[1])
p.video[1].storage.identifier = dm.select(DeviceKind.Storage, "Trash")
# p.video[1].storage.settings.filename = output_filename
p.video[1].camera.settings.binning = 1
p.video[1].camera.settings.shape = (64, 64)
p.video[1].camera.settings.pixel_type = SampleType.U16
@@ -221,7 +217,9 @@ def next_frame() -> Optional[npt.NDArray[Any]]:
counts, bins = histogram(update_times)
p50 = bins[where(cumsum(counts) >= 0.5 * len(update_times))[0][0]]
p90 = bins[where(cumsum(counts) >= 0.9 * len(update_times))[0][0]]
logging.info(f"Update times - median: {p50*1e3} ms 90%<{p90*1e3} ms")
logging.info(
f"Update times - median: {p50 * 1e3} ms 90%<{p90 * 1e3} ms"
)

runtime.stop()
logging.info("STOPPED")
26 changes: 17 additions & 9 deletions python/acquire/acquire.pyi
Original file line number Diff line number Diff line change
@@ -373,7 +373,8 @@ class DeviceManager:
supported.
Returns:
Optional[DeviceIdentifier]: The selected device identifier, or None if none of the specified devices are available.
Optional[DeviceIdentifier]: The selected device identifier, or None
if none of the specified devices are available.
"""

@final
@@ -1023,7 +1024,7 @@ class StorageCapabilities:
chunking_is_supported:
A boolean indicating whether chunking is supported for this storage
device.
shard_is_supported:
sharding_is_supported:
A boolean indicating whether sharding is supported for this storage
device.
multiscale_is_supported:
@@ -1034,6 +1035,7 @@ class StorageCapabilities:
chunking_is_supported: bool
sharding_is_supported: bool
multiscale_is_supported: bool
s3_is_supported: bool

def dict(self) -> Dict[str, Any]:
"""Returns a dictionary of a `StorageCapabilities` object's attributes."""
@@ -1076,14 +1078,18 @@ class StorageProperties:
"""The `StorageProperties` class represents properties for data storage.
Attributes:
uri:
The URI where the image data will be stored.
external_metadata_json:
An optional attribute of the metadata JSON filename as a string.
filename:
An optional attribute representing the filename for storing the
image data.
Optional JSON-formatted metadata for the acquisition.
s3_access_key_id:
The access key ID for the S3 bucket. This value is only applicable
for Zarr storage devices and S3 URIs.
s3_secret_access_key:
The secret access key for the S3 bucket. This value is only applicable
for Zarr storage devices and S3 URIs.
first_frame_id:
An integer representing the ID of the first frame for a given
acquisition.
The ID of the first frame.
pixel_scale_um:
A tuple of two floats representing the pixel size of the camera in
micrometers.
@@ -1096,8 +1102,10 @@ class StorageProperties:
A boolean indicating whether multiscale storage is enabled.
"""

uri: Optional[str]
external_metadata_json: Optional[str]
filename: Optional[str]
s3_access_key_id: Optional[str]
s3_secret_access_key: Optional[str]
first_frame_id: int
pixel_scale_um: Tuple[float, float]
acquisition_dimensions: List[StorageDimension]
2 changes: 2 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -61,7 +61,9 @@ impl RawRuntime {
}

/// Start the acquisition runtime via the C API.
///
/// Logs immediately before and after the `acquire_start` call so that a
/// hang or failure inside the C layer is visible in the debug output.
/// Propagates any non-OK status from the C API as an error.
fn start(&self) -> Result<()> {
    debug!("START Runtime");
    // NOTE(review): assumes `self.inner` is a valid runtime handle for the
    // lifetime of `self` — confirm against RawRuntime's constructor.
    unsafe { capi::acquire_start(self.inner.as_ptr()) }.ok()?;
    debug!("START Runtime OK");
    Ok(())
}

112 changes: 90 additions & 22 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -100,12 +100,20 @@ impl TryFrom<capi::StorageDimension> for StorageDimension {
pub struct StorageProperties {
#[pyo3(get, set)]
#[serde(default)]
pub(crate) filename: Option<String>,
pub(crate) uri: Option<String>,

#[pyo3(get, set)]
#[serde(default)]
pub(crate) external_metadata_json: Option<String>,

#[pyo3(get, set)]
#[serde(default)]
pub(crate) s3_access_key_id: Option<String>,

#[pyo3(get, set)]
#[serde(default)]
pub(crate) s3_secret_access_key: Option<String>,

/// Doesn't do anything right now. One day could be used for file-rollover.
#[pyo3(get, set)]
#[serde(default)]
@@ -126,8 +134,10 @@ impl_plain_old_dict!(StorageProperties);
impl Default for StorageProperties {
fn default() -> Self {
Self {
filename: Default::default(),
uri: Default::default(),
external_metadata_json: Default::default(),
s3_access_key_id: Default::default(),
s3_secret_access_key: Default::default(),
first_frame_id: Default::default(),
pixel_scale_um: (1., 1.), // Default to 1.0 um/pixel (square pixels)
acquisition_dimensions: Default::default(),
@@ -140,11 +150,11 @@ impl TryFrom<capi::StorageProperties> for StorageProperties {
type Error = anyhow::Error;

fn try_from(value: capi::StorageProperties) -> Result<Self, Self::Error> {
let filename = if value.filename.nbytes == 0 {
let uri = if value.uri.nbytes == 0 {
None
} else {
Some(
unsafe { CStr::from_ptr(value.filename.str_) }
unsafe { CStr::from_ptr(value.uri.str_) }
.to_str()?
.to_owned(),
)
@@ -158,6 +168,24 @@ impl TryFrom<capi::StorageProperties> for StorageProperties {
.to_owned(),
)
};
let s3_access_key_id = if value.access_key_id.nbytes == 0 {
None
} else {
Some(
unsafe { CStr::from_ptr(value.access_key_id.str_) }
.to_str()?
.to_owned(),
)
};
let s3_secret_access_key = if value.secret_access_key.nbytes == 0 {
None
} else {
Some(
unsafe { CStr::from_ptr(value.secret_access_key.str_) }
.to_str()?
.to_owned(),
)
};

let mut acquisition_dimensions: Vec<Py<StorageDimension>> = Default::default();
for i in 0..value.acquisition_dimensions.size {
@@ -174,9 +202,11 @@ impl TryFrom<capi::StorageProperties> for StorageProperties {
}

Ok(Self {
filename,
first_frame_id: value.first_frame_id,
uri,
external_metadata_json,
s3_access_key_id,
s3_secret_access_key,
first_frame_id: value.first_frame_id,
pixel_scale_um: (value.pixel_scale_um.x, value.pixel_scale_um.y),
acquisition_dimensions,
enable_multiscale: (value.enable_multiscale == 1),
@@ -254,6 +284,9 @@ pub struct StorageCapabilities {

#[pyo3(get)]
multiscale_is_supported: bool,

#[pyo3(get)]
s3_is_supported: bool,
}

impl_plain_old_dict!(StorageCapabilities);
@@ -264,6 +297,7 @@ impl Default for StorageCapabilities {
chunking_is_supported: Default::default(),
sharding_is_supported: Default::default(),
multiscale_is_supported: Default::default(),
s3_is_supported: Default::default(),
}
}
}
@@ -276,6 +310,7 @@ impl TryFrom<capi::StoragePropertyMetadata> for StorageCapabilities {
chunking_is_supported: value.chunking_is_supported == 1,
sharding_is_supported: value.sharding_is_supported == 1,
multiscale_is_supported: value.multiscale_is_supported == 1,
s3_is_supported: value.s3_is_supported == 1,
})
}
}
@@ -284,52 +319,71 @@ impl TryFrom<capi::StoragePropertyMetadata> for StorageCapabilities {
impl Default for capi::StorageProperties {
fn default() -> Self {
Self {
filename: Default::default(),
first_frame_id: Default::default(),
uri: Default::default(),
external_metadata_json: Default::default(),
access_key_id: Default::default(),
secret_access_key: Default::default(),
first_frame_id: Default::default(),
pixel_scale_um: Default::default(),
acquisition_dimensions: Default::default(),
enable_multiscale: Default::default(),
}
}
}

/// Convert an optional Rust string into an optional NUL-terminated C string.
///
/// Returns `Ok(None)` when `value` is `None`; returns an error if the
/// string contains an interior NUL byte, which `CString` cannot represent.
///
/// (Renamed the parameter: the original `str` shadowed the primitive
/// string-slice type name.)
fn str_to_cstring(value: &Option<String>) -> Result<Option<CString>> {
    // `transpose` flips `Option<Result<_>>` into `Result<Option<_>>` so a
    // single `?` propagates the NUL-byte error, as the original did.
    Ok(value.as_deref().map(CString::new).transpose()?)
}

impl TryFrom<&StorageProperties> for capi::StorageProperties {
type Error = anyhow::Error;

fn try_from(value: &StorageProperties) -> Result<Self, Self::Error> {
let mut out: capi::StorageProperties = unsafe { std::mem::zeroed() };

// Careful: x needs to live long enough
let x = if let Some(filename) = &value.filename {
Some(CString::new(filename.as_str())?)
} else {
None
};
let (filename, bytes_of_filename) = if let Some(ref x) = x {
let x = str_to_cstring(&value.uri)?;
let (uri, bytes_of_uri) = if let Some(ref x) = x {
(x.as_ptr(), x.to_bytes_with_nul().len())
} else {
(null(), 0)
};

// Careful: y needs to live long enough
let y = if let Some(metadata) = &value.external_metadata_json {
Some(CString::new(metadata.as_str())?)
} else {
None
};
let y = str_to_cstring(&value.external_metadata_json)?;
let (metadata, bytes_of_metadata) = if let Some(ref y) = y {
(y.as_ptr(), y.to_bytes_with_nul().len())
} else {
(null(), 0)
};

// Careful: z needs to live long enough
let z = str_to_cstring(&value.s3_access_key_id)?;
let (access_key_id, bytes_of_access_key_id) = if let Some(ref z) = z {
(z.as_ptr(), z.to_bytes_with_nul().len())
} else {
(null(), 0)
};

// Careful: w needs to live long enough
let w = str_to_cstring(&value.s3_secret_access_key)?;
let (secret_access_key, bytes_of_secret_access_key) = if let Some(ref w) = w {
(w.as_ptr(), w.to_bytes_with_nul().len())
} else {
(null(), 0)
};

// This copies the string into a buffer owned by the return value.
if !unsafe {
capi::storage_properties_init(
&mut out,
value.first_frame_id,
filename,
bytes_of_filename as _,
uri,
bytes_of_uri as _,
metadata,
bytes_of_metadata as _,
capi::PixelScale {
@@ -340,11 +394,23 @@ impl TryFrom<&StorageProperties> for capi::StorageProperties {
) == 1
} {
Err(anyhow::anyhow!("Failed to initialize storage properties."))
} else if !unsafe {
capi::storage_properties_set_access_key_and_secret(
&mut out,
access_key_id,
bytes_of_access_key_id as _,
secret_access_key,
bytes_of_secret_access_key as _,
) == 1
} {
Err(anyhow::anyhow!(
"Failed to set access key id and secret access key."
))
} else if !unsafe {
capi::storage_properties_set_enable_multiscale(&mut out, value.enable_multiscale as u8)
== 1
} {
Err(anyhow::anyhow!("Failed acquire api status check"))
Err(anyhow::anyhow!("Failed to set multiscale settings."))
} else {
// initialize each dimension separately
for (i, pydim) in value.acquisition_dimensions.iter().enumerate() {
@@ -416,6 +482,7 @@ impl Default for capi::StoragePropertyMetadata {
chunking_is_supported: Default::default(),
sharding_is_supported: Default::default(),
multiscale_is_supported: Default::default(),
s3_is_supported: Default::default(),
}
}
}
@@ -428,6 +495,7 @@ impl TryFrom<&StorageCapabilities> for capi::StoragePropertyMetadata {
chunking_is_supported: value.chunking_is_supported as u8,
sharding_is_supported: value.sharding_is_supported as u8,
multiscale_is_supported: value.multiscale_is_supported as u8,
s3_is_supported: value.s3_is_supported as u8,
})
}
}
84 changes: 58 additions & 26 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
import time
from datetime import timedelta
from time import sleep
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import acquire
from acquire import DeviceKind, DeviceState, Runtime, Trigger, PropertyType
@@ -102,7 +102,7 @@ def test_repeat_acq(runtime: Runtime):
assert (
p.video[0].storage.identifier is not None
), "Expected a storage identifier"
assert p.video[0].storage.settings.filename == "out.tif"
assert p.video[0].storage.settings.uri == "out.tif"
p.video[0].camera.settings.shape = (192, 108)
p.video[0].max_frame_count = 10
p = runtime.set_configuration(p)
@@ -165,15 +165,15 @@ def test_set_storage(runtime: Runtime):
)
assert p.video[0].storage.identifier is not None

p.video[0].storage.settings.filename = "out.tif"
assert p.video[0].storage.settings.filename == "out.tif"
p.video[0].storage.settings.uri = "out.tif"
assert p.video[0].storage.settings.uri == "out.tif"


def test_setup(runtime: Runtime):
p = acquire.setup(runtime, "simulated.*empty", "Trash")
assert p.video[0].camera.identifier is not None
assert p.video[0].storage.identifier is not None
assert p.video[0].storage.settings.filename == "out.tif"
assert p.video[0].storage.settings.uri == "out.tif"
assert p.video[0].max_frame_count == 100
p.video[0].camera.settings.shape = (192, 108)
p = runtime.set_configuration(p)
@@ -196,7 +196,7 @@ def took_too_long():
for f in a.frames():
logging.info(
f"{f.data().shape} {f.data()[0][0][0][0]} "
+ f"{f.metadata()}"
f"{f.metadata()}"
)
nframes += packet
logging.info(
@@ -219,7 +219,7 @@ def test_selection_is_consistent(runtime: Runtime):
assert hcam1 == hcam2


def test_change_filename(runtime: Runtime):
def test_change_uri(runtime: Runtime):
dm = runtime.device_manager()
p = runtime.get_configuration()
p.video[0].camera.identifier = dm.select(DeviceKind.Camera, "simulated.*")
@@ -233,9 +233,9 @@ def test_change_filename(runtime: Runtime):
"another long one ok it is really long this time.tif",
]
for name in names:
p.video[0].storage.settings.filename = name
p.video[0].storage.settings.uri = name
p = runtime.set_configuration(p)
assert p.video[0].storage.settings.filename == name
assert p.video[0].storage.settings.uri == name

nframes = 0
runtime.start()
@@ -257,7 +257,7 @@ def test_write_external_metadata_to_tiff(
p.video[0].camera.settings.shape = (33, 47)
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Tiff")
p.video[0].max_frame_count = 3
p.video[0].storage.settings.filename = f"{request.node.name}.tif"
p.video[0].storage.settings.uri = f"{request.node.name}.tif"
metadata = {"hello": "world"}
p.video[0].storage.settings.external_metadata_json = json.dumps(metadata)
runtime.set_configuration(p)
@@ -270,7 +270,7 @@ def test_write_external_metadata_to_tiff(
runtime.stop()

# Check that the written tif has the expected structure
with tifffile.TiffFile(p.video[0].storage.settings.filename) as f:
with tifffile.TiffFile(p.video[0].storage.settings.uri) as f:

def meta(iframe: int) -> Dict[Any, Any]:
return json.loads(f.pages[iframe].tags["ImageDescription"].value)
@@ -287,7 +287,7 @@ def meta(iframe: int) -> Dict[Any, Any]:

@pytest.mark.skip(
reason="Runs into memory limitations on github ci."
+ " See https://github.com/acquire-project/cpx/issues/147"
" See https://github.com/acquire-project/cpx/issues/147"
)
def test_two_video_streams(runtime: Runtime):
dm = runtime.device_manager()
@@ -334,9 +334,9 @@ def is_not_done() -> bool:
expected_frame_id = nframes[stream_id] + i
assert frame.metadata().frame_id == expected_frame_id, (
"frame id's didn't match "
+ f"({frame.metadata().frame_id}"
+ f"!={expected_frame_id})"
+ f" [stream {stream_id} nframes {nframes}]"
f"({frame.metadata().frame_id}"
f"!={expected_frame_id})"
f" [stream {stream_id} nframes {nframes}]"
)
nframes[stream_id] += n
logging.debug(f"NFRAMES {nframes}")
@@ -380,7 +380,7 @@ def test_abort(runtime: Runtime):

def wait_for_data(
runtime: Runtime, stream_id: int = 0, timeout: Optional[timedelta] = None
) -> acquire.AvailableData:
) -> Tuple[int, int]:
# None is used as a missing sentinel value, not to indicate no timeout.
if timeout is None:
timeout = timedelta(seconds=5)
@@ -517,14 +517,14 @@ def test_simulated_camera_capabilities(


@pytest.mark.parametrize(
("descriptor", "chunking", "sharding", "multiscale"),
("descriptor", "chunking", "sharding", "multiscale", "s3"),
[
("raw", False, False, False),
("trash", False, False, False),
("tiff", False, False, False),
("tiff-json", False, False, False),
("zarr", True, False, True),
("zarrv3", True, True, False),
("raw", False, False, False, False),
("trash", False, False, False, False),
("tiff", False, False, False, False),
("tiff-json", False, False, False, False),
("zarr", True, False, True, True),
("zarrv3", True, True, True, True),
],
)
def test_storage_capabilities(
@@ -533,6 +533,7 @@ def test_storage_capabilities(
chunking: bool,
sharding: bool,
multiscale: bool,
s3: bool,
):
dm = runtime.device_manager()
p = runtime.get_configuration()
@@ -551,6 +552,7 @@ def test_storage_capabilities(
assert storage.chunking_is_supported == chunking
assert storage.sharding_is_supported == sharding
assert storage.multiscale_is_supported == multiscale
assert storage.s3_is_supported == s3


def test_invalidated_frame(runtime: Runtime):
@@ -585,19 +587,49 @@ def test_switch_device_identifier(

dm = runtime.device_manager()
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "tiff")
p.video[0].storage.settings.filename = f"{request.node.name}.tif"
p.video[0].storage.settings.uri = f"{request.node.name}.tif"
p = runtime.set_configuration(p)
assert p.video[0].storage.identifier.name == "tiff"

runtime.start()
runtime.stop()

# will raise an exception if the file doesn't exist or is invalid
with tifffile.TiffFile(p.video[0].storage.settings.filename):
with tifffile.TiffFile(p.video[0].storage.settings.uri):
pass

# cleanup
os.remove(p.video[0].storage.settings.filename)
os.remove(p.video[0].storage.settings.uri)


def test_acquire_unaligned(runtime: Runtime):
    """Verify the runtime copes with frame sizes that break 8-byte alignment.

    The frame queue packs VideoFrames back to back, so a frame whose total
    size is not a multiple of 8 bytes would leave the next VideoFrame
    unaligned, and Rust panics on unaligned access.  VideoFrames are
    therefore padded to 8 bytes; acquire a few deliberately odd-sized
    frames and check that they all arrive.
    """
    manager = runtime.device_manager()
    config = runtime.get_configuration()
    stream = config.video[0]

    stream.camera.identifier = manager.select(
        acquire.DeviceKind.Camera, ".*empty.*"
    )
    # 33 * 47 plus sizeof(VideoFrame) is deliberately not divisible by 8
    stream.camera.settings.shape = (33, 47)
    stream.storage.identifier = manager.select(
        acquire.DeviceKind.Storage, "trash"
    )
    stream.max_frame_count = 3
    runtime.set_configuration(config)

    received = 0
    runtime.start()
    while received < stream.max_frame_count:
        with runtime.get_available_data(0) as packet:
            for _ in range(packet.get_frame_count()):
                # Fetch the first frame of a fresh iterator each pass, as
                # the original test did.
                _ = next(packet.frames())
                received += 1
    runtime.stop()
    assert received == stream.max_frame_count


# NOTES:
2 changes: 1 addition & 1 deletion tests/test_egrabber.py
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ def test_vieworks_stream(

p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Zarr")
assert p.video[0].storage.identifier
p.video[0].storage.settings.filename = request.node.name + ".zarr"
p.video[0].storage.settings.uri = request.node.name + ".zarr"

# Set the camera here so we can query its triggering capabilities.
# This comes in the form of the returned properties.
184 changes: 144 additions & 40 deletions tests/test_zarr.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import os
from pathlib import Path
from tempfile import mkdtemp
from typing import Optional

import dotenv
import numpy as np
import pytest
import s3fs
import zarr
from dask import array as da
from numcodecs import blosc as blosc
@@ -15,11 +18,13 @@
import acquire
from acquire import Runtime, DeviceKind

dotenv.load_dotenv()


# FIXME (aliddell): this should be module scoped, but the runtime is leaky
@pytest.fixture(scope="function")
def runtime():
yield acquire.Runtime()
yield Runtime()


def test_set_acquisition_dimensions(
@@ -33,7 +38,9 @@ def test_set_acquisition_dimensions(
props.video[0].camera.settings.shape = (64, 48)

props.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Zarr")
props.video[0].storage.settings.filename = f"{request.node.name}.zarr"
props.video[0].storage.settings.uri = str(
Path(mkdtemp()) / f"{request.node.name}.zarr"
)
props.video[0].max_frame_count = 32

# configure storage dimensions
@@ -59,8 +66,6 @@ def test_set_acquisition_dimensions(
]
assert len(props.video[0].storage.settings.acquisition_dimensions) == 3

# sleep(10)

# set and test
props = runtime.set_configuration(props)
assert len(props.video[0].storage.settings.acquisition_dimensions) == 3
@@ -104,17 +109,21 @@ def test_write_external_metadata_to_zarr(
runtime: Runtime, request: pytest.FixtureRequest
):
dm = runtime.device_manager()
p = runtime.get_configuration()
p.video[0].camera.identifier = dm.select(
DeviceKind.Camera, "simulated.*sin.*"
props = runtime.get_configuration()
props.video[0].camera.identifier = dm.select(
DeviceKind.Camera, "simulated.*empty.*"
)
props.video[0].camera.settings.shape = (33, 47)
props.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Zarr")
props.video[0].max_frame_count = 4
props.video[0].storage.settings.uri = str(
Path(mkdtemp()) / f"{request.node.name}.zarr"
)
p.video[0].camera.settings.shape = (33, 47)
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Zarr")
p.video[0].max_frame_count = 4
p.video[0].storage.settings.filename = f"{request.node.name}.zarr"
metadata = {"hello": "world"}
p.video[0].storage.settings.external_metadata_json = json.dumps(metadata)
p.video[0].storage.settings.pixel_scale_um = (0.5, 4)
props.video[0].storage.settings.external_metadata_json = json.dumps(
metadata
)
props.video[0].storage.settings.pixel_scale_um = (0.5, 4)

# configure storage dimensions
dimension_x = acquire.StorageDimension(
@@ -132,23 +141,23 @@ def test_write_external_metadata_to_zarr(
)
assert dimension_z.shard_size_chunks == 0

p.video[0].storage.settings.acquisition_dimensions = [
props.video[0].storage.settings.acquisition_dimensions = [
dimension_x,
dimension_y,
dimension_z,
]

p = runtime.set_configuration(p)
props = runtime.set_configuration(props)

nframes = 0
runtime.start()
while nframes < p.video[0].max_frame_count:
while nframes < props.video[0].max_frame_count:
with runtime.get_available_data(0) as packet:
nframes += packet.get_frame_count()
runtime.stop()

assert p.video[0].storage.settings.filename
store = parse_url(p.video[0].storage.settings.filename)
assert props.video[0].storage.settings.uri
store = parse_url(props.video[0].storage.settings.uri)
assert store
reader = Reader(store)
nodes = list(reader())
@@ -161,9 +170,9 @@ def test_write_external_metadata_to_zarr(
# scale/level.
image_data = multi_scale_image_node.data[0]
assert image_data.shape == (
p.video[0].max_frame_count,
p.video[0].camera.settings.shape[1],
p.video[0].camera.settings.shape[0],
props.video[0].max_frame_count,
props.video[0].camera.settings.shape[1],
props.video[0].camera.settings.shape[0],
)

multi_scale_image_metadata = multi_scale_image_node.metadata
@@ -183,11 +192,11 @@ def test_write_external_metadata_to_zarr(
pixel_scale_um = tuple(
transform["scale"][axis_names.index(axis)] for axis in ("x", "y")
)
assert pixel_scale_um == p.video[0].storage.settings.pixel_scale_um
assert pixel_scale_um == props.video[0].storage.settings.pixel_scale_um

# ome-zarr only reads attributes it recognizes, so use a plain zarr reader
# to read external metadata instead.
group = zarr.open(p.video[0].storage.settings.filename)
group = zarr.open(props.video[0].storage.settings.uri)
assert group["0"].attrs.asdict() == metadata


@@ -201,8 +210,8 @@ def test_write_external_metadata_to_zarr(
def test_write_compressed_zarr(
runtime: Runtime, request: pytest.FixtureRequest, compressor_name
):
filename = f"{request.node.name}.zarr"
filename = filename.replace("[", "_").replace("]", "_")
uri = str(Path(mkdtemp()) / f"{request.node.name}.zarr")
uri = uri.replace("[", "_").replace("]", "_")

dm = runtime.device_manager()
p = runtime.get_configuration()
@@ -216,7 +225,7 @@ def test_write_compressed_zarr(
f"ZarrBlosc1{compressor_name.capitalize()}ByteShuffle",
)
p.video[0].max_frame_count = 70
p.video[0].storage.settings.filename = filename
p.video[0].storage.settings.uri = uri
metadata = {"foo": "bar"}
p.video[0].storage.settings.external_metadata_json = json.dumps(metadata)

@@ -254,7 +263,7 @@ def test_write_compressed_zarr(
runtime.stop()

# load from Zarr
group = zarr.open(p.video[0].storage.settings.filename)
group = zarr.open(p.video[0].storage.settings.uri)
data = group["0"]

assert data.compressor.cname == compressor_name
@@ -270,7 +279,7 @@ def test_write_compressed_zarr(
assert data.attrs.asdict() == metadata

# load from Dask
data = da.from_zarr(p.video[0].storage.settings.filename, component="0")
data = da.from_zarr(p.video[0].storage.settings.uri, component="0")
assert data.shape == (
p.video[0].max_frame_count,
1,
@@ -308,7 +317,9 @@ def test_write_zarr_with_chunking(
DeviceKind.Storage,
"Zarr",
)
p.video[0].storage.settings.filename = f"{request.node.name}.zarr"
p.video[0].storage.settings.uri = str(
Path(mkdtemp()) / f"{request.node.name}.zarr"
)
p.video[0].max_frame_count = number_of_frames

# configure storage dimensions
@@ -338,7 +349,7 @@ def test_write_zarr_with_chunking(
runtime.start()
runtime.stop()

group = zarr.open(p.video[0].storage.settings.filename)
group = zarr.open(p.video[0].storage.settings.uri)
data = group["0"]

assert data.chunks == (64, 540, 960)
@@ -355,8 +366,8 @@ def test_write_zarr_multiscale(
runtime: acquire.Runtime,
request: pytest.FixtureRequest,
):
filename = f"{request.node.name}.zarr"
filename = filename.replace("[", "_").replace("]", "_")
uri = str(Path(mkdtemp()) / f"{request.node.name}.zarr")
uri = uri.replace("[", "_").replace("]", "_")

dm = runtime.device_manager()

@@ -371,7 +382,7 @@ def test_write_zarr_multiscale(
DeviceKind.Storage,
"Zarr",
)
p.video[0].storage.settings.filename = filename
p.video[0].storage.settings.uri = uri
p.video[0].storage.settings.pixel_scale_um = (1, 1)
p.video[0].max_frame_count = 100

@@ -403,12 +414,11 @@ def test_write_zarr_multiscale(
runtime.start()
runtime.stop()

reader = Reader(parse_url(filename))
reader = Reader(parse_url(uri))
zgroup = list(reader())[0]
# loads each layer as a dask array from the Zarr dataset
data = [
da.from_zarr(filename, component=str(i))
for i in range(len(zgroup.data))
da.from_zarr(uri, component=str(i)) for i in range(len(zgroup.data))
]
assert len(data) == 3

@@ -451,7 +461,9 @@ def test_write_zarr_v3(
DeviceKind.Storage,
f"ZarrV3Blosc1{codec.capitalize()}ByteShuffle" if codec else "ZarrV3",
)
p.video[0].storage.settings.filename = f"{request.node.name}.zarr"
p.video[0].storage.settings.uri = str(
Path(mkdtemp()) / f"{request.node.name}.zarr"
)
p.video[0].max_frame_count = number_of_frames

# configure storage dimensions
@@ -490,7 +502,7 @@ def test_write_zarr_v3(
runtime.start()
runtime.stop()

store = zarr.DirectoryStoreV3(p.video[0].storage.settings.filename)
store = zarr.DirectoryStoreV3(p.video[0].storage.settings.uri)
group = zarr.open(store=store, mode="r")
data = group["0"]

@@ -516,7 +528,7 @@ def test_metadata_with_trailing_whitespace(
p.video[0].camera.settings.exposure_time_us = 1e4
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Zarr")
p.video[0].max_frame_count = 70
p.video[0].storage.settings.filename = str(
p.video[0].storage.settings.uri = str(
Path(mkdtemp()) / f"{request.node.name}.zarr"
)
metadata = {"foo": "bar"}
@@ -549,7 +561,99 @@ def test_metadata_with_trailing_whitespace(
runtime.stop()

# load from Zarr
group = zarr.open(p.video[0].storage.settings.filename)
group = zarr.open(p.video[0].storage.settings.uri)
data = group["0"]

assert data.attrs.asdict() == metadata


def test_write_zarr_to_s3(runtime: Runtime, request: pytest.FixtureRequest):
    """Stream frames to a Zarr dataset on S3 and validate what was written.

    Skips unless every ``ZARR_S3_*`` environment variable is set (the CI
    workflow points them at a MinIO instance).  Fix over the original:
    the S3 cleanup now runs in a ``finally`` block, so a failed assertion
    no longer leaves test objects behind in the shared bucket.
    """
    required_env_vars = [
        "ZARR_S3_ENDPOINT",
        "ZARR_S3_BUCKET_NAME",
        "ZARR_S3_ACCESS_KEY_ID",
        "ZARR_S3_SECRET_ACCESS_KEY",
    ]

    for var in required_env_vars:
        if var not in os.environ:
            pytest.skip(f"{var} not set")

    zarr_s3_endpoint = os.environ["ZARR_S3_ENDPOINT"]
    zarr_s3_bucket_name = os.environ["ZARR_S3_BUCKET_NAME"]
    zarr_s3_access_key_id = os.environ["ZARR_S3_ACCESS_KEY_ID"]
    zarr_s3_secret_access_key = os.environ["ZARR_S3_SECRET_ACCESS_KEY"]

    dm = runtime.device_manager()
    props = runtime.get_configuration()
    video = props.video[0]

    video.camera.identifier = dm.select(
        DeviceKind.Camera, "simulated.*empty.*"
    )
    video.camera.settings.shape = (1920, 1080)
    video.camera.settings.exposure_time_us = 1e4
    video.camera.settings.pixel_type = acquire.SampleType.U8

    video.storage.identifier = dm.select(
        DeviceKind.Storage,
        "Zarr",
    )
    video.storage.settings.uri = (
        f"{zarr_s3_endpoint}/{zarr_s3_bucket_name}/{request.node.name}.zarr"
    )
    video.storage.settings.s3_access_key_id = zarr_s3_access_key_id
    video.storage.settings.s3_secret_access_key = zarr_s3_secret_access_key

    video.max_frame_count = 64

    # configure storage dimensions: x and y are chunked at half the frame
    # size; t grows without bound (array_size_px=0) in chunks of 64 frames
    dimension_x = acquire.StorageDimension(
        name="x", kind="Space", array_size_px=1920, chunk_size_px=960
    )
    assert dimension_x.shard_size_chunks == 0

    dimension_y = acquire.StorageDimension(
        name="y", kind="Space", array_size_px=1080, chunk_size_px=540
    )
    assert dimension_y.shard_size_chunks == 0

    dimension_t = acquire.StorageDimension(
        name="t", kind="Time", array_size_px=0, chunk_size_px=64
    )
    assert dimension_t.shard_size_chunks == 0

    video.storage.settings.acquisition_dimensions = [
        dimension_x,
        dimension_y,
        dimension_t,
    ]

    runtime.set_configuration(props)

    runtime.start()
    runtime.stop()

    s3 = s3fs.S3FileSystem(
        key=zarr_s3_access_key_id,
        secret=zarr_s3_secret_access_key,
        client_kwargs={"endpoint_url": zarr_s3_endpoint},
    )
    try:
        # read back through a plain zarr reader over an S3 map
        store = s3fs.S3Map(
            root=f"{zarr_s3_bucket_name}/{request.node.name}.zarr", s3=s3
        )
        cache = zarr.LRUStoreCache(store, max_size=2**28)
        group = zarr.group(store=cache)

        data = group["0"]

        assert data.chunks == (64, 540, 960)
        assert data.shape == (
            64,
            video.camera.settings.shape[1],
            video.camera.settings.shape[0],
        )
        assert data.nchunks == 4
    finally:
        # always delete the test dataset, even if an assertion above failed
        s3.rm(
            f"{zarr_s3_bucket_name}/{request.node.name}.zarr", recursive=True
        )

0 comments on commit da0a68a

Please sign in to comment.