Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prototype] Address Event Callbacks Contain Unusable Parameters Issue #472

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions generated/nidaqmx/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,46 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET,

return data.tolist()

def add_arguments_if_necessary(self, callback_function, expected_number_of_arguments):
if (callback_function.__code__.co_argcount < expected_number_of_arguments):
Copy link
Collaborator

@bkeryan bkeryan Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__code__ is a CPython implementation detail and probably won't work with PyPy.

Using the inspect module is probably more portable.

print("Not enough arguments! Adding arguments")
if ("every_n_samples_event_type" in parameters):
DeborahOoi96 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[style] If-statements in Python do not have parens unless they are needed for grouping or splitting across multiple lines. Running poetry run ni-python-styleguide fix will probably fix this.

result = self.n_samples_event_wrapper(parameters, callback_function)
elif ("signal_type" in parameters):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every N and signal events pass the callback an enum, but they use a bare ctypes or int type rather than the enum classes from nidaqmx.constants. See my prototype: https://github.com/ni/nidaqmx-python/pull/474/files

callback_method(self, EveryNSamplesEventType(every_n_samples_event_type), number_of_samples)

callback_method(self, Signal(signal_type))

result = self.register_signal_event_wrapper(parameters, callback_function)
elif ("status" in parameters):
result = self.register_done_event_wrapper(parameters, callback_function)
else:
result = callback_function
return result

def n_samples_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples)
elif ("task_handle" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples, callback_data)
elif ("callback_data" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(task_handle, every_n_samples_event_type, number_of_samples)
return result

def register_signal_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, signal_type, callback_data: callback_function(signal_type)
elif ("task_handle" not in parameters):
DeborahOoi96 marked this conversation as resolved.
Show resolved Hide resolved
result = lambda task_handle, signal_type, callback_data: callback_function(signal_type, callback_data)
elif ("callback_data" not in parameters):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last parameter would be useful if the user was allowed to pass in an optional Python object.

Copy link
Collaborator

@bkeryan bkeryan Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example of what I mean:

def register_done_event(self, callback_method, callback_data = None):
   ...
def on_done(task, data):
    data.signal()

done_event = threading.Event()
task.register_done_event(on_done, done_event)

It's not critical because Python makes it easy to bind a closure or to pass in a member function as the callback, but it's a potential nice-to-have and it would avoid removing parameters from the signature.

result = lambda task_handle, signal_type, callback_data: callback_function(task_handle, signal_type)
return result

def register_done_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(status)
elif ("task_handle" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(status, callback_data)
elif ("callback_data" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(task_handle, status)
return result

def register_done_event(self, callback_method):
"""
Registers a callback function to receive an event when a task stops due
Expand Down Expand Up @@ -788,6 +828,7 @@ def register_done_event(self, callback_method):
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 3)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.DONE_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_done_event(self._handle, 0, callback_method, None)
Expand Down Expand Up @@ -836,6 +877,7 @@ def register_every_n_samples_acquired_into_buffer_event(
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 4)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.EVERY_N_SAMPS_ACQ_INTO_BUFFER_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_every_n_samples_event(
Expand Down Expand Up @@ -887,6 +929,7 @@ def register_every_n_samples_transferred_from_buffer_event(
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 4)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.EVERY_N_SAMPS_TRANSFERRED_FROM_BUFFER_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_every_n_samples_event(
Expand Down Expand Up @@ -933,6 +976,7 @@ def register_signal_event(self, signal_type, callback_method):
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 3)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.SIGNAL_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_signal_event(
Expand Down
44 changes: 44 additions & 0 deletions src/handwritten/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,46 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET,

return data.tolist()

def add_arguments_if_necessary(self, callback_function, expected_number_of_arguments):
if (callback_function.__code__.co_argcount < expected_number_of_arguments):
print("Not enough arguments! Adding arguments")
if ("every_n_samples_event_type" in parameters):
result = self.n_samples_event_wrapper(parameters, callback_function)
elif ("signal_type" in parameters):
result = self.register_signal_event_wrapper(parameters, callback_function)
elif ("status" in parameters):
result = self.register_done_event_wrapper(parameters, callback_function)
else:
result = callback_function
return result

def n_samples_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples)
elif ("task_handle" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples, callback_data)
elif ("callback_data" not in parameters):
result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(task_handle, every_n_samples_event_type, number_of_samples)
return result

def register_signal_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, signal_type, callback_data: callback_function(signal_type)
elif ("task_handle" not in parameters):
result = lambda task_handle, signal_type, callback_data: callback_function(signal_type, callback_data)
elif ("callback_data" not in parameters):
result = lambda task_handle, signal_type, callback_data: callback_function(task_handle, signal_type)
return result

def register_done_event_wrapper(self, parameters, callback_function):
if ("task_handle" and "callback_data" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(status)
elif ("task_handle" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(status, callback_data)
elif ("callback_data" not in parameters):
result = lambda task_handle, status, callback_data: callback_function(task_handle, status)
return result

def register_done_event(self, callback_method):
"""
Registers a callback function to receive an event when a task stops due
Expand Down Expand Up @@ -788,6 +828,7 @@ def register_done_event(self, callback_method):
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 3)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.DONE_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_done_event(self._handle, 0, callback_method, None)
Expand Down Expand Up @@ -836,6 +877,7 @@ def register_every_n_samples_acquired_into_buffer_event(
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 4)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.EVERY_N_SAMPS_ACQ_INTO_BUFFER_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_every_n_samples_event(
Expand Down Expand Up @@ -887,6 +929,7 @@ def register_every_n_samples_transferred_from_buffer_event(
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 4)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.EVERY_N_SAMPS_TRANSFERRED_FROM_BUFFER_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_every_n_samples_event(
Expand Down Expand Up @@ -933,6 +976,7 @@ def register_signal_event(self, signal_type, callback_method):
function.
"""
if callback_method is not None:
callback_method = self.add_arguments_if_necessary(callback_method, 3)
# If the event is already registered, the interpreter should raise DaqError with code
# DAQmxErrors.SIGNAL_EVENT_ALREADY_REGISTERED.
event_handler = self._interpreter.register_signal_event(
Expand Down
38 changes: 38 additions & 0 deletions tests/_event_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ def handle_done_event(self, task_handle: object, status: int, callback_data: obj
self._invoke_side_effect()
return 0

class DoneEventBasicParametersObserver(BaseEventObserver[DoneEvent]):
"""An observer for Done events with only basic parameters in callback function."""

def handle_done_event(self, status: int) -> int:
"""Handles a Done event."""
with self._lock:
self._events.append(DoneEvent(status))
self._event_semaphore.release()
self._invoke_side_effect()
return 0


class EveryNSamplesEventObserver(BaseEventObserver[EveryNSamplesEvent]):
"""An observer for Every N Samples events."""
Expand All @@ -87,6 +98,20 @@ def handle_every_n_samples_event(
self._invoke_side_effect()
return 0

class EveryNSamplesEventBasicParametersObserver(BaseEventObserver[EveryNSamplesEvent]):
"""An observer for Every N Samples events with only basic parameters in callback function."""

def handle_every_n_samples_event(
self,
every_n_samples_event_type: int,
number_of_samples: int,
) -> int:
"""Handles an Every N Samples event."""
with self._lock:
self._events.append(EveryNSamplesEvent(every_n_samples_event_type, number_of_samples))
self._event_semaphore.release()
self._invoke_side_effect()
return 0

class SignalEventObserver(BaseEventObserver[SignalEvent]):
"""An observer for Signal events."""
Expand All @@ -100,3 +125,16 @@ def handle_signal_event(
self._event_semaphore.release()
self._invoke_side_effect()
return 0

class SignalEventBasicParametersObserver(BaseEventObserver[SignalEvent]):
"""An observer for Signal events with only basic parameters in callback function."""

def handle_signal_event(
self, signal_type: int
) -> int:
"""Handles a Signal event."""
with self._lock:
self._events.append(SignalEvent(signal_type))
self._event_semaphore.release()
self._invoke_side_effect()
return 0
63 changes: 63 additions & 0 deletions tests/component/test_task_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
from nidaqmx.task import _TaskEventType
from tests._event_utils import (
DoneEventObserver,
DoneEventBasicParametersObserver,
EveryNSamplesEventObserver,
EveryNSamplesEventBasicParametersObserver,
SignalEventObserver,
SignalEventBasicParametersObserver,
)


Expand Down Expand Up @@ -55,6 +58,24 @@ def test___done_event_registered___run_finite_acquisition___callback_invoked_onc
assert all(e.status == 0 for e in event_observer.events)


def test___done_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_once_with_success_status(
ai_task: nidaqmx.Task,
) -> None:
event_observer = DoneEventBasicParametersObserver()
ai_task.register_done_event(event_observer.handle_done_event)
ai_task.timing.cfg_samp_clk_timing(
rate=10000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000
)

ai_task.start()

event_observer.wait_for_events()
with pytest.raises(TimeoutError):
event_observer.wait_for_events(timeout=100e-3)
assert len(event_observer.events) == 1
assert all(e.status == 0 for e in event_observer.events)


def test___every_n_samples_event_registered___run_finite_acquisition___callback_invoked_n_times_with_type_and_num_samples(
ai_task: nidaqmx.Task,
) -> None:
Expand All @@ -78,6 +99,29 @@ def test___every_n_samples_event_registered___run_finite_acquisition___callback_
)
assert all(e.number_of_samples == 100 for e in event_observer.events)

def test___every_n_samples_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_n_times_with_type_and_num_samples(
ai_task: nidaqmx.Task,
) -> None:
event_observer = EveryNSamplesEventBasicParametersObserver()
ai_task.register_every_n_samples_acquired_into_buffer_event(
100, event_observer.handle_every_n_samples_event
)
ai_task.timing.cfg_samp_clk_timing(
rate=10000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000
)

ai_task.start()

event_observer.wait_for_events(10)
with pytest.raises(TimeoutError):
event_observer.wait_for_events(timeout=100e-3)
assert len(event_observer.events) == 10
assert all(
e.event_type == EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value
for e in event_observer.events
)
assert all(e.number_of_samples == 100 for e in event_observer.events)


def test___signal_event_registered___run_finite_acquisition___callback_invoked_n_times_with_type(
ai_task_with_real_device: nidaqmx.Task,
Expand All @@ -98,6 +142,25 @@ def test___signal_event_registered___run_finite_acquisition___callback_invoked_n
assert all(e.signal_type == Signal.SAMPLE_COMPLETE.value for e in event_observer.events)


def test___signal_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_n_times_with_type(
ai_task_with_real_device: nidaqmx.Task,
) -> None:
ai_task = ai_task_with_real_device
event_observer = SignalEventBasicParametersObserver()
ai_task.register_signal_event(Signal.SAMPLE_COMPLETE, event_observer.handle_signal_event)
ai_task.timing.cfg_samp_clk_timing(
rate=10.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=10
)

ai_task.start()

event_observer.wait_for_events(10)
with pytest.raises(TimeoutError):
event_observer.wait_for_events(timeout=100e-3)
assert len(event_observer.events) == 10
assert all(e.signal_type == Signal.SAMPLE_COMPLETE.value for e in event_observer.events)


@pytest.mark.grpc_xfail(
reason="Requires NI gRPC Device Server version 2.2 or later", raises=RpcError
)
Expand Down
Loading