Skip to content

Commit

Permalink
reworked as per suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
WayneDroid committed May 29, 2024
1 parent 59484bc commit 284edaa
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 155 deletions.
98 changes: 0 additions & 98 deletions examples/multi-functions-synchronization/ai_ao_sync.py

This file was deleted.

57 changes: 0 additions & 57 deletions examples/multi-functions-synchronization/cont_ai_di_acq.py

This file was deleted.

94 changes: 94 additions & 0 deletions examples/synchronization/multi_function/ai_ao_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Example of analog input and output synchronization.
This example demonstrates how to continuously acquire and
generate data at the same time, synchronized with one another.
"""

from typing import Tuple

import numpy as np
import numpy.typing

import nidaqmx
from nidaqmx.constants import AcquisitionType


def generate_sine_wave(
frequency: float,
amplitude: float,
sampling_rate: float,
number_of_samples: int,
phase_in: float = 0.0,
) -> Tuple[numpy.typing.NDArray[numpy.double], float]:
"""Generates a sine wave with a specified phase.
Args:
frequency (float): Specifies the frequency of the sine wave.
amplitude (float): Specifies the amplitude of the sine wave.
sampling_rate (float): Specifies the sampling rate of the sine wave.
number_of_samples (int): Specifies the number of samples to generate.
phase_in (Optional[float]): Specifies the phase of the sine wave in radians.
Returns:
Tuple[numpy.typing.NDArray[numpy.double], float]: Indicates a tuple
containing the generated data and the phase of the sine wave after generation.
"""
duration_time = number_of_samples / sampling_rate
duration_radians = duration_time * 2 * np.pi
phase_out = (phase_in + duration_radians) % (2 * np.pi)
t = np.linspace(phase_in, phase_in + duration_radians, number_of_samples, endpoint=False)

return (amplitude * np.sin(frequency * t), phase_out)


def main():
"""Continuously acquires and generate data at the same time."""
total_read = 0
number_of_samples = 1000

with nidaqmx.Task() as ai_task, nidaqmx.Task() as ao_task:

def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for reading signals."""
nonlocal total_read
read = ai_task.read(number_of_samples_per_channel=number_of_samples)
total_read += len(read)
print(f"Acquired data: {len(read)} samples. Total {total_read}.", end="\r")

return 0

ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0")
ao_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ao_task.triggers.start_trigger.cfg_dig_edge_start_trig("ai/StartTrigger")

actual_sampling_rate = ao_task.timing.samp_clk_rate
print(f"Actual sampling rate: {actual_sampling_rate:g} S/s")

ao_data, _ = generate_sine_wave(
frequency=10.0,
amplitude=1.0,
sampling_rate=actual_sampling_rate,
number_of_samples=number_of_samples,
)

try:
ao_task.write(ao_data)
ao_task.start()
ai_task.start()

input("Acquiring samples continuously. Press Enter to stop.\n")

except nidaqmx.DaqError as e:
print(e)
finally:
ai_task.stop()
ao_task.stop()

print(f"\nAcquired {total_read} total samples.")


if __name__ == "__main__":
main()
57 changes: 57 additions & 0 deletions examples/synchronization/multi_function/cont_ai_di_acq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Example of analog and digital data acquisition at the same time.
This example demonstrates how to continuously acquire analog and
digital data at the same time, synchronized with one another on
the same device.
"""

import nidaqmx
from nidaqmx.constants import AcquisitionType, LineGrouping


def main():
"""Continuously acquire analog and digital data at the same time."""
total_ai_read = 0
total_di_read = 0

with nidaqmx.Task() as ai_task, nidaqmx.Task() as di_task:

def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
"""Callback function for reading signals."""
nonlocal total_ai_read
nonlocal total_di_read
ai_read = ai_task.read(number_of_samples_per_channel=1000)
di_read = di_task.read(number_of_samples_per_channel=1000)
total_ai_read += len(ai_read)
total_di_read += len(di_read)
print(f"\t{len(ai_read)}\t{len(di_read)}\t\t{total_ai_read}\t{total_di_read}", end="\r")

return 0

ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
ai_task.timing.cfg_samp_clk_timing(1000.0, sample_mode=AcquisitionType.CONTINUOUS)
ai_task.register_every_n_samples_acquired_into_buffer_event(1000, callback)
di_task.di_channels.add_di_chan("Dev1/port0", line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
di_task.timing.cfg_samp_clk_timing(
1000.0, "ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS
)

try:
di_task.start()
ai_task.start()

print("Acquiring samples continuously. Press Enter to stop.\n")
print("Read:\tAI\tDI\tTotal:\tAI\tDI")
input()

except nidaqmx.DaqError as e:
print(e)
finally:
ai_task.stop()
di_task.stop()

print(f"\nAcquired {total_ai_read} total AI samples and {total_di_read} total DI samples.")


if __name__ == "__main__":
main()

0 comments on commit 284edaa

Please sign in to comment.