From 1b16e15908323f2c3ff8c5c844b1f501ce7b828b Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 16 Aug 2023 12:42:28 -0400 Subject: [PATCH 1/5] wip --- src/runtime.rs | 29 +++++++++++++++++++++++++++++ tests/test_basic.py | 43 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 766bdbf..670b3cf 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -287,6 +287,35 @@ impl AvailableData { fn __iter__(slf: PyRef<'_, Self>) -> PyResult> { Py::new(slf.py(), slf.frames()) } + + fn __enter__(&self) -> PyResult { + Ok(AvailableData { + inner: self.inner.clone(), + }) + } + + fn __exit__(&self, _exc_type: Option<&PyAny>, _exc_value: Option<&PyAny>, _traceback: Option<&PyAny>) -> bool { + let consumed_bytes = self.inner.consumed_bytes.unwrap_or(unsafe { + byte_offset_from(self.inner.beg.as_ptr(), self.inner.end.as_ptr()) + } as usize); + log::debug!( + "[stream {}] DROP read region: {:p}-{:p}:{}", + self.inner.stream_id, + self.inner.beg.as_ptr(), + self.inner.end.as_ptr(), + consumed_bytes + ); + unsafe { + capi::acquire_unmap_read( + self.inner.runtime.inner.as_ptr(), + self.inner.stream_id, + consumed_bytes as _, + ) + .ok() + .expect("Unexpected failure: Was the CoreRuntime NULL?"); + } + true + } } #[pyclass] diff --git a/tests/test_basic.py b/tests/test_basic.py index c76dccb..9fab03c 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -16,6 +16,8 @@ from skimage.transform import downscale_local_mean import numpy as np +logging.getLogger().setLevel(logging.DEBUG) + @pytest.fixture(scope="module") def runtime(): @@ -87,19 +89,52 @@ def test_zero_conf_start(runtime: Runtime): runtime.start() +def test_get_available_data(runtime: Runtime): + p = acquire.setup(runtime, "simulated.*empty.*", "Trash") + assert p.video[0].camera.identifier is not None + assert p.video[0].storage.identifier is not None + assert p.video[0].storage.settings.filename == "out.tif" + p.video[0].camera.settings.shape = (192, 108) + p.video[0].camera.settings.exposure_time_us = 1e4 + p.video[0].max_frame_count = 10 + p = runtime.set_configuration(p) + + runtime.start() + + while True: + if a := runtime.get_available_data(0): + with a: + frame_count = a.get_frame_count() + assert frame_count > 0 + logging.info(f"Got {frame_count}") + + i = 0 + for _ in a.frames(): + i += 1 + + assert i == frame_count + break + + runtime.stop() + assert runtime.get_available_data(0) is None + + assert a.get_frame_count() == 0 + + def test_repeat_acq(runtime: Runtime): p = acquire.setup(runtime, "simulated: radial sin", "Trash") assert p.video[0].camera.identifier is not None assert p.video[0].storage.identifier is not None assert p.video[0].storage.settings.filename == "out.tif" p.video[0].camera.settings.shape = (192, 108) + p.video[0].camera.settings.exposure_time_us = 1e4 p.video[0].max_frame_count = 10 p = runtime.set_configuration(p) runtime.start() while True: if a := runtime.get_available_data(0): - logging.info(f"Got {a.get_frame_count()}") - a = None + with a: + logging.info(f"Got {a.get_frame_count()}") break runtime.stop() assert runtime.get_available_data(0) is None @@ -107,8 +142,8 @@ def test_repeat_acq(runtime: Runtime): runtime.start() while True: if a := runtime.get_available_data(0): - logging.info(f"Got {a.get_frame_count()}") - a = None + with a: + logging.info(f"Got {a.get_frame_count()}") break runtime.stop() assert runtime.get_available_data(0) is None From 16b6c803b0ba66aaf8f6defc7c4e9ecf3bc4f8b3 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 16 Aug 2023 13:12:52 -0400 Subject: [PATCH 2/5] Drop inner RawAvailableData on AvailableData.__exit__ --- src/runtime.rs | 30 ++++++++++-------------------- tests/test_basic.py | 8 +++++--- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 670b3cf..61865a1 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -294,26 +294,16 @@ impl AvailableData { }) } - fn __exit__(&self, _exc_type: Option<&PyAny>, _exc_value: Option<&PyAny>, _traceback: Option<&PyAny>) -> bool { - let consumed_bytes = self.inner.consumed_bytes.unwrap_or(unsafe { - byte_offset_from(self.inner.beg.as_ptr(), self.inner.end.as_ptr()) - } as usize); - log::debug!( - "[stream {}] DROP read region: {:p}-{:p}:{}", - self.inner.stream_id, - self.inner.beg.as_ptr(), - self.inner.end.as_ptr(), - consumed_bytes - ); - unsafe { - capi::acquire_unmap_read( - self.inner.runtime.inner.as_ptr(), - self.inner.stream_id, - consumed_bytes as _, - ) - .ok() - .expect("Unexpected failure: Was the CoreRuntime NULL?"); - } + fn __exit__(&mut self, _exc_type: Option<&PyAny>, _exc_value: Option<&PyAny>, _traceback: Option<&PyAny>) -> bool { + // Create a new RawAvailableData, drops the old one + self.inner = Arc::new(RawAvailableData { + runtime: self.inner.runtime.clone(), + beg: self.inner.end, + end: self.inner.end, + stream_id: self.inner.stream_id, + consumed_bytes: None, + }); + true } } diff --git a/tests/test_basic.py b/tests/test_basic.py index 9fab03c..f6dfda8 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -16,8 +16,6 @@ from skimage.transform import downscale_local_mean import numpy as np -logging.getLogger().setLevel(logging.DEBUG) - @pytest.fixture(scope="module") def runtime(): @@ -89,7 +87,7 @@ def test_zero_conf_start(runtime: Runtime): runtime.start() -def test_get_available_data(runtime: Runtime): +def test_available_data_context_manager(runtime: Runtime): p = acquire.setup(runtime, "simulated.*empty.*", "Trash") assert p.video[0].camera.identifier is not None assert p.video[0].storage.identifier is not None @@ -119,6 +117,10 @@ def test_get_available_data(runtime: Runtime): assert runtime.get_available_data(0) is None assert a.get_frame_count() == 0 + i = 0 + for _ in a.frames(): + i += 1 + assert i == 0 def test_repeat_acq(runtime: Runtime): From 55bbc43bfcdb2e0fd83a61045bde180138539a60 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 16 Aug 2023 13:50:24 -0400 Subject: [PATCH 3/5] Update AvailableData stub. --- python/acquire/acquire.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/acquire/acquire.pyi b/python/acquire/acquire.pyi index 07c5e98..b96af0f 100644 --- a/python/acquire/acquire.pyi +++ b/python/acquire/acquire.pyi @@ -17,6 +17,8 @@ class AvailableData: def frames(self) -> Iterator[VideoFrame]: ... def get_frame_count(self) -> int: ... def __iter__(self) -> Iterator[VideoFrame]: ... + def __enter__(self) -> "AvailableData": ... + def __exit__(self, type = Ellipsis, value = Ellipsis, traceback = Ellipsis) -> bool: ... @final class Camera: From b913c8d15c3e192cafc45048f90918ecd50ab625 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Tue, 22 Aug 2023 14:33:41 -0400 Subject: [PATCH 4/5] Make AvailableData::inner Optional. Get video slice only on __enter__. Move RawAvailableData drop behavior into AvailableData.__exit__. --- src/runtime.rs | 163 ++++++++++++++++++++++++++------------------ tests/test_basic.py | 87 ++++++++++++++--------- 2 files changed, 151 insertions(+), 99 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 61865a1..4815abf 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -157,34 +157,11 @@ impl Runtime { .try_into()?) } - fn get_available_data(&self, stream_id: u32) -> PyResult> { - let mut beg = null_mut(); - let mut end = null_mut(); - unsafe { - capi::acquire_map_read(self.as_ref().as_ptr(), stream_id, &mut beg, &mut end).ok()?; - } - let nbytes = unsafe { byte_offset_from(beg, end) }; - if nbytes > 0 { - log::trace!( - "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", - stream_id, - beg, - end, - nbytes - ) - }; - Ok(if nbytes > 0 { - Some(AvailableData { - inner: Arc::new(RawAvailableData { - runtime: self.inner.clone(), - beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, - end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, - stream_id, - consumed_bytes: None, - }), - }) - } else { - None + fn get_available_data(&self, stream_id: u32) -> PyResult { + Ok(AvailableData { + runtime: self.inner.clone(), + inner: None, + stream_id, }) } } @@ -241,46 +218,56 @@ impl RawAvailableData { } } -impl Drop for RawAvailableData { - fn drop(&mut self) { - let consumed_bytes = self - .consumed_bytes - .unwrap_or(unsafe { byte_offset_from(self.beg.as_ptr(), self.end.as_ptr()) } as usize); - log::debug!( - "[stream {}] DROP read region: {:p}-{:p}:{}", - self.stream_id, - self.beg.as_ptr(), - self.end.as_ptr(), - consumed_bytes - ); - unsafe { - capi::acquire_unmap_read( - self.runtime.inner.as_ptr(), - self.stream_id, - consumed_bytes as _, - ) - .ok() - .expect("Unexpected failure: Was the CoreRuntime NULL?"); - } - } -} - #[pyclass] pub(crate) struct AvailableData { - inner: Arc, + runtime: Arc, + inner: Option>, + stream_id: u32, } #[pymethods] impl AvailableData { fn get_frame_count(&self) -> usize { - self.inner.get_frame_count() + if let Some(inner) = &self.inner { + inner.get_frame_count() + } else { + 0 + } } fn frames(&self) -> VideoFrameIterator { + let (store, cur, end) = if let Some(inner) = &self.inner { + (inner.clone(), Mutex::new(inner.beg), inner.end) + } else { + let mut v = capi::VideoFrame { + frame_id: 0, + hardware_frame_id: 0, + shape: Default::default(), + data: Default::default(), + bytes_of_frame: 0, + timestamps: capi::VideoFrame_video_frame_timestamps_s { + hardware: 0, + acq_thread: 0, + }, + }; + let vp: *mut capi::VideoFrame = &mut v; + + let inner = Arc::new(RawAvailableData { + runtime: self.runtime.clone(), + beg: NonNull::new(vp).unwrap(), + end: NonNull::new(vp).unwrap(), + stream_id: self.stream_id, + consumed_bytes: None, + }); + + let beg = NonNull::new(vp as _).unwrap(); + (inner, Mutex::new(beg), beg) + }; + VideoFrameIterator { - store: self.inner.clone(), - cur: Mutex::new(self.inner.beg), - end: self.inner.end, + store, + cur, + end, } } @@ -288,22 +275,66 @@ impl AvailableData { Py::new(slf.py(), slf.frames()) } - fn __enter__(&self) -> PyResult { + fn __enter__(&mut self) -> PyResult { + let mut beg = null_mut(); + let mut end = null_mut(); + unsafe { + capi::acquire_map_read(self.runtime.inner.as_ptr(), self.stream_id, &mut beg, &mut end).ok()?; + } + let nbytes = unsafe { byte_offset_from(beg, end) }; + if nbytes > 0 { + log::trace!( + "[stream {}] ACQUIRED {:p}-{:p}:{} bytes", + self.stream_id, + beg, + end, + nbytes + ) + }; + self.inner = if nbytes > 0 { + Some(Arc::new(RawAvailableData { + runtime: self.runtime.clone(), + beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, + end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, + stream_id: self.stream_id, + consumed_bytes: None, + })) + } else { + None + }; + Ok(AvailableData { + runtime: self.runtime.clone(), inner: self.inner.clone(), + stream_id: self.stream_id, }) } fn __exit__(&mut self, _exc_type: Option<&PyAny>, _exc_value: Option<&PyAny>, _traceback: Option<&PyAny>) -> bool { - // Create a new RawAvailableData, drops the old one - self.inner = Arc::new(RawAvailableData { - runtime: self.inner.runtime.clone(), - beg: self.inner.end, - end: self.inner.end, - stream_id: self.inner.stream_id, - consumed_bytes: None, - }); + // Drop the inner RawAvailableData + if let Some(inner) = &self.inner { + let consumed_bytes = inner + .consumed_bytes + .unwrap_or(unsafe { byte_offset_from(inner.beg.as_ptr(), inner.end.as_ptr()) } as usize); + log::debug!( + "[stream {}] DROP read region: {:p}-{:p}:{}", + self.stream_id, + inner.beg.as_ptr(), + inner.end.as_ptr(), + consumed_bytes + ); + unsafe { + capi::acquire_unmap_read( + self.runtime.inner.as_ptr(), + self.stream_id, + consumed_bytes as _, + ) + .ok() + .expect("Unexpected failure: Was the CoreRuntime NULL?"); + }; + } + self.inner = None; true } } diff --git a/tests/test_basic.py b/tests/test_basic.py index f6dfda8..108df3f 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -99,29 +99,52 @@ def test_available_data_context_manager(runtime: Runtime): runtime.start() - while True: - if a := runtime.get_available_data(0): - with a: - frame_count = a.get_frame_count() - assert frame_count > 0 - logging.info(f"Got {frame_count}") + # get_available_data() only maps data if it is called in a context manager + a = runtime.get_available_data(0) + assert a.get_frame_count() == 0 - i = 0 - for _ in a.frames(): - i += 1 + # a is an empty iterator + i = 0 + for _ in a: + i += 1 + assert i == 0 - assert i == frame_count - break + with runtime.get_available_data(0) as b: + frame_count = b.get_frame_count() + assert frame_count > 0 + logging.info(f"Got {frame_count}") - runtime.stop() - assert runtime.get_available_data(0) is None + i = 0 + for _ in b.frames(): + i += 1 - assert a.get_frame_count() == 0 + assert i == frame_count + + # no data outside the context manager + assert b.get_frame_count() == 0 + + # b is an empty iterator i = 0 - for _ in a.frames(): + for _ in b: i += 1 assert i == 0 + # reentering the context manager fetches more data + with b: + frame_count = b.get_frame_count() + assert frame_count > 0 + logging.info(f"Got {frame_count}") + + i = 0 + for _ in b.frames(): + i += 1 + + assert i == frame_count + + runtime.stop() + + assert b.get_frame_count() == 0 + def test_repeat_acq(runtime: Runtime): p = acquire.setup(runtime, "simulated: radial sin", "Trash") @@ -134,21 +157,17 @@ def test_repeat_acq(runtime: Runtime): p = runtime.set_configuration(p) runtime.start() while True: - if a := runtime.get_available_data(0): - with a: - logging.info(f"Got {a.get_frame_count()}") + with runtime.get_available_data(0) as a: + logging.info(f"Got {a.get_frame_count()}") break runtime.stop() - assert runtime.get_available_data(0) is None # TODO: (nclack) assert 1 acquired frame. stop should block runtime.start() while True: - if a := runtime.get_available_data(0): - with a: - logging.info(f"Got {a.get_frame_count()}") - break + with runtime.get_available_data(0) as a: + logging.info(f"Got {a.get_frame_count()}") + break runtime.stop() - assert runtime.get_available_data(0) is None # TODO: (nclack) assert 1 more acquired frame. stop cancels and waits. @@ -165,7 +184,7 @@ def test_repeat_with_no_stop(runtime: Runtime): runtime.start() # wait for 1 frame while True: - if a := runtime.get_available_data(0): + with runtime.get_available_data(0) as a: logging.info(f"Got {a.get_frame_count()}") a = None break @@ -215,14 +234,13 @@ def took_too_long(): while nframes < p.video[0].max_frame_count and not took_too_long(): clock = time.time() - if a := runtime.get_available_data(0): + with runtime.get_available_data(0) as a: packet = a.get_frame_count() for f in a.frames(): logging.info( f"{f.data().shape} {f.data()[0][0][0][0]} {f.metadata()}" ) del f # <-- fails to get the last frames if this is held? - del a # <-- fails to get the last frames if this is held? nframes += packet logging.info( f"frame count: {nframes} - frames in packet: {packet}" @@ -265,7 +283,7 @@ def test_change_filename(runtime: Runtime): nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): + with runtime.get_available_data(0) as packet: nframes += packet.get_frame_count() packet = None logging.info("Stopping") @@ -291,7 +309,7 @@ def test_write_external_metadata_to_tiff( nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): + with runtime.get_available_data(0) as packet: nframes += packet.get_frame_count() packet = None runtime.stop() @@ -337,7 +355,7 @@ def test_write_external_metadata_to_zarr( nframes = 0 runtime.start() while nframes < p.video[0].max_frame_count: - if packet := runtime.get_available_data(0): + with runtime.get_available_data(0) as packet: nframes += packet.get_frame_count() packet = None runtime.stop() @@ -641,10 +659,13 @@ def test_abort(runtime: Runtime): logging.info("Aborting") runtime.abort() - while packet := runtime.get_available_data(0): - nframes += packet.get_frame_count() + while True: + with runtime.get_available_data(0) as packet: + if 0 == packet.get_frame_count(): + break + nframes += packet.get_frame_count() - del packet + # del packet logging.debug( f"Frames expected: {p.video[0].max_frame_count}, actual: {nframes}" From 77a5329780b3b9312e1330e062f1712d796f4519 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 23 Aug 2023 11:24:56 -0400 Subject: [PATCH 5/5] wip --- .github/workflows/build.yml | 63 +++++++++++++++++++------------------ src/runtime.rs | 5 +-- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b949658..69e94ce 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,39 +1,42 @@ name: Build on: - push: - branches: - - main + push: + branches: + - main + pull_request: # TODO (aliddell): remove this + branches: + - main jobs: - build: - strategy: - matrix: - platform: [windows-latest, macos-latest, ubuntu-latest] + build: + strategy: + matrix: + platform: [ windows-latest, macos-latest, ubuntu-latest ] - runs-on: ${{matrix.platform}} - steps: - - uses: actions/checkout@v3 - with: - submodules: recursive + runs-on: ${{matrix.platform}} + steps: + - uses: actions/checkout@v3 + with: + submodules: recursive - - name: Build (macOS) - if: matrix.platform == 'macos-latest' - uses: messense/maturin-action@v1 - with: - command: build - target: universal2-apple-darwin - args: --release -o dist + - name: Build (macOS) + if: matrix.platform == 'macos-latest' + uses: messense/maturin-action@v1 + with: + command: build + target: universal2-apple-darwin + args: --release -o dist - - name: Build - if: matrix.platform != 'macos-latest' - uses: messense/maturin-action@v1 - with: - command: build - args: --release -o dist + - name: Build + if: matrix.platform != 'macos-latest' + uses: messense/maturin-action@v1 + with: + command: build + args: --release -o dist - - name: Upload wheels - uses: actions/upload-artifact@v3 - with: - name: ${{matrix.platform}} wheels - path: dist + - name: Upload wheels + uses: actions/upload-artifact@v3 + with: + name: ${{matrix.platform}} wheels + path: dist diff --git a/src/runtime.rs b/src/runtime.rs index 4815abf..eb29ad1 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -168,8 +168,6 @@ impl Runtime { /// References to a region of raw data being read from a video stream. struct RawAvailableData { - /// Reference to the context that owns the region - runtime: Arc, /// Pointer to the reserved region beg: NonNull, end: NonNull, @@ -220,6 +218,7 @@ impl RawAvailableData { #[pyclass] pub(crate) struct AvailableData { + /// Reference to the context that owns the region runtime: Arc, inner: Option>, stream_id: u32, @@ -253,7 +252,6 @@ impl AvailableData { let vp: *mut capi::VideoFrame = &mut v; let inner = Arc::new(RawAvailableData { - runtime: self.runtime.clone(), beg: NonNull::new(vp).unwrap(), end: NonNull::new(vp).unwrap(), stream_id: self.stream_id, @@ -293,7 +291,6 @@ impl AvailableData { }; self.inner = if nbytes > 0 { Some(Arc::new(RawAvailableData { - runtime: self.runtime.clone(), beg: NonNull::new(beg).ok_or(anyhow!("Expected non-null buffer"))?, end: NonNull::new(end).ok_or(anyhow!("Expected non-null buffer"))?, stream_id: self.stream_id,