From 8c9ddf78a53ea3ed50e2383b16b817ae5fdc18b9 Mon Sep 17 00:00:00 2001 From: Andy Sweet Date: Mon, 20 Nov 2023 08:49:50 -0800 Subject: [PATCH] Expose software trigger to snap frames (#98) This functionality is already available in the C API. This PR simply exposes it in the Python API and adds a basic test with a simulated camera to exercise it. --- python/acquire/acquire.pyi | 1 + src/runtime.rs | 9 +++++++ tests/test_basic.py | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/python/acquire/acquire.pyi b/python/acquire/acquire.pyi index 2caf221..c21589d 100644 --- a/python/acquire/acquire.pyi +++ b/python/acquire/acquire.pyi @@ -149,6 +149,7 @@ class Runtime: def get_state(self) -> DeviceState: ... def set_configuration(self, properties: Properties) -> Properties: ... def start(self) -> None: ... + def execute_trigger(self, stream_id: int) -> None: ... def stop(self) -> None: ... def abort(self) -> None: ... diff --git a/src/runtime.rs b/src/runtime.rs index 8a0c588..e33a77c 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -63,6 +63,11 @@ impl RawRuntime { Ok(()) } + fn execute_trigger(&self, stream_id: u32) -> Result<()> { + unsafe { capi::acquire_execute_trigger(self.inner.as_ptr(), stream_id) }.ok()?; + Ok(()) + } + fn stop(&self) -> Result<()> { unsafe { capi::acquire_stop(self.inner.as_ptr()) }.ok()?; Ok(()) @@ -157,6 +162,10 @@ impl Runtime { .try_into()?) } + fn execute_trigger(&self, stream_id: u32, py: Python<'_>) -> PyResult<()> { + Python::allow_threads(py, || Ok(self.inner.execute_trigger(stream_id)?)) + } + fn get_available_data(&self, stream_id: u32) -> PyResult> { let mut beg = null_mut(); let mut end = null_mut(); diff --git a/tests/test_basic.py b/tests/test_basic.py index 91f8212..f2fd623 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -1,6 +1,7 @@ import json import logging import time +from datetime import timedelta from time import sleep from typing import Any, Dict, List, Optional @@ -621,6 +622,55 @@ def test_abort(runtime: Runtime): assert nframes < p.video[0].max_frame_count +def wait_for_data(runtime: Runtime, stream_id: int = 0, timeout: Optional[timedelta] = None)-> acquire.AvailableData: + # None is used as a missing sentinel value, not to indicate no timeout. + if timeout is None: + timeout = timedelta(seconds=5) + sleep_duration = timedelta(microseconds=10000) + elapsed = timedelta() + while elapsed < timeout: + if packet := runtime.get_available_data(stream_id): + return packet + sleep(sleep_duration.total_seconds()) + elapsed += sleep_duration + raise RuntimeError(f"Timed out waiting for condition after {elapsed.total_seconds()} seconds.") + + +def test_execute_trigger(runtime: Runtime): + dm = runtime.device_manager() + p = runtime.get_configuration() + + p.video[0].camera.identifier = dm.select(DeviceKind.Camera, "simulated.*empty.*") + p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Trash") + p.video[0].camera.settings.binning = 1 + p.video[0].camera.settings.shape = (64, 48) + p.video[0].camera.settings.exposure_time_us = 1e3 + p.video[0].camera.settings.pixel_type = acquire.SampleType.U8 + p.video[0].camera.settings.input_triggers.frame_start = Trigger(enable=True, line=0, edge='Rising') + p.video[0].max_frame_count = 10 + + p = runtime.set_configuration(p) + + assert p.video[0].camera.settings.input_triggers.frame_start.enable + + runtime.start() + + # No triggers yet, so no data. + assert runtime.get_available_data(0) is None + + # Snap a few individual frames + for i in range(p.video[0].max_frame_count): + runtime.execute_trigger(0) + packet = wait_for_data(runtime, 0) + frames = tuple(packet.frames()) + assert packet.get_frame_count() == 1 + assert frames[0].metadata().frame_id == i + del frames + del packet + + runtime.stop() + + # FIXME: (nclack) awkwardness around references (available frames, f) # NOTES: