diff --git a/.github/workflows/test_pr.yml b/.github/workflows/test_pr.yml index 7d14f0a..e34c515 100644 --- a/.github/workflows/test_pr.yml +++ b/.github/workflows/test_pr.yml @@ -103,53 +103,53 @@ jobs: - name: Test run: | python -m pytest -k test_dcam - - egrabber: - name: Python ${{ matrix.python }} (eGrabber) - runs-on: - - self-hosted - - egrabber - - VC-151MX-M6H00 - timeout-minutes: 20 - strategy: - fail-fast: false - matrix: - python: [ "3.8", "3.9", "3.10" ] - - permissions: - actions: write - env: - GH_TOKEN: ${{ github.token }} - steps: - - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@0.11.0 - with: - access_token: ${{ github.token }} - - - uses: actions/checkout@v3 - with: - submodules: true - ref: ${{ github.event.pull_request.head.sha }} - - - name: Get CMake 3.24 - uses: lukka/get-cmake@latest - with: - cmakeVersion: 3.24.3 - - - name: Set up Python ${{ matrix.python }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python }} - - - name: Install - run: | - pip install --upgrade pip - pip install -e .[testing] - - - name: Test - run: | - python -m pytest -k test_egrabber - + + # TODO (aliddell): uncomment when we get an eGrabber runner up again + # egrabber: + # name: Python ${{ matrix.python }} (eGrabber) + # runs-on: + # - self-hosted + # - egrabber + # - VC-151MX-M6H00 + # timeout-minutes: 20 + # strategy: + # fail-fast: false + # matrix: + # python: [ "3.8", "3.9", "3.10" ] + + # permissions: + # actions: write + # env: + # GH_TOKEN: ${{ github.token }} + # steps: + # - name: Cancel Previous Runs + # uses: styfle/cancel-workflow-action@0.11.0 + # with: + # access_token: ${{ github.token }} + + # - uses: actions/checkout@v3 + # with: + # submodules: true + # ref: ${{ github.event.pull_request.head.sha }} + + # - name: Get CMake 3.24 + # uses: lukka/get-cmake@latest + # with: + # cmakeVersion: 3.24.3 + + # - name: Set up Python ${{ matrix.python }} + # uses: actions/setup-python@v4 + # with: + # python-version: ${{ matrix.python }} + + # - name: Install + # run: | + # pip install --upgrade pip + # pip install -e .[testing] + + # - name: Test + # run: | + # python -m pytest -k test_egrabber spinnaker: name: Python ${{ matrix.python }} (Spinnaker) diff --git a/Cargo.toml b/Cargo.toml index 44c228a..3580714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "acquire-imaging" authors = ["Nathan Clack "] -version = "0.3.0" +version = "0.3.0-rc1" edition = "2021" [lib] diff --git a/acquire-libs/acquire-core-libs b/acquire-libs/acquire-core-libs index 60cb52c..f2dd747 160000 --- a/acquire-libs/acquire-core-libs +++ b/acquire-libs/acquire-core-libs @@ -1 +1 @@ -Subproject commit 60cb52cd3e42a557ade1bd3d57f32308392ab8ec +Subproject commit f2dd7475b1e2667e0682baf4034b1a4c07196ac9 diff --git a/acquire-libs/acquire-video-runtime b/acquire-libs/acquire-video-runtime index 8ff1105..ba0fdc4 160000 --- a/acquire-libs/acquire-video-runtime +++ b/acquire-libs/acquire-video-runtime @@ -1 +1 @@ -Subproject commit 8ff1105935ba1f952bdfd436543e0e1b3ea885f6 +Subproject commit ba0fdc4bbb1474ea4cec48016b3440cc46708c4d diff --git a/drivers.json b/drivers.json index 64c033c..49feaae 100644 --- a/drivers.json +++ b/drivers.json @@ -1,6 +1,6 @@ { "acquire-driver-common": "0.1.6", - "acquire-driver-zarr": "0.1.7", + "acquire-driver-zarr": "0.1.8", "acquire-driver-egrabber": "0.1.5", "acquire-driver-hdcam": "0.1.7", "acquire-driver-spinnaker": "0.1.1" diff --git a/python/acquire/__init__.py b/python/acquire/__init__.py index adf6627..9d95d2a 100644 --- a/python/acquire/__init__.py +++ b/python/acquire/__init__.py @@ -190,17 +190,16 @@ def next_frame() -> Optional[npt.NDArray[Any]]: """Get the next frame from the current stream.""" if nframes[stream_id] < p.video[stream_id].max_frame_count: with runtime.get_available_data(stream_id) as packet: - if packet: - n = packet.get_frame_count() - nframes[stream_id] += n - logging.info( - f"[stream {stream_id}] frame count: {nframes}" - ) - f = next(packet.frames()) - logging.debug( - f"stream {stream_id} frame {f.metadata().frame_id}" - ) - return f.data().squeeze().copy() + n = packet.get_frame_count() + nframes[stream_id] += n + logging.info( + f"[stream {stream_id}] frame count: {nframes}" + ) + f = next(packet.frames()) + logging.debug( + f"stream {stream_id} frame {f.metadata().frame_id}" + ) + return f.data().squeeze().copy() return None while is_not_done(): # runtime.get_state()==DeviceState.Running: diff --git a/python/acquire/acquire.pyi b/python/acquire/acquire.pyi index 0311752..74b1b79 100644 --- a/python/acquire/acquire.pyi +++ b/python/acquire/acquire.pyi @@ -14,7 +14,7 @@ from numpy.typing import NDArray @final class AvailableDataContext: - def __enter__(self) -> Optional[AvailableData]: ... + def __enter__(self) -> AvailableData: ... def __exit__( self, exc_type: Any, exc_value: Any, traceback: Any ) -> None: ... @@ -23,7 +23,6 @@ class AvailableDataContext: class AvailableData: def frames(self) -> Iterator[VideoFrame]: ... def get_frame_count(self) -> int: ... - def invalidate(self) -> None: ... def __iter__(self) -> Iterator[VideoFrame]: ... @final diff --git a/src/runtime.rs b/src/runtime.rs index b0a0717..664fe4d 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -5,6 +5,7 @@ use numpy::{ Ix4, ToPyArray, }; use parking_lot::Mutex; +use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use serde::{Deserialize, Serialize}; use std::{ @@ -196,7 +197,14 @@ impl Runtime { Ok(AvailableDataContext { inner: self.inner.clone(), stream_id, - available_data: None, + available_data: Python::with_gil(|py| { + Py::new( + py, + AvailableData { + inner: Arc::new(Mutex::new(None)), + }, + ) + })?, }) } } @@ -273,13 +281,13 @@ impl Drop for RawAvailableData { #[pyclass] pub(crate) struct AvailableData { - inner: Option>, + inner: Arc>>, } #[pymethods] impl AvailableData { fn get_frame_count(&self) -> usize { - if let Some(inner) = &self.inner { + if let Some(inner) = &*self.inner.lock() { inner.get_frame_count() } else { 0 @@ -288,9 +296,9 @@ impl AvailableData { fn frames(&self) -> VideoFrameIterator { VideoFrameIterator { - inner: if let Some(frames) = &self.inner { + inner: if let Some(frames) = &*self.inner.lock() { Some(VideoFrameIteratorInner { - store: frames.clone(), + store: self.inner.clone(), cur: Mutex::new(frames.beg), end: frames.end, }) @@ -303,11 +311,13 @@ impl AvailableData { fn __iter__(slf: PyRef<'_, Self>) -> PyResult> { Py::new(slf.py(), slf.frames()) } +} +impl AvailableData { fn invalidate(&mut self) { // Will drop the RawAvailableData and cause Available data to act like // an empty iterator. - self.inner = None; + *self.inner.lock() = None; } } @@ -315,12 +325,12 @@ impl AvailableData { pub(crate) struct AvailableDataContext { inner: Arc, stream_id: u32, - available_data: Option>, + available_data: Py, } #[pymethods] impl AvailableDataContext { - fn __enter__(&mut self) -> PyResult>> { + fn __enter__(&mut self) -> PyResult> { let AvailableDataContext { inner, stream_id, @@ -329,45 +339,40 @@ impl AvailableDataContext { let stream_id = *stream_id; let (beg, end) = inner.map_read(stream_id)?; let nbytes = unsafe { byte_offset_from(beg, end) }; - if nbytes > 0 { - log::trace!( - "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", - stream_id, - beg, - end, - nbytes + + log::trace!( + "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", + stream_id, + beg, + end, + nbytes + ); + *available_data = Python::with_gil(|py| { + Py::new( + py, + AvailableData { + inner: Arc::new(Mutex::new(Some(RawAvailableData { + runtime: self.inner.clone(), + beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, + end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, + stream_id, + consumed_bytes: None, + }))), + }, ) - }; - if nbytes > 0 { - *available_data = Some(Python::with_gil(|py| { - Py::new( - py, - AvailableData { - inner: Some(Arc::new(RawAvailableData { - runtime: self.inner.clone(), - beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, - end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, - stream_id, - consumed_bytes: None, - })), - }, - ) - })?); - } + })?; return Ok(self.available_data.clone()); } fn __exit__(&mut self, _exc_type: &PyAny, _exc_value: &PyAny, _traceback: &PyAny) { Python::with_gil(|py| { - if let Some(a) = &self.available_data { - a.as_ref(py).borrow_mut().invalidate() - }; + (&self.available_data).as_ref(py).borrow_mut().invalidate(); }); } } struct VideoFrameIteratorInner { - store: Arc, + store: Arc>>, cur: Mutex>, end: NonNull, } @@ -379,7 +384,7 @@ impl Iterator for VideoFrameIteratorInner { fn next(&mut self) -> Option { let mut cur = self.cur.lock(); - if *cur < self.end { + if (*self.store.lock()).is_some() && *cur < self.end { let out = VideoFrame { _store: self.store.clone(), cur: *cur, @@ -503,7 +508,7 @@ impl IntoDimension for capi::ImageShape { #[pyclass] pub(crate) struct VideoFrame { - _store: Arc, + _store: Arc>>, cur: NonNull, } @@ -512,6 +517,11 @@ unsafe impl Send for VideoFrame {} #[pymethods] impl VideoFrame { fn metadata(slf: PyRef<'_, Self>) -> PyResult { + if (*slf._store.lock()).is_none() { + return Err(PyRuntimeError::new_err( + "VideoFrame is not valid outside of context", + )); + } let cur = slf.cur.as_ptr(); let meta = unsafe { VideoFrameMetadata { @@ -522,7 +532,12 @@ impl VideoFrame { Ok(meta) } - fn data<'py>(&self, py: Python<'py>) -> Py { + fn data<'py>(&self, py: Python<'py>) -> PyResult> { + if (*self._store.lock()).is_none() { + return Err(PyRuntimeError::new_err( + "VideoFrame is not valid outside of context", + )); + } let cur = self.cur.as_ptr(); macro_rules! gen_match { @@ -556,7 +571,7 @@ impl VideoFrame { } .unwrap(); - array.to_pyobject(py) + Ok(array.to_pyobject(py)) } } diff --git a/tests/test_basic.py b/tests/test_basic.py index 09701d8..7e08271 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -96,9 +96,8 @@ def test_repeat_acq(runtime: Runtime): runtime.start() while True: with runtime.get_available_data(0) as a: - if a: - logging.info(f"Got {a.get_frame_count()}") - break + logging.info(f"Got {a.get_frame_count()}") + break if a: assert a.get_frame_count() == 0 assert next(a.frames()) is None @@ -107,12 +106,11 @@ def test_repeat_acq(runtime: Runtime): runtime.start() while True: with runtime.get_available_data(0) as a: - if a: - logging.info(f"Got {a.get_frame_count()}") - break - if a: - assert a.get_frame_count() == 0 - assert next(a.frames()) is None + logging.info(f"Got {a.get_frame_count()}") + break + + assert a.get_frame_count() == 0 + assert next(a.frames()) is None runtime.stop() # TODO: (nclack) assert 1 more acquired frame. stop cancels and waits. @@ -131,7 +129,7 @@ def test_repeat_with_no_stop(runtime: Runtime): # wait for 1 frame while True: with runtime.get_available_data(0) as a: - if a: + if a.get_frame_count() > 0: logging.info(f"Got {a.get_frame_count()} frame") break # acq is still on going here @@ -181,17 +179,16 @@ def took_too_long(): while nframes < p.video[0].max_frame_count and not took_too_long(): clock = time.time() with runtime.get_available_data(0) as a: - if a: - packet = a.get_frame_count() - for f in a.frames(): - logging.info( - f"{f.data().shape} {f.data()[0][0][0][0]} " - + f"{f.metadata()}" - ) - nframes += packet + packet = a.get_frame_count() + for f in a.frames(): logging.info( - f"frame count: {nframes} - frames in packet: {packet}" + f"{f.data().shape} {f.data()[0][0][0][0]} " + + f"{f.metadata()}" ) + nframes += packet + logging.info( + f"frame count: {nframes} - frames in packet: {packet}" + ) elapsed = time.time() - clock sleep(max(0, 0.1 - elapsed)) @@ -231,8 +228,7 @@ def test_change_filename(runtime: Runtime): runtime.start() while nframes < p.video[0].max_frame_count: with runtime.get_available_data(0) as packet: - if packet: - nframes += packet.get_frame_count() + nframes += packet.get_frame_count() logging.info("Stopping") runtime.stop() @@ -257,8 +253,7 @@ def test_write_external_metadata_to_tiff( runtime.start() while nframes < p.video[0].max_frame_count: with runtime.get_available_data(0) as packet: - if packet: - nframes += packet.get_frame_count() + nframes += packet.get_frame_count() runtime.stop() # Check that the written tif has the expected structure @@ -321,20 +316,17 @@ def is_not_done() -> bool: while is_not_done(): if nframes[stream_id] < p.video[stream_id].max_frame_count: with runtime.get_available_data(stream_id) as packet: - if packet: - n = packet.get_frame_count() - for i, frame in enumerate(packet.frames()): - expected_frame_id = nframes[stream_id] + i - assert ( - frame.metadata().frame_id == expected_frame_id - ), ( - "frame id's didn't match " - + f"({frame.metadata().frame_id}" - + f"!={expected_frame_id})" - + f" [stream {stream_id} nframes {nframes}]" - ) - nframes[stream_id] += n - logging.debug(f"NFRAMES {nframes}") + n = packet.get_frame_count() + for i, frame in enumerate(packet.frames()): + expected_frame_id = nframes[stream_id] + i + assert frame.metadata().frame_id == expected_frame_id, ( + "frame id's didn't match " + + f"({frame.metadata().frame_id}" + + f"!={expected_frame_id})" + + f" [stream {stream_id} nframes {nframes}]" + ) + nframes[stream_id] += n + logging.debug(f"NFRAMES {nframes}") stream_id = (stream_id + 1) % 2 logging.info("Stopping") @@ -362,9 +354,9 @@ def test_abort(runtime: Runtime): while True: with runtime.get_available_data(0) as packet: - if packet: - nframes += packet.get_frame_count() - else: + frame_count = packet.get_frame_count() + nframes += frame_count + if frame_count == 0: break logging.debug( @@ -383,7 +375,7 @@ def wait_for_data( elapsed = timedelta() while elapsed < timeout: with runtime.get_available_data(stream_id) as packet: - if packet: + if packet.get_frame_count() > 0: frames = list(packet.frames()) return (len(frames), frames[0].metadata().frame_id) sleep(sleep_duration.total_seconds()) @@ -419,7 +411,7 @@ def test_execute_trigger(runtime: Runtime): # No triggers yet, so no data. with runtime.get_available_data(0) as packet: - assert packet is None + assert packet.get_frame_count() == 0 # Snap a few individual frames for i in range(p.video[0].max_frame_count): @@ -601,6 +593,29 @@ def test_storage_capabilities( assert storage.multiscale.is_supported == multiscale +def test_invalidated_frame(runtime: Runtime): + dm = runtime.device_manager() + p = runtime.get_configuration() + p.video[0].camera.identifier = dm.select(DeviceKind.Camera, ".*empty") + p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "trash") + p.video[0].max_frame_count = 1 + runtime.set_configuration(p) + + frame = None + runtime.start() + while frame is None: + with runtime.get_available_data(0) as packet: + if packet.get_frame_count() > 0: + frame = next(packet.frames()) + frame.data() + with pytest.raises(RuntimeError): + frame.metadata() + with pytest.raises(RuntimeError): + frame.data() + + runtime.stop() + + # FIXME: (nclack) awkwardness around references (available frames, f) # NOTES: diff --git a/tests/test_zarr.py b/tests/test_zarr.py index a17b9b6..b14638b 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -47,8 +47,7 @@ def test_write_external_metadata_to_zarr( runtime.start() while nframes < p.video[0].max_frame_count: with runtime.get_available_data(0) as packet: - if packet: - nframes += packet.get_frame_count() + nframes += packet.get_frame_count() runtime.stop() assert p.video[0].storage.settings.filename