Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-function synchronization examples #589

Merged
merged 6 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ def main():
def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for reading signals."""
nonlocal total_read
read = len(task.read(number_of_samples_per_channel=number_of_samples))
total_read += read
print(f"Acquired data: {read} samples. Total {total_read}.", end="\r")
read = task.read(number_of_samples_per_channel=number_of_samples)
total_read += len(read)
print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r")

return 0

Expand Down
18 changes: 18 additions & 0 deletions examples/analog_in/voltage_acq_int_clk_dig_ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Example of analog input voltage acquisition with reference trigger.

This example demonstrates how to acquire a finite amount of data
using a digital reference trigger.
"""

import nidaqmx
from nidaqmx.constants import READ_ALL_AVAILABLE, AcquisitionType

with nidaqmx.Task() as task:
task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=100)
task.triggers.reference_trigger.cfg_dig_edge_ref_trig("/Dev1/PFI8", pretrigger_samples=50)

task.start()
data = task.read(READ_ALL_AVAILABLE)
print("Acquired data: [" + ", ".join(f"{value:f}" for value in data) + "]")
task.stop()
112 changes: 112 additions & 0 deletions examples/synchronization/multi_function/ai_ao_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Example of analog input and output synchronization.

This example demonstrates how to continuously acquire and
generate data at the same time, synchronized with one another.
"""

from typing import Tuple

import numpy as np
import numpy.typing

import nidaqmx
from nidaqmx.constants import AcquisitionType, ProductCategory


def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str:
"""Gets the terminal name with the device prefix.

Args:
task: Specifies the task to get the device name from.
terminal_name: Specifies the terminal name to get.

Returns:
Indicates the terminal name with the device prefix.
"""
for device in task.devices:
if device.product_category not in [
ProductCategory.C_SERIES_MODULE,
ProductCategory.SCXI_MODULE,
]:
return f"/{device.name}/{terminal_name}"

raise RuntimeError("Suitable device not found in task.")


def generate_sine_wave(
frequency: float,
amplitude: float,
sampling_rate: float,
number_of_samples: int,
phase_in: float = 0.0,
) -> Tuple[numpy.typing.NDArray[numpy.double], float]:
"""Generates a sine wave with a specified phase.

Args:
frequency: Specifies the frequency of the sine wave.
amplitude: Specifies the amplitude of the sine wave.
sampling_rate: Specifies the sampling rate of the sine wave.
number_of_samples: Specifies the number of samples to generate.
phase_in: Specifies the phase of the sine wave in radians.

Returns:
Indicates a tuple containing the generated data and the phase
of the sine wave after generation.
"""
duration_time = number_of_samples / sampling_rate
duration_radians = duration_time * 2 * np.pi
phase_out = (phase_in + duration_radians) % (2 * np.pi)
t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False)

return (amplitude * np.sin(frequency * t), phase_out)


def main():
"""Continuously acquires and generate data at the same time."""
total_read = 0
number_of_samples = 1000

with nidaqmx.Task() as ai_task, nidaqmx.Task() as ao_task:

def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for reading signals."""
nonlocal total_read
read = ai_task.read(number_of_samples_per_channel=number_of_samples)
total_read += len(read)
print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r")

return 0

ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/StartTrigger")

ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name)

actual_sampling_rate = ao_task.timing.samp_clk_rate
print(f"Actual sampling rate: {actual_sampling_rate:g} S/s")

ao_data, _ = generate_sine_wave(
frequency=10.0,
amplitude=1.0,
sampling_rate=actual_sampling_rate,
number_of_samples=number_of_samples,
)

ao_task.write(ao_data)
ao_task.start()
ai_task.start()

input("Acquiring samples continuously. Press Enter to stop.\n")

ai_task.stop()
ao_task.stop()

print(f"\nAcquired {total_read} total samples.")


if __name__ == "__main__":
main()
75 changes: 75 additions & 0 deletions examples/synchronization/multi_function/cont_ai_di_acq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Example of analog and digital data acquisition at the same time.

This example demonstrates how to continuously acquire analog and
digital data at the same time, synchronized with one another on
the same device.
"""

import nidaqmx
from nidaqmx.constants import AcquisitionType, LineGrouping, ProductCategory


def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str:
"""Gets the terminal name with the device prefix.

Args:
task: Specifies the task to get the device name from.
terminal_name: Specifies the terminal name to get.

Returns:
Indicates the terminal name with the device prefix.
"""
for device in task.devices:
if device.product_category not in [
ProductCategory.C_SERIES_MODULE,
ProductCategory.SCXI_MODULE,
]:
return f"/{device.name}/{terminal_name}"

raise RuntimeError("Suitable device not found in task.")


def main():
"""Continuously acquire analog and digital data at the same time."""
total_ai_read = 0
total_di_read = 0

with nidaqmx.Task() as ai_task, nidaqmx.Task() as di_task:

def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for reading signals."""
nonlocal total_ai_read
nonlocal total_di_read
ai_read = ai_task.read(number_of_samples_per_channel=number_of_samples)
di_read = di_task.read(number_of_samples_per_channel=number_of_samples)
total_ai_read += len(ai_read)
total_di_read += len(di_read)
print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r")

return 0

ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/SampleClock")

di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
di_task.timing.cfg_samp_clk_timing(
1000.0, terminal_name, sample_mode=AcquisitionType.CONTINUOUS
)

di_task.start()
ai_task.start()

print("Acquiring samples continuously. Press Enter to stop.\n")
print("Read:\tAI\tDI\tTotal:\tAI\tDI")
input()

ai_task.stop()
di_task.stop()

print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.")


if __name__ == "__main__":
main()
Loading