diff --git a/examples/analog_in/cont_voltage_acq_int_clk_every_n_samples_event.py b/examples/analog_in/cont_voltage_acq_int_clk_every_n_samples_event.py index c50e6242..aef6f700 100644 --- a/examples/analog_in/cont_voltage_acq_int_clk_every_n_samples_event.py +++ b/examples/analog_in/cont_voltage_acq_int_clk_every_n_samples_event.py @@ -18,9 +18,9 @@ def main(): def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): """Callback function for reading signals.""" nonlocal total_read - read = len(task.read(number_of_samples_per_channel=number_of_samples)) - total_read += read - print(f"Acquired data: {read} samples. Total {total_read}.", end="\r") + read = task.read(number_of_samples_per_channel=number_of_samples) + total_read += len(read) + print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r") return 0 diff --git a/examples/analog_in/voltage_acq_int_clk_dig_ref.py b/examples/analog_in/voltage_acq_int_clk_dig_ref.py new file mode 100644 index 00000000..077744ad --- /dev/null +++ b/examples/analog_in/voltage_acq_int_clk_dig_ref.py @@ -0,0 +1,18 @@ +"""Example of analog input voltage acquisition with reference trigger. + +This example demonstrates how to acquire a finite amount of data +using a digital reference trigger. +""" + +import nidaqmx +from nidaqmx.constants import READ_ALL_AVAILABLE, AcquisitionType + +with nidaqmx.Task() as task: + task.ai_channels.add_ai_voltage_chan("Dev1/ai0") + task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=100) + task.triggers.reference_trigger.cfg_dig_edge_ref_trig("/Dev1/PFI8", pretrigger_samples=50) + + task.start() + data = task.read(READ_ALL_AVAILABLE) + print("Acquired data: [" + ", ".join(f"{value:f}" for value in data) + "]") + task.stop() diff --git a/examples/synchronization/multi_function/ai_ao_sync.py b/examples/synchronization/multi_function/ai_ao_sync.py new file mode 100644 index 00000000..7c935b09 --- /dev/null +++ b/examples/synchronization/multi_function/ai_ao_sync.py @@ -0,0 +1,112 @@ +"""Example of analog input and output synchronization. + +This example demonstrates how to continuously acquire and +generate data at the same time, synchronized with one another. +""" + +from typing import Tuple + +import numpy as np +import numpy.typing + +import nidaqmx +from nidaqmx.constants import AcquisitionType, ProductCategory + + +def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: + """Gets the terminal name with the device prefix. + + Args: + task: Specifies the task to get the device name from. + terminal_name: Specifies the terminal name to get. + + Returns: + Indicates the terminal name with the device prefix. + """ + for device in task.devices: + if device.product_category not in [ + ProductCategory.C_SERIES_MODULE, + ProductCategory.SCXI_MODULE, + ]: + return f"/{device.name}/{terminal_name}" + + raise RuntimeError("Suitable device not found in task.") + + +def generate_sine_wave( + frequency: float, + amplitude: float, + sampling_rate: float, + number_of_samples: int, + phase_in: float = 0.0, +) -> Tuple[numpy.typing.NDArray[numpy.double], float]: + """Generates a sine wave with a specified phase. + + Args: + frequency: Specifies the frequency of the sine wave. + amplitude: Specifies the amplitude of the sine wave. + sampling_rate: Specifies the sampling rate of the sine wave. + number_of_samples: Specifies the number of samples to generate. + phase_in: Specifies the phase of the sine wave in radians. + + Returns: + Indicates a tuple containing the generated data and the phase + of the sine wave after generation. + """ + duration_time = number_of_samples / sampling_rate + duration_radians = duration_time * 2 * np.pi + phase_out = (phase_in + duration_radians) % (2 * np.pi) + t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False) + + return (amplitude * np.sin(frequency * t), phase_out) + + +def main(): + """Continuously acquires and generate data at the same time.""" + total_read = 0 + number_of_samples = 1000 + + with nidaqmx.Task() as ai_task, nidaqmx.Task() as ao_task: + + def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): + """Callback function for reading signals.""" + nonlocal total_read + read = ai_task.read(number_of_samples_per_channel=number_of_samples) + total_read += len(read) + print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r") + + return 0 + + ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") + ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/StartTrigger") + + ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0") + ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name) + + actual_sampling_rate = ao_task.timing.samp_clk_rate + print(f"Actual sampling rate: {actual_sampling_rate:g} S/s") + + ao_data, _ = generate_sine_wave( + frequency=10.0, + amplitude=1.0, + sampling_rate=actual_sampling_rate, + number_of_samples=number_of_samples, + ) + + ao_task.write(ao_data) + ao_task.start() + ai_task.start() + + input("Acquiring samples continuously. Press Enter to stop.\n") + + ai_task.stop() + ao_task.stop() + + print(f"\nAcquired {total_read} total samples.") + + +if __name__ == "__main__": + main() diff --git a/examples/synchronization/multi_function/cont_ai_di_acq.py b/examples/synchronization/multi_function/cont_ai_di_acq.py new file mode 100644 index 00000000..e3ed773d --- /dev/null +++ b/examples/synchronization/multi_function/cont_ai_di_acq.py @@ -0,0 +1,75 @@ +"""Example of analog and digital data acquisition at the same time. + +This example demonstrates how to continuously acquire analog and +digital data at the same time, synchronized with one another on +the same device. +""" + +import nidaqmx +from nidaqmx.constants import AcquisitionType, LineGrouping, ProductCategory + + +def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str: + """Gets the terminal name with the device prefix. + + Args: + task: Specifies the task to get the device name from. + terminal_name: Specifies the terminal name to get. + + Returns: + Indicates the terminal name with the device prefix. + """ + for device in task.devices: + if device.product_category not in [ + ProductCategory.C_SERIES_MODULE, + ProductCategory.SCXI_MODULE, + ]: + return f"/{device.name}/{terminal_name}" + + raise RuntimeError("Suitable device not found in task.") + + +def main(): + """Continuously acquire analog and digital data at the same time.""" + total_ai_read = 0 + total_di_read = 0 + + with nidaqmx.Task() as ai_task, nidaqmx.Task() as di_task: + + def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data): + """Callback function for reading signals.""" + nonlocal total_ai_read + nonlocal total_di_read + ai_read = ai_task.read(number_of_samples_per_channel=number_of_samples) + di_read = di_task.read(number_of_samples_per_channel=number_of_samples) + total_ai_read += len(ai_read) + total_di_read += len(di_read) + print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r") + + return 0 + + ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0") + ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS) + ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback) + terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/SampleClock") + + di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES) + di_task.timing.cfg_samp_clk_timing( + 1000.0, terminal_name, sample_mode=AcquisitionType.CONTINUOUS + ) + + di_task.start() + ai_task.start() + + print("Acquiring samples continuously. Press Enter to stop.\n") + print("Read:\tAI\tDI\tTotal:\tAI\tDI") + input() + + ai_task.stop() + di_task.stop() + + print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.") + + +if __name__ == "__main__": + main()