-
Notifications
You must be signed in to change notification settings - Fork 161
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add multi-function synchronization examples (#589)
* Add synchronization examples * Add synchronization examples * reworked as per suggestions * removed try-except and some reworks * raise RuntimeError instead of returning empty string
- Loading branch information
1 parent
860d840
commit cc140f7
Showing
4 changed files
with
208 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
"""Example of analog input voltage acquisition with reference trigger. | ||
This example demonstrates how to acquire a finite amount of data | ||
using a digital reference trigger. | ||
""" | ||
|
||
import nidaqmx | ||
from nidaqmx.constants import READ_ALL_AVAILABLE, AcquisitionType | ||
|
||
with nidaqmx.Task() as task: | ||
task.ai_channels.add_ai_voltage_chan("Dev1/ai0") | ||
task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=100) | ||
task.triggers.reference_trigger.cfg_dig_edge_ref_trig("/Dev1/PFI8", pretrigger_samples=50) | ||
|
||
task.start() | ||
data = task.read(READ_ALL_AVAILABLE) | ||
print("Acquired data: [" + ", ".join(f"{value:f}" for value in data) + "]") | ||
task.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
"""Example of analog input and output synchronization. | ||
This example demonstrates how to continuously acquire and | ||
generate data at the same time, synchronized with one another. | ||
""" | ||
|
||
from typing import Tuple | ||
|
||
import numpy as np | ||
import numpy.typing | ||
|
||
import nidaqmx | ||
from nidaqmx.constants import AcquisitionType, ProductCategory | ||
|
||
|
||
def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: | ||
"""Gets the terminal name with the device prefix. | ||
Args: | ||
task: Specifies the task to get the device name from. | ||
terminal_name: Specifies the terminal name to get. | ||
Returns: | ||
Indicates the terminal name with the device prefix. | ||
""" | ||
for device in task.devices: | ||
if device.product_category not in [ | ||
ProductCategory.C_SERIES_MODULE, | ||
ProductCategory.SCXI_MODULE, | ||
]: | ||
return f"/{device.name}/{terminal_name}" | ||
|
||
raise RuntimeError("Suitable device not found in task.") | ||
|
||
|
||
def generate_sine_wave( | ||
frequency: float, | ||
amplitude: float, | ||
sampling_rate: float, | ||
number_of_samples: int, | ||
phase_in: float = 0.0, | ||
) -> Tuple[numpy.typing.NDArray[numpy.double], float]: | ||
"""Generates a sine wave with a specified phase. | ||
Args: | ||
frequency: Specifies the frequency of the sine wave. | ||
amplitude: Specifies the amplitude of the sine wave. | ||
sampling_rate: Specifies the sampling rate of the sine wave. | ||
number_of_samples: Specifies the number of samples to generate. | ||
phase_in: Specifies the phase of the sine wave in radians. | ||
Returns: | ||
Indicates a tuple containing the generated data and the phase | ||
of the sine wave after generation. | ||
""" | ||
duration_time = number_of_samples / sampling_rate | ||
duration_radians = duration_time * 2 * np.pi | ||
phase_out = (phase_in + duration_radians) % (2 * np.pi) | ||
t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False) | ||
|
||
return (amplitude * np.sin(frequency * t), phase_out) | ||
|
||
|
||
def main(): | ||
"""Continuously acquires and generate data at the same time.""" | ||
total_read = 0 | ||
number_of_samples = 1000 | ||
|
||
with nidaqmx.Task() as ai_task, nidaqmx.Task() as ao_task: | ||
|
||
def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): | ||
"""Callback function for reading signals.""" | ||
nonlocal total_read | ||
read = ai_task.read(number_of_samples_per_channel=number_of_samples) | ||
total_read += len(read) | ||
print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r") | ||
|
||
return 0 | ||
|
||
ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") | ||
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) | ||
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) | ||
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/StartTrigger") | ||
|
||
ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0") | ||
ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) | ||
ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name) | ||
|
||
actual_sampling_rate = ao_task.timing.samp_clk_rate | ||
print(f"Actual sampling rate: {actual_sampling_rate:g} S/s") | ||
|
||
ao_data, _ = generate_sine_wave( | ||
frequency=10.0, | ||
amplitude=1.0, | ||
sampling_rate=actual_sampling_rate, | ||
number_of_samples=number_of_samples, | ||
) | ||
|
||
ao_task.write(ao_data) | ||
ao_task.start() | ||
ai_task.start() | ||
|
||
input("Acquiring samples continuously. Press Enter to stop.\n") | ||
|
||
ai_task.stop() | ||
ao_task.stop() | ||
|
||
print(f"\nAcquired {total_read} total samples.") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
"""Example of analog and digital data acquisition at the same time. | ||
This example demonstrates how to continuously acquire analog and | ||
digital data at the same time, synchronized with one another on | ||
the same device. | ||
""" | ||
|
||
import nidaqmx | ||
from nidaqmx.constants import AcquisitionType, LineGrouping, ProductCategory | ||
|
||
|
||
def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: | ||
"""Gets the terminal name with the device prefix. | ||
Args: | ||
task: Specifies the task to get the device name from. | ||
terminal_name: Specifies the terminal name to get. | ||
Returns: | ||
Indicates the terminal name with the device prefix. | ||
""" | ||
for device in task.devices: | ||
if device.product_category not in [ | ||
ProductCategory.C_SERIES_MODULE, | ||
ProductCategory.SCXI_MODULE, | ||
]: | ||
return f"/{device.name}/{terminal_name}" | ||
|
||
raise RuntimeError("Suitable device not found in task.") | ||
|
||
|
||
def main(): | ||
"""Continuously acquire analog and digital data at the same time.""" | ||
total_ai_read = 0 | ||
total_di_read = 0 | ||
|
||
with nidaqmx.Task() as ai_task, nidaqmx.Task() as di_task: | ||
|
||
def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): | ||
"""Callback function for reading signals.""" | ||
nonlocal total_ai_read | ||
nonlocal total_di_read | ||
ai_read = ai_task.read(number_of_samples_per_channel=number_of_samples) | ||
di_read = di_task.read(number_of_samples_per_channel=number_of_samples) | ||
total_ai_read += len(ai_read) | ||
total_di_read += len(di_read) | ||
print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r") | ||
|
||
return 0 | ||
|
||
ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") | ||
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) | ||
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) | ||
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/SampleClock") | ||
|
||
di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES) | ||
di_task.timing.cfg_samp_clk_timing( | ||
1000.0, terminal_name, sample_mode=AcquisitionType.CONTINUOUS | ||
) | ||
|
||
di_task.start() | ||
ai_task.start() | ||
|
||
print("Acquiring samples continuously. Press Enter to stop.\n") | ||
print("Read:\tAI\tDI\tTotal:\tAI\tDI") | ||
input() | ||
|
||
ai_task.stop() | ||
di_task.stop() | ||
|
||
print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |