Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and update AO examples #570

Merged
merged 13 commits into from
May 31, 2024
24 changes: 24 additions & 0 deletions examples/analog_out/analog_out_helper.py
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Helper functions for the analog_out example."""

import numpy
import numpy.typing

np = numpy
bkeryan marked this conversation as resolved.
Show resolved Hide resolved


def create_sine_wave(
frequency: float, amplitude: float, sampling_rate: float, duration: float
) -> numpy.typing.NDArray[numpy.double]:
"""Generate a sine wave."""
t = np.arange(0, duration, 1 / sampling_rate)

return amplitude * np.sin(2 * np.pi * frequency * t)


def create_voltage_sample(voltage: float, number_of_samples: int) -> float:
"""Generate a voltage sample."""
data = []
for i in range(number_of_samples):
data.append(voltage * i / number_of_samples)

return data
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
21 changes: 7 additions & 14 deletions examples/analog_out/cont_gen_voltage_wfm_int_clk.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,22 @@
waveform using an internal sample clock.
"""

import numpy as np
from analog_out_helper import create_sine_wave

import nidaqmx
from nidaqmx.constants import AcquisitionType

with nidaqmx.Task() as task:
frequency = 10
amplitude = 1
sampling_rate = 100
duration = 1

# generate the time array
t = np.arange(0, duration, 1 / sampling_rate)
# Generate the sine wave
data = amplitude * np.sin(2 * np.pi * frequency * t)
sampling_rate = 1000.0
data = create_sine_wave(
frequency=10.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
)

task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
task.timing.cfg_samp_clk_timing(
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
)
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)
task.write(data)
task.start()

input("Running task. Press Enter to stop.\n")
input("Generating voltage continuously. Press Enter to stop.\n")

task.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,29 @@
the specified number of samples generation is complete.
"""

import numpy as np
from analog_out_helper import create_sine_wave

import nidaqmx
from nidaqmx.constants import AcquisitionType

with nidaqmx.Task() as task:
frequency = 10
amplitude = 1
sampling_rate = 100
duration = 1

# generate the time array
t = np.arange(0, duration, 1 / sampling_rate)
# Generate the sine wave
data = amplitude * np.sin(2 * np.pi * frequency * t)
sampling_rate = 1000.0
data = create_sine_wave(
frequency=10.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
)

def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for written data."""
print(f"transferred {number_of_samples} samples event invoked.")
print("Transferred N samples")
bkeryan marked this conversation as resolved.
Show resolved Hide resolved

return 0

task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
task.timing.cfg_samp_clk_timing(
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
)
task.register_every_n_samples_transferred_from_buffer_event(100, callback)
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)
task.register_every_n_samples_transferred_from_buffer_event(1000, callback)
task.write(data)
task.start()

input("Running task. Press Enter to stop.\n")
input("Generating voltage continuously. Press Enter to stop.\n")

task.stop()
47 changes: 17 additions & 30 deletions examples/analog_out/cont_gen_voltage_wfm_int_clk_non_regen.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,37 @@
"""Example of analog output voltage generation.

This example demonstrates how to output a continuous periodic
waveform with new data using an internal sample clock while
the task is running.
This example demonstrates how to continuously generate an
analog output waveform by providing new data to the output buffer
as the task is running.

This example is useful if you want to generate a non-repeating waveform,
make updates on-the-fly, or generate a frequency that is not an
even divide-down of your sample clock. In this example,
the default frequency value is 17.0 to demonstrate that non-regenerative output
can be used to create a signal with a frequency that is not an even divide-down
of your sample clock.
"""

import time

import numpy as np
from analog_out_helper import create_sine_wave

import nidaqmx
from nidaqmx.constants import AcquisitionType, RegenerationMode


def get_sine_wave_data(cycle):
"""Generate a sine wave data."""
frequency = (10, 50)
amplitude = (1, 3)
sampling_rate = (100, 500)
duration = 1
selection = cycle % 2

# generate the time array
t = np.arange(0, duration, 1 / sampling_rate[selection])
# Generate the sine wave
data = amplitude[selection] * np.sin(2 * np.pi * frequency[selection] * t)
return data


with nidaqmx.Task() as task:
is_first_run = True
sampling_rate = 1000.0
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
task.out_stream.regen_mode = RegenerationMode.DONT_ALLOW_REGENERATION
task.timing.cfg_samp_clk_timing(
1000, sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=100
)
task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.CONTINUOUS)

try:
cycle = 1
print("Starting task. Press Ctrl+C to stop.")
time.sleep(1.0)

print("Generating voltage continuously. Press Ctrl+C to stop.")
while True:
data = get_sine_wave_data(cycle)
data = create_sine_wave(
frequency=17.0, amplitude=1.0, sampling_rate=sampling_rate, duration=1.0
)
task.write(data)
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
cycle = cycle + 1
if is_first_run:
is_first_run = False
task.start()
Expand Down
12 changes: 10 additions & 2 deletions examples/analog_out/gen_voltage_wfm_int_clk.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
sample clock.
"""

from analog_out_helper import create_voltage_sample

import nidaqmx
from nidaqmx.constants import AcquisitionType

with nidaqmx.Task() as task:
data = []
total_samples = 1000
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
task.timing.cfg_samp_clk_timing(1000, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000)
task.timing.cfg_samp_clk_timing(
1000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=total_samples
)

print(f"Generate {task.write([1.1, 2.2, 3.3, 4.4, 5.5])} voltage samples.")
data = create_voltage_sample(5.0, total_samples)
number_of_samples_written = task.write(data, auto_start=True)
print(f"Generate {number_of_samples_written} voltage samples.")
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
task.wait_until_done()
task.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,3 @@
task.ao_channels.add_ao_voltage_chan("Dev1/ao0")

print(f"Generate {task.write(1.1)} voltage sample.")
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
bkeryan marked this conversation as resolved.
Show resolved Hide resolved
task.stop()
10 changes: 10 additions & 0 deletions tests/acceptance/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import contextlib
import os
import re
import runpy
import sys
import warnings
from pathlib import Path

Expand All @@ -29,6 +31,8 @@ def test___shipping_example___run___no_errors(example_path: Path, system):
)
if example_path.name == "ci_pulse_freq.py":
pytest.skip("Example times out if there is no signal.")
if example_path.name == "analog_out_helper.py":
pytest.skip("Helper for analog output.")
if re.search(r"\binput\(|\bKeyboardInterrupt\b", example_source):
pytest.skip("Example waits for keyboard input.")
if example_path.name == "nidaqmx_warnings.py":
Expand All @@ -38,6 +42,12 @@ def test___shipping_example___run___no_errors(example_path: Path, system):
context_manager = contextlib.nullcontext()

with context_manager:
example_dir = os.path.dirname(os.path.abspath(example_path))
if example_dir not in sys.path:
# Solving ModuleNotFoundError.
# Add the example directory to sys.path so that the
# helper module can be found.
sys.path.append(example_dir)
runpy.run_path(str(example_path))


Expand Down
Loading