Skip to content

Commit

Permalink
removed try-except and some reworks
Browse files Browse the repository at this point in the history
  • Loading branch information
WayneDroid committed May 30, 2024
1 parent 284edaa commit e2dd503
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 34 deletions.
56 changes: 37 additions & 19 deletions examples/synchronization/multi_function/ai_ao_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@
import numpy.typing

import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx.constants import AcquisitionType, ProductCategory


def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str:
"""Gets the terminal name with the device prefix.
Args:
task: Specifies the task to get the device name from.
terminal_name: Specifies the terminal name to get.
Returns:
Indicates the terminal name with the device prefix.
"""
for device in task.devices:
if device.product_category not in [
ProductCategory.C_SERIES_MODULE,
ProductCategory.SCXI_MODULE,
]:
return f"/{device.name}/{terminal_name}"

return ""


def generate_sine_wave(
Expand All @@ -23,15 +43,15 @@ def generate_sine_wave(
"""Generates a sine wave with a specified phase.
Args:
frequency (float): Specifies the frequency of the sine wave.
amplitude (float): Specifies the amplitude of the sine wave.
sampling_rate (float): Specifies the sampling rate of the sine wave.
number_of_samples (int): Specifies the number of samples to generate.
phase_in (Optional[float]): Specifies the phase of the sine wave in radians.
frequency: Specifies the frequency of the sine wave.
amplitude: Specifies the amplitude of the sine wave.
sampling_rate: Specifies the sampling rate of the sine wave.
number_of_samples: Specifies the number of samples to generate.
phase_in: Specifies the phase of the sine wave in radians.
Returns:
Tuple[numpy.typing.NDArray[numpy.double], float]: Indicates a tuple
containing the generated data and the phase of the sine wave after generation.
Indicates a tuple containing the generated data and the phase
of the sine wave after generation.
"""
duration_time = number_of_samples / sampling_rate
duration_radians = duration_time * 2 * np.pi
Expand Down Expand Up @@ -60,9 +80,11 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac
ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/StartTrigger")

ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ao_task.triggers.start_trigger.cfg_dig_edge_start_trig("ai/StartTrigger")
ao_task.triggers.start_trigger.cfg_dig_edge_start_trig(terminal_name)

actual_sampling_rate = ao_task.timing.samp_clk_rate
print(f"Actual sampling rate: {actual_sampling_rate:g} S/s")
Expand All @@ -74,18 +96,14 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac
number_of_samples=number_of_samples,
)

try:
ao_task.write(ao_data)
ao_task.start()
ai_task.start()
ao_task.write(ao_data)
ao_task.start()
ai_task.start()

input("Acquiring samples continuously. Press Enter to stop.\n")
input("Acquiring samples continuously. Press Enter to stop.\n")

except nidaqmx.DaqError as e:
print(e)
finally:
ai_task.stop()
ao_task.stop()
ai_task.stop()
ao_task.stop()

print(f"\nAcquired {total_read} total samples.")

Expand Down
48 changes: 33 additions & 15 deletions examples/synchronization/multi_function/cont_ai_di_acq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,27 @@
"""

import nidaqmx
from nidaqmx.constants import AcquisitionType, LineGrouping
from nidaqmx.constants import AcquisitionType, LineGrouping, ProductCategory


def get_terminal_name_with_dev_prefix(task: nidaqmx.Task, terminal_name: str) -> str:
"""Gets the terminal name with the device prefix.
Args:
task: Specifies the task to get the device name from.
terminal_name: Specifies the terminal name to get.
Returns:
Indicates the terminal name with the device prefix.
"""
for device in task.devices:
if device.product_category not in [
ProductCategory.C_SERIES_MODULE,
ProductCategory.SCXI_MODULE,
]:
return f"/{device.name}/{terminal_name}"

return ""


def main():
Expand All @@ -20,8 +40,8 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac
"""Callback function for reading signals."""
nonlocal total_ai_read
nonlocal total_di_read
ai_read = ai_task.read(number_of_samples_per_channel=1000)
di_read = di_task.read(number_of_samples_per_channel=1000)
ai_read = ai_task.read(number_of_samples_per_channel=number_of_samples)
di_read = di_task.read(number_of_samples_per_channel=number_of_samples)
total_ai_read += len(ai_read)
total_di_read += len(di_read)
print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r")
Expand All @@ -31,24 +51,22 @@ def callback(task_handle, every_n_samples_event_type, number_of_samples, callbac
ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
terminal_name = get_terminal_name_with_dev_prefix(ai_task, "ai/SampleClock")

di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
di_task.timing.cfg_samp_clk_timing(
1000.0, "ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS
1000.0, terminal_name, sample_mode=AcquisitionType.CONTINUOUS
)

try:
di_task.start()
ai_task.start()
di_task.start()
ai_task.start()

print("Acquiring samples continuously. Press Enter to stop.\n")
print("Read:\tAI\tDI\tTotal:\tAI\tDI")
input()
print("Acquiring samples continuously. Press Enter to stop.\n")
print("Read:\tAI\tDI\tTotal:\tAI\tDI")
input()

except nidaqmx.DaqError as e:
print(e)
finally:
ai_task.stop()
di_task.stop()
ai_task.stop()
di_task.stop()

print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.")

Expand Down

0 comments on commit e2dd503

Please sign in to comment.