From 3ac59e9be21a78206c4b3aa2fe74cb14486b7d57 Mon Sep 17 00:00:00 2001 From: Jun Lin Date: Fri, 21 Jun 2024 21:12:32 -0500 Subject: [PATCH 1/3] add test for timing APIs --- tests/component/task/test_timing.py | 120 ++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 tests/component/task/test_timing.py diff --git a/tests/component/task/test_timing.py b/tests/component/task/test_timing.py new file mode 100644 index 00000000..1c759df0 --- /dev/null +++ b/tests/component/task/test_timing.py @@ -0,0 +1,120 @@ +import pytest + +from nidaqmx.constants import AcquisitionType, Edge, Level, LineGrouping, Polarity +from nidaqmx.task import Task + + +@pytest.fixture() +def sim_6535_di_single_line_task(task, sim_6535_device) -> Task: + """Gets DI task.""" + task.di_channels.add_di_chan( + sim_6535_device.di_lines[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES + ) + return task + + +def test___timing___cfg_handshaking___no_errors( + sim_6535_di_single_line_task: Task, +): + sim_6535_di_single_line_task.timing.cfg_handshaking_timing( + AcquisitionType.FINITE, samps_per_chan=2000 + ) + + +def test___timing___cfg_change_detection___no_errors( + sim_6535_di_single_line_task: Task, +): + sim_6535_di_single_line_task.timing.cfg_change_detection_timing( + "port0/line0:1", "port0/line3:5", AcquisitionType.FINITE, samps_per_chan=2000 + ) + + assert ( + sim_6535_di_single_line_task.timing.change_detect_di_rising_edge_physical_chans.name + == "port0/line0, port0/line1" + ) + assert ( + sim_6535_di_single_line_task.timing.change_detect_di_falling_edge_physical_chans.name + == "port0/line3, port0/line4, port0/line5" + ) + + +@pytest.mark.parametrize( + "clk_source, active_edge", + [ + ("/Dev1/PFI0", Edge.RISING), + ("/Dev1/PFI1", Edge.FALLING), + ], +) +def test___timing___cfg_pipelined_samp_clk___no_errors( + sim_6535_di_single_line_task: Task, + clk_source: str, + active_edge: int, +): + sim_6535_di_single_line_task.timing.cfg_pipelined_samp_clk_timing( + rate=32000.0, + source=clk_source, + active_edge=active_edge, + sample_mode=AcquisitionType.FINITE, + samps_per_chan=2000, + ) + + assert sim_6535_di_single_line_task.timing.samp_clk_src == clk_source + assert sim_6535_di_single_line_task.timing.samp_clk_active_edge == active_edge + + +@pytest.mark.parametrize( + "clk_source, active_edge, pause_when, ready_event_active_level", + [ + ("/Dev1/PFI0", Edge.RISING, Level.HIGH, Polarity.ACTIVE_HIGH), + ("/Dev1/PFI1", Edge.FALLING, Level.HIGH, Polarity.ACTIVE_LOW), + ("/Dev1/PFI1", Edge.FALLING, Level.LOW, Polarity.ACTIVE_LOW), + ], +) +def test___timing___cfg_burst_handshaking_import_clock___no_errors( + sim_6535_di_single_line_task: Task, + clk_source: str, + active_edge: int, + pause_when: int, + ready_event_active_level: int, +): + sim_6535_di_single_line_task.timing.cfg_burst_handshaking_timing_import_clock( + sample_clk_rate=32000.0, + sample_clk_src=clk_source, + sample_mode=AcquisitionType.FINITE, + samps_per_chan=2000, + sample_clk_active_edge=active_edge, + pause_when=pause_when, + ready_event_active_level=ready_event_active_level, + ) + + assert sim_6535_di_single_line_task.timing.samp_clk_rate == 32000 + assert sim_6535_di_single_line_task.timing.samp_clk_src == clk_source + assert sim_6535_di_single_line_task.timing.samp_clk_active_edge == active_edge + + +@pytest.mark.parametrize( + "clk_outp_term, clk_pulse_polarity, pause_when, ready_event_active_level", + [ + ("/Dev1/PFI0", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_HIGH), + ("/Dev1/PFI1", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_LOW), + ("/Dev1/PFI1", Polarity.ACTIVE_LOW, Level.LOW, Polarity.ACTIVE_LOW), + ], +) +def test___timing___cfg_burst_handshaking_export_clock___no_errors( + sim_6535_di_single_line_task: Task, + clk_outp_term: str, + clk_pulse_polarity: int, + pause_when: int, + ready_event_active_level: int, +): + sim_6535_di_single_line_task.timing.cfg_burst_handshaking_timing_export_clock( + sample_clk_rate=32000.0, + sample_clk_outp_term=clk_outp_term, + sample_mode=AcquisitionType.FINITE, + samps_per_chan=2000, + sample_clk_pulse_polarity=clk_pulse_polarity, + pause_when=pause_when, + ready_event_active_level=ready_event_active_level, + ) + + assert sim_6535_di_single_line_task.timing.samp_clk_rate == 32000 From 2800a9d99857bb264ea6af801651b7abc01466a7 Mon Sep 17 00:00:00 2001 From: Jun Lin Date: Mon, 24 Jun 2024 12:46:19 -0500 Subject: [PATCH 2/3] address review feedback --- tests/component/task/test_timing.py | 62 +++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/tests/component/task/test_timing.py b/tests/component/task/test_timing.py index 1c759df0..41ea0d12 100644 --- a/tests/component/task/test_timing.py +++ b/tests/component/task/test_timing.py @@ -1,11 +1,19 @@ import pytest -from nidaqmx.constants import AcquisitionType, Edge, Level, LineGrouping, Polarity +from nidaqmx.constants import ( + AcquisitionType, + Edge, + Level, + LineGrouping, + Polarity, + SampleTimingType, +) +from nidaqmx.system import Device from nidaqmx.task import Task @pytest.fixture() -def sim_6535_di_single_line_task(task, sim_6535_device) -> Task: +def sim_6535_di_single_line_task(task: Task, sim_6535_device: Device) -> Task: """Gets DI task.""" task.di_channels.add_di_chan( sim_6535_device.di_lines[0].name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES @@ -13,17 +21,21 @@ def sim_6535_di_single_line_task(task, sim_6535_device) -> Task: return task -def test___timing___cfg_handshaking___no_errors( +def test___timing___cfg_handshaking___sets_properties( sim_6535_di_single_line_task: Task, -): +) -> None: sim_6535_di_single_line_task.timing.cfg_handshaking_timing( AcquisitionType.FINITE, samps_per_chan=2000 ) + assert sim_6535_di_single_line_task.timing.samp_timing_type == SampleTimingType.HANDSHAKE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_mode == AcquisitionType.FINITE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_per_chan == 2000 + -def test___timing___cfg_change_detection___no_errors( +def test___timing___cfg_change_detection___sets_properties( sim_6535_di_single_line_task: Task, -): +) -> None: sim_6535_di_single_line_task.timing.cfg_change_detection_timing( "port0/line0:1", "port0/line3:5", AcquisitionType.FINITE, samps_per_chan=2000 ) @@ -36,6 +48,9 @@ def test___timing___cfg_change_detection___no_errors( sim_6535_di_single_line_task.timing.change_detect_di_falling_edge_physical_chans.name == "port0/line3, port0/line4, port0/line5" ) + assert sim_6535_di_single_line_task.timing.samp_timing_type == SampleTimingType.CHANGE_DETECTION + assert sim_6535_di_single_line_task.timing.samp_quant_samp_mode == AcquisitionType.FINITE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_per_chan == 2000 @pytest.mark.parametrize( @@ -45,11 +60,11 @@ def test___timing___cfg_change_detection___no_errors( ("/Dev1/PFI1", Edge.FALLING), ], ) -def test___timing___cfg_pipelined_samp_clk___no_errors( +def test___timing___cfg_pipelined_samp_clk___sets_properties( sim_6535_di_single_line_task: Task, clk_source: str, active_edge: int, -): +) -> None: sim_6535_di_single_line_task.timing.cfg_pipelined_samp_clk_timing( rate=32000.0, source=clk_source, @@ -60,6 +75,12 @@ def test___timing___cfg_pipelined_samp_clk___no_errors( assert sim_6535_di_single_line_task.timing.samp_clk_src == clk_source assert sim_6535_di_single_line_task.timing.samp_clk_active_edge == active_edge + assert ( + sim_6535_di_single_line_task.timing.samp_timing_type + == SampleTimingType.PIPELINED_SAMPLE_CLOCK + ) + assert sim_6535_di_single_line_task.timing.samp_quant_samp_mode == AcquisitionType.FINITE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_per_chan == 2000 @pytest.mark.parametrize( @@ -70,13 +91,13 @@ def test___timing___cfg_pipelined_samp_clk___no_errors( ("/Dev1/PFI1", Edge.FALLING, Level.LOW, Polarity.ACTIVE_LOW), ], ) -def test___timing___cfg_burst_handshaking_import_clock___no_errors( +def test___timing___cfg_burst_handshaking_import_clock___sets_properties( sim_6535_di_single_line_task: Task, clk_source: str, active_edge: int, pause_when: int, ready_event_active_level: int, -): +) -> None: sim_6535_di_single_line_task.timing.cfg_burst_handshaking_timing_import_clock( sample_clk_rate=32000.0, sample_clk_src=clk_source, @@ -87,9 +108,17 @@ def test___timing___cfg_burst_handshaking_import_clock___no_errors( ready_event_active_level=ready_event_active_level, ) + assert sim_6535_di_single_line_task.timing.samp_timing_type == SampleTimingType.BURST_HANDSHAKE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_mode == AcquisitionType.FINITE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_per_chan == 2000 assert sim_6535_di_single_line_task.timing.samp_clk_rate == 32000 assert sim_6535_di_single_line_task.timing.samp_clk_src == clk_source assert sim_6535_di_single_line_task.timing.samp_clk_active_edge == active_edge + assert sim_6535_di_single_line_task.triggers.pause_trigger.dig_lvl_when == pause_when + assert ( + sim_6535_di_single_line_task.export_signals.rdy_for_xfer_event_lvl_active_lvl + == ready_event_active_level + ) @pytest.mark.parametrize( @@ -100,13 +129,13 @@ def test___timing___cfg_burst_handshaking_import_clock___no_errors( ("/Dev1/PFI1", Polarity.ACTIVE_LOW, Level.LOW, Polarity.ACTIVE_LOW), ], ) -def test___timing___cfg_burst_handshaking_export_clock___no_errors( +def test___timing___cfg_burst_handshaking_export_clock___sets_properties( sim_6535_di_single_line_task: Task, clk_outp_term: str, clk_pulse_polarity: int, pause_when: int, ready_event_active_level: int, -): +) -> None: sim_6535_di_single_line_task.timing.cfg_burst_handshaking_timing_export_clock( sample_clk_rate=32000.0, sample_clk_outp_term=clk_outp_term, @@ -117,4 +146,13 @@ def test___timing___cfg_burst_handshaking_export_clock___no_errors( ready_event_active_level=ready_event_active_level, ) + assert sim_6535_di_single_line_task.timing.samp_timing_type == SampleTimingType.BURST_HANDSHAKE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_mode == AcquisitionType.FINITE + assert sim_6535_di_single_line_task.timing.samp_quant_samp_per_chan == 2000 assert sim_6535_di_single_line_task.timing.samp_clk_rate == 32000 + assert sim_6535_di_single_line_task.export_signals.samp_clk_pulse_polarity == clk_pulse_polarity + assert sim_6535_di_single_line_task.triggers.pause_trigger.dig_lvl_when == pause_when + assert ( + sim_6535_di_single_line_task.export_signals.rdy_for_xfer_event_lvl_active_lvl + == ready_event_active_level + ) From 57ba612b7669ca881126da297e88691ed1325c1b Mon Sep 17 00:00:00 2001 From: Jun Lin Date: Mon, 24 Jun 2024 14:16:24 -0500 Subject: [PATCH 3/3] fix test failure --- tests/component/task/test_timing.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/component/task/test_timing.py b/tests/component/task/test_timing.py index 41ea0d12..b8e56bd4 100644 --- a/tests/component/task/test_timing.py +++ b/tests/component/task/test_timing.py @@ -56,8 +56,8 @@ def test___timing___cfg_change_detection___sets_properties( @pytest.mark.parametrize( "clk_source, active_edge", [ - ("/Dev1/PFI0", Edge.RISING), - ("/Dev1/PFI1", Edge.FALLING), + ("PFI5", Edge.RISING), + ("RTSI7", Edge.FALLING), ], ) def test___timing___cfg_pipelined_samp_clk___sets_properties( @@ -86,9 +86,9 @@ def test___timing___cfg_pipelined_samp_clk___sets_properties( @pytest.mark.parametrize( "clk_source, active_edge, pause_when, ready_event_active_level", [ - ("/Dev1/PFI0", Edge.RISING, Level.HIGH, Polarity.ACTIVE_HIGH), - ("/Dev1/PFI1", Edge.FALLING, Level.HIGH, Polarity.ACTIVE_LOW), - ("/Dev1/PFI1", Edge.FALLING, Level.LOW, Polarity.ACTIVE_LOW), + ("PFI5", Edge.RISING, Level.HIGH, Polarity.ACTIVE_HIGH), + ("PFI5", Edge.FALLING, Level.HIGH, Polarity.ACTIVE_LOW), + ("RTSI7", Edge.FALLING, Level.LOW, Polarity.ACTIVE_LOW), ], ) def test___timing___cfg_burst_handshaking_import_clock___sets_properties( @@ -124,9 +124,9 @@ def test___timing___cfg_burst_handshaking_import_clock___sets_properties( @pytest.mark.parametrize( "clk_outp_term, clk_pulse_polarity, pause_when, ready_event_active_level", [ - ("/Dev1/PFI0", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_HIGH), - ("/Dev1/PFI1", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_LOW), - ("/Dev1/PFI1", Polarity.ACTIVE_LOW, Level.LOW, Polarity.ACTIVE_LOW), + ("PFI0", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_HIGH), + ("PFI1", Polarity.ACTIVE_HIGH, Level.HIGH, Polarity.ACTIVE_LOW), + ("PFI1", Polarity.ACTIVE_LOW, Level.LOW, Polarity.ACTIVE_LOW), ], ) def test___timing___cfg_burst_handshaking_export_clock___sets_properties(