diff --git a/Cargo.toml b/Cargo.toml index 0668519..44c228a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "acquire-imaging" authors = ["Nathan Clack "] -version = "0.2.2" +version = "0.3.0" edition = "2021" [lib] @@ -9,24 +9,24 @@ name = "acquire" crate-type = ["cdylib"] [dependencies] -pyo3 = { version = "0.19", features = [ +pyo3 = { version = "0.20", features = [ "extension-module", "anyhow", "abi3-py38", "serde", ] } -pyo3-log = "0.8" -numpy = "0.19" +pyo3-log = "0.9" +numpy = "0.20" log = "0.4" anyhow = "1.0" parking_lot = "0.12" serde = { version = "1.0", features = ["derive"] } -pythonize = "0.19" +pythonize = "0.20.0" [build-dependencies] -bindgen = "0.66" +bindgen = "0.69.1" cmake = "0.1" -http = "0.2" +http = "1.0" json = "0.12" reqwest = { version = "0.11", features = ["blocking", "json"] } serde = { version = "1.0", features = ["derive"] } diff --git a/acquire-libs/acquire-video-runtime b/acquire-libs/acquire-video-runtime index 83bd825..8ff1105 160000 --- a/acquire-libs/acquire-video-runtime +++ b/acquire-libs/acquire-video-runtime @@ -1 +1 @@ -Subproject commit 83bd825626e6530895598ee1f78c9cb2230d9b40 +Subproject commit 8ff1105935ba1f952bdfd436543e0e1b3ea885f6 diff --git a/build.rs b/build.rs index cbebd98..780fed7 100644 --- a/build.rs +++ b/build.rs @@ -73,7 +73,7 @@ fn main() { let bindings = bindgen::Builder::default() .header("wrapper.h") .clang_arg(format!("-I{}/include", dst.display())) - .parse_callbacks(Box::new(bindgen::CargoCallbacks)) + .parse_callbacks(Box::new(bindgen::CargoCallbacks::new())) .generate() .expect("Unable to generate bindings"); diff --git a/python/acquire/__init__.py b/python/acquire/__init__.py index e76f3e8..adf6627 100644 --- a/python/acquire/__init__.py +++ b/python/acquire/__init__.py @@ -189,17 +189,18 @@ def is_not_done() -> bool: def next_frame() -> Optional[npt.NDArray[Any]]: """Get the next frame from the current stream.""" if nframes[stream_id] < p.video[stream_id].max_frame_count: - if packet := runtime.get_available_data(stream_id): - n = packet.get_frame_count() - nframes[stream_id] += n - logging.info( - f"[stream {stream_id}] frame count: {nframes}" - ) - f = next(packet.frames()) - logging.debug( - f"stream {stream_id} frame {f.metadata().frame_id}" - ) - return f.data().squeeze().copy() + with runtime.get_available_data(stream_id) as packet: + if packet: + n = packet.get_frame_count() + nframes[stream_id] += n + logging.info( + f"[stream {stream_id}] frame count: {nframes}" + ) + f = next(packet.frames()) + logging.debug( + f"stream {stream_id} frame {f.metadata().frame_id}" + ) + return f.data().squeeze().copy() return None while is_not_done(): # runtime.get_state()==DeviceState.Running: diff --git a/python/acquire/acquire.pyi b/python/acquire/acquire.pyi index e8e8ac6..0311752 100644 --- a/python/acquire/acquire.pyi +++ b/python/acquire/acquire.pyi @@ -12,10 +12,18 @@ from typing import ( from numpy.typing import NDArray +@final +class AvailableDataContext: + def __enter__(self) -> Optional[AvailableData]: ... + def __exit__( + self, exc_type: Any, exc_value: Any, traceback: Any + ) -> None: ... + @final class AvailableData: def frames(self) -> Iterator[VideoFrame]: ... def get_frame_count(self) -> int: ... + def invalidate(self) -> None: ... def __iter__(self) -> Iterator[VideoFrame]: ... @final @@ -216,7 +224,7 @@ class Properties: class Runtime: def __init__(self, *args: None, **kwargs: Any) -> None: ... def device_manager(self) -> DeviceManager: ... - def get_available_data(self, stream_id: int) -> AvailableData: ... + def get_available_data(self, stream_id: int) -> AvailableDataContext: ... def get_configuration(self) -> Properties: ... def get_capabilities(self) -> Capabilities: ... def get_state(self) -> DeviceState: ... diff --git a/src/lib.rs b/src/lib.rs index ee08844..ec6f344 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,9 @@ use device_manager::DeviceManager; use pyo3::prelude::*; use std::ffi::CStr; -use crate::runtime::{AvailableData, VideoFrame, VideoFrameMetadata, VideoFrameTimestamps}; +use crate::runtime::{ + AvailableData, AvailableDataContext, VideoFrame, VideoFrameMetadata, VideoFrameTimestamps, +}; trait Status: Copy + Sized { fn is_ok(&self) -> bool; @@ -60,6 +62,7 @@ fn acquire(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/runtime.rs b/src/runtime.rs index 3bf812e..b0a0717 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -78,6 +78,22 @@ impl RawRuntime { unsafe { capi::acquire_abort(self.inner.as_ptr()) }.ok()?; Ok(()) } + + fn map_read(&self, stream_id: u32) -> Result<(*mut capi::VideoFrame, *mut capi::VideoFrame)> { + let mut beg = null_mut(); + let mut end = null_mut(); + unsafe { + capi::acquire_map_read(self.inner.as_ptr(), stream_id, &mut beg, &mut end).ok()?; + } + Ok((beg, end)) + } + + fn unmap_read(&self, stream_id: u32, consumed_bytes: usize) -> Result<()> { + unsafe { + capi::acquire_unmap_read(self.inner.as_ptr(), stream_id, consumed_bytes).ok()?; + } + Ok(()) + } } impl Drop for RawRuntime { @@ -176,34 +192,11 @@ impl Runtime { Python::allow_threads(py, || Ok(self.inner.execute_trigger(stream_id)?)) } - fn get_available_data(&self, stream_id: u32) -> PyResult> { - let mut beg = null_mut(); - let mut end = null_mut(); - unsafe { - capi::acquire_map_read(self.as_ref().as_ptr(), stream_id, &mut beg, &mut end).ok()?; - } - let nbytes = unsafe { byte_offset_from(beg, end) }; - if nbytes > 0 { - log::trace!( - "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", - stream_id, - beg, - end, - nbytes - ) - }; - Ok(if nbytes > 0 { - Some(AvailableData { - inner: Arc::new(RawAvailableData { - runtime: self.inner.clone(), - beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, - end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, - stream_id, - consumed_bytes: None, - }), - }) - } else { - None + fn get_available_data(&self, stream_id: u32) -> PyResult { + Ok(AvailableDataContext { + inner: self.inner.clone(), + stream_id, + available_data: None, }) } } @@ -272,62 +265,116 @@ impl Drop for RawAvailableData { self.end.as_ptr(), consumed_bytes ); - unsafe { - capi::acquire_unmap_read( - self.runtime.inner.as_ptr(), - self.stream_id, - consumed_bytes as _, - ) - .ok() - .expect("Unexpected failure: Was the CoreRuntime NULL?"); - } + self.runtime + .unmap_read(self.stream_id, consumed_bytes) + .expect("Unexpected failure: Was the runtime NULL?"); } } #[pyclass] pub(crate) struct AvailableData { - inner: Arc, + inner: Option>, } #[pymethods] impl AvailableData { fn get_frame_count(&self) -> usize { - self.inner.get_frame_count() + if let Some(inner) = &self.inner { + inner.get_frame_count() + } else { + 0 + } } fn frames(&self) -> VideoFrameIterator { VideoFrameIterator { - store: self.inner.clone(), - cur: Mutex::new(self.inner.beg), - end: self.inner.end, + inner: if let Some(frames) = &self.inner { + Some(VideoFrameIteratorInner { + store: frames.clone(), + cur: Mutex::new(frames.beg), + end: frames.end, + }) + } else { + None + }, } } fn __iter__(slf: PyRef<'_, Self>) -> PyResult> { Py::new(slf.py(), slf.frames()) } + + fn invalidate(&mut self) { + // Will drop the RawAvailableData and cause Available data to act like + // an empty iterator. + self.inner = None; + } } #[pyclass] -struct VideoFrameIterator { - store: Arc, - cur: Mutex>, - end: NonNull, +pub(crate) struct AvailableDataContext { + inner: Arc, + stream_id: u32, + available_data: Option>, } -unsafe impl Send for VideoFrameIterator {} - #[pymethods] -impl VideoFrameIterator { - fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { - slf +impl AvailableDataContext { + fn __enter__(&mut self) -> PyResult>> { + let AvailableDataContext { + inner, + stream_id, + available_data, + } = self; + let stream_id = *stream_id; + let (beg, end) = inner.map_read(stream_id)?; + let nbytes = unsafe { byte_offset_from(beg, end) }; + if nbytes > 0 { + log::trace!( + "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", + stream_id, + beg, + end, + nbytes + ) + }; + if nbytes > 0 { + *available_data = Some(Python::with_gil(|py| { + Py::new( + py, + AvailableData { + inner: Some(Arc::new(RawAvailableData { + runtime: self.inner.clone(), + beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, + end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, + stream_id, + consumed_bytes: None, + })), + }, + ) + })?); + } + return Ok(self.available_data.clone()); } - fn __next__(&mut self) -> Option { - self.next() + + fn __exit__(&mut self, _exc_type: &PyAny, _exc_value: &PyAny, _traceback: &PyAny) { + Python::with_gil(|py| { + if let Some(a) = &self.available_data { + a.as_ref(py).borrow_mut().invalidate() + }; + }); } } -impl Iterator for VideoFrameIterator { +struct VideoFrameIteratorInner { + store: Arc, + cur: Mutex>, + end: NonNull, +} + +unsafe impl Send for VideoFrameIteratorInner {} + +impl Iterator for VideoFrameIteratorInner { type Item = VideoFrame; fn next(&mut self) -> Option { @@ -350,6 +397,29 @@ impl Iterator for VideoFrameIterator { } } +#[pyclass] +struct VideoFrameIterator { + inner: Option, +} + +#[pymethods] +impl VideoFrameIterator { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + fn __next__(&mut self) -> Option { + self.next() + } +} + +impl Iterator for VideoFrameIterator { + type Item = VideoFrame; + + fn next(&mut self) -> Option { + self.inner.as_mut().and_then(|it| it.next()) + } +} + #[pyclass] #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] pub(crate) struct VideoFrameTimestamps { @@ -490,6 +560,5 @@ impl VideoFrame { } } -// TODO: Can't ensure the output array doesn't outlive the available data // TODO: Is everything really Send // TODO: mark iterable and videoframe as things that can't be shared across threads diff --git a/tests/test_basic.py b/tests/test_basic.py index 8ef4b7d..09701d8 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -83,29 +83,37 @@ def test_zero_conf_start(runtime: Runtime): def test_repeat_acq(runtime: Runtime): p = acquire.setup(runtime, "simulated: radial sin", "Trash") - assert p.video[0].camera.identifier is not None - assert p.video[0].storage.identifier is not None + assert ( + p.video[0].camera.identifier is not None + ), "Expected a camera identifier" + assert ( + p.video[0].storage.identifier is not None + ), "Expected a storage identifier" assert p.video[0].storage.settings.filename == "out.tif" p.video[0].camera.settings.shape = (192, 108) p.video[0].max_frame_count = 10 p = runtime.set_configuration(p) runtime.start() while True: - if a := runtime.get_available_data(0): - logging.info(f"Got {a.get_frame_count()}") - a = None - break + with runtime.get_available_data(0) as a: + if a: + logging.info(f"Got {a.get_frame_count()}") + break + if a: + assert a.get_frame_count() == 0 + assert next(a.frames()) is None runtime.stop() - assert runtime.get_available_data(0) is None # TODO: (nclack) assert 1 acquired frame. stop should block runtime.start() while True: - if a := runtime.get_available_data(0): - logging.info(f"Got {a.get_frame_count()}") - a = None - break + with runtime.get_available_data(0) as a: + if a: + logging.info(f"Got {a.get_frame_count()}") + break + if a: + assert a.get_frame_count() == 0 + assert next(a.frames()) is None runtime.stop() - assert runtime.get_available_data(0) is None # TODO: (nclack) assert 1 more acquired frame. stop cancels and waits. @@ -122,10 +130,10 @@ def test_repeat_with_no_stop(runtime: Runtime): runtime.start() # wait for 1 frame while True: - if a := runtime.get_available_data(0): - logging.info(f"Got {a.get_frame_count()}") - a = None - break + with runtime.get_available_data(0) as a: + if a: + logging.info(f"Got {a.get_frame_count()} frame") + break # acq is still on going here with pytest.raises(RuntimeError): logging.info("Next start should fail gracefully") @@ -172,18 +180,18 @@ def took_too_long(): while nframes < p.video[0].max_frame_count and not took_too_long(): clock = time.time() - if a := runtime.get_available_data(0): - packet = a.get_frame_count() - for f in a.frames(): + with runtime.get_available_data(0) as a: + if a: + packet = a.get_frame_count() + for f in a.frames(): + logging.info( + f"{f.data().shape} {f.data()[0][0][0][0]} " + + f"{f.metadata()}" + ) + nframes += packet logging.info( - f"{f.data().shape} {f.data()[0][0][0][0]} {f.metadata()}" + f"frame count: {nframes} - frames in packet: {packet}" ) - del f # <-- fails to get the last frames if this is held? - del a # <-- fails to get the last frames if this is held? - nframes += packet - logging.info( - f"frame count: {nframes} - frames in packet: {packet}" - ) elapsed = time.time() - clock sleep(max(0, 0.1 - elapsed)) @@ -222,9 +230,9 @@ def test_change_filename(runtime: Runtime): nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): - nframes += packet.get_frame_count() - packet = None + with runtime.get_available_data(0) as packet: + if packet: + nframes += packet.get_frame_count() logging.info("Stopping") runtime.stop() @@ -248,9 +256,9 @@ def test_write_external_metadata_to_tiff( nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): - nframes += packet.get_frame_count() - packet = None + with runtime.get_available_data(0) as packet: + if packet: + nframes += packet.get_frame_count() runtime.stop() # Check that the written tif has the expected structure @@ -312,19 +320,21 @@ def is_not_done() -> bool: stream_id = 0 while is_not_done(): if nframes[stream_id] < p.video[stream_id].max_frame_count: - if packet := runtime.get_available_data(stream_id): - n = packet.get_frame_count() - for i, frame in enumerate(packet.frames()): - expected_frame_id = nframes[stream_id] + i - assert frame.metadata().frame_id == expected_frame_id, ( - "frame id's didn't match " - + f"({frame.metadata().frame_id}!={expected_frame_id})" - + f" [stream {stream_id} nframes {nframes}]" - ) - del frame - del packet - nframes[stream_id] += n - logging.debug(f"NFRAMES {nframes}") + with runtime.get_available_data(stream_id) as packet: + if packet: + n = packet.get_frame_count() + for i, frame in enumerate(packet.frames()): + expected_frame_id = nframes[stream_id] + i + assert ( + frame.metadata().frame_id == expected_frame_id + ), ( + "frame id's didn't match " + + f"({frame.metadata().frame_id}" + + f"!={expected_frame_id})" + + f" [stream {stream_id} nframes {nframes}]" + ) + nframes[stream_id] += n + logging.debug(f"NFRAMES {nframes}") stream_id = (stream_id + 1) % 2 logging.info("Stopping") @@ -350,10 +360,12 @@ def test_abort(runtime: Runtime): logging.info("Aborting") runtime.abort() - while packet := runtime.get_available_data(0): - nframes += packet.get_frame_count() - - del packet + while True: + with runtime.get_available_data(0) as packet: + if packet: + nframes += packet.get_frame_count() + else: + break logging.debug( f"Frames expected: {p.video[0].max_frame_count}, actual: {nframes}" @@ -370,8 +382,10 @@ def wait_for_data( sleep_duration = timedelta(microseconds=10000) elapsed = timedelta() while elapsed < timeout: - if packet := runtime.get_available_data(stream_id): - return packet + with runtime.get_available_data(stream_id) as packet: + if packet: + frames = list(packet.frames()) + return (len(frames), frames[0].metadata().frame_id) sleep(sleep_duration.total_seconds()) elapsed += sleep_duration raise RuntimeError( @@ -404,17 +418,15 @@ def test_execute_trigger(runtime: Runtime): runtime.start() # No triggers yet, so no data. - assert runtime.get_available_data(0) is None + with runtime.get_available_data(0) as packet: + assert packet is None # Snap a few individual frames for i in range(p.video[0].max_frame_count): runtime.execute_trigger(0) - packet = wait_for_data(runtime, 0) - frames = tuple(packet.frames()) - assert packet.get_frame_count() == 1 - assert frames[0].metadata().frame_id == i - del frames - del packet + count, frame_id = wait_for_data(runtime, 0) + assert count == 1 + assert frame_id == i runtime.stop() diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 6511805..a17b9b6 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -46,9 +46,9 @@ def test_write_external_metadata_to_zarr( nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): - nframes += packet.get_frame_count() - packet = None + with runtime.get_available_data(0) as packet: + if packet: + nframes += packet.get_frame_count() runtime.stop() assert p.video[0].storage.settings.filename