Skip to content

Commit

Permalink
Expose software trigger to snap frames (#98)
Browse files Browse the repository at this point in the history
This functionality is already available in the C API. This PR simply
exposes it in the Python API and adds a basic test with a simulated
camera to exercise it.
  • Loading branch information
andy-sweet authored Nov 20, 2023
1 parent 920a34b commit 8c9ddf7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
1 change: 1 addition & 0 deletions python/acquire/acquire.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class Runtime:
def get_state(self) -> DeviceState: ...
def set_configuration(self, properties: Properties) -> Properties: ...
def start(self) -> None: ...
def execute_trigger(self, stream_id: int) -> None: ...
def stop(self) -> None: ...
def abort(self) -> None: ...

Expand Down
9 changes: 9 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ impl RawRuntime {
Ok(())
}

fn execute_trigger(&self, stream_id: u32) -> Result<()> {
unsafe { capi::acquire_execute_trigger(self.inner.as_ptr(), stream_id) }.ok()?;
Ok(())
}

fn stop(&self) -> Result<()> {
unsafe { capi::acquire_stop(self.inner.as_ptr()) }.ok()?;
Ok(())
Expand Down Expand Up @@ -157,6 +162,10 @@ impl Runtime {
.try_into()?)
}

fn execute_trigger(&self, stream_id: u32, py: Python<'_>) -> PyResult<()> {
Python::allow_threads(py, || Ok(self.inner.execute_trigger(stream_id)?))
}

fn get_available_data(&self, stream_id: u32) -> PyResult<Option<AvailableData>> {
let mut beg = null_mut();
let mut end = null_mut();
Expand Down
50 changes: 50 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import time
from datetime import timedelta
from time import sleep
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -621,6 +622,55 @@ def test_abort(runtime: Runtime):
assert nframes < p.video[0].max_frame_count


def wait_for_data(runtime: Runtime, stream_id: int = 0, timeout: Optional[timedelta] = None)-> acquire.AvailableData:
# None is used as a missing sentinel value, not to indicate no timeout.
if timeout is None:
timeout = timedelta(seconds=5)
sleep_duration = timedelta(microseconds=10000)
elapsed = timedelta()
while elapsed < timeout:
if packet := runtime.get_available_data(stream_id):
return packet
sleep(sleep_duration.total_seconds())
elapsed += sleep_duration
raise RuntimeError(f"Timed out waiting for condition after {elapsed.total_seconds()} seconds.")


def test_execute_trigger(runtime: Runtime):
dm = runtime.device_manager()
p = runtime.get_configuration()

p.video[0].camera.identifier = dm.select(DeviceKind.Camera, "simulated.*empty.*")
p.video[0].storage.identifier = dm.select(DeviceKind.Storage, "Trash")
p.video[0].camera.settings.binning = 1
p.video[0].camera.settings.shape = (64, 48)
p.video[0].camera.settings.exposure_time_us = 1e3
p.video[0].camera.settings.pixel_type = acquire.SampleType.U8
p.video[0].camera.settings.input_triggers.frame_start = Trigger(enable=True, line=0, edge='Rising')
p.video[0].max_frame_count = 10

p = runtime.set_configuration(p)

assert p.video[0].camera.settings.input_triggers.frame_start.enable

runtime.start()

# No triggers yet, so no data.
assert runtime.get_available_data(0) is None

# Snap a few individual frames
for i in range(p.video[0].max_frame_count):
runtime.execute_trigger(0)
packet = wait_for_data(runtime, 0)
frames = tuple(packet.frames())
assert packet.get_frame_count() == 1
assert frames[0].metadata().frame_id == i
del frames
del packet

runtime.stop()


# FIXME: (nclack) awkwardness around references (available frames, f)

# NOTES:
Expand Down

0 comments on commit 8c9ddf7

Please sign in to comment.