Skip to content

Commit

Permalink
Merge pull request #748 from henrypinkard/improve_notifications
Browse files Browse the repository at this point in the history
Add ability to do event generators; change notificaiton terminology; Add python mirrors to Java RAM datasets
  • Loading branch information
henrypinkard authored Apr 9, 2024
2 parents bf5bf0a + 7f22dfe commit 7fca760
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 101 deletions.
4 changes: 2 additions & 2 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.micro-manager.pycro-manager</groupId>
<artifactId>PycroManagerJava</artifactId>
<version>0.45.3</version>
<version>0.46</version>
<packaging>jar</packaging>
<name>Pycro-Manager Java</name>
<description>The Java components of Pycro-Manager</description>
Expand Down Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>org.micro-manager.acqengj</groupId>
<artifactId>AcqEngJ</artifactId>
<version>0.34.4</version>
<version>0.35.0</version>
</dependency>
<dependency>
<groupId>org.micro-manager.ndviewer</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,19 @@ public void run() {
});
}
try {
return added.get();
Object result = added.get();

JSONObject json = new JSONObject();
// Indicate the storage format of the image
if (storage_ instanceof NDTiffStorage) {
return result;
} else if (storage_ instanceof NDRAMStorage) {
return AcqEngMetadata.serializeAxes(axes);
} else {
throw new RuntimeException("Unknown storage type");
}


} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion pycromanager/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version_info = (0, 30, 3)
version_info = (0, 31, 0)
__version__ = ".".join(map(str, version_info))
110 changes: 88 additions & 22 deletions pycromanager/acq_future.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
import threading
from pycromanager.acquisition.acq_eng_py.main.acq_notification import AcqNotification
from types import GeneratorType

def _axes_to_key(axes):
""" Turn axes into a hashable key """
return None if axes is None else frozenset(axes.items())

class AcquisitionFuture:

def __init__(self, acq, axes_or_axes_list):
def __init__(self, acq, axes_or_axes_list=None):
"""
:param event_or_events: a single event (dictionary) or a list of events
:param axes_or_axes_list: a single axes (dictionary) or a list of axes
"""
self._acq = acq
self._condition = threading.Condition()
self._notification_recieved = {}
self._generator_events = axes_or_axes_list is None
if not self._generator_events:
self._add_notifications(axes_or_axes_list)
self._last_notification = None

def _add_notifications(self, axes_or_axes_list):
if isinstance(axes_or_axes_list, dict):
axes_or_axes_list = [axes_or_axes_list]
for axes in axes_or_axes_list:
# single event
# TODO maybe unify snap and sequence cause this is confusing
self._notification_recieved[_axes_to_key(axes)] = {
AcqNotification.Hardware.PRE_HARDWARE: False,
Expand All @@ -37,43 +43,103 @@ def _notify(self, notification):
received. Want to store this, rather than just waiting around for it, in case the await methods are called
after the notification has already been sent.
"""
if notification.phase == AcqNotification.Acquisition.ACQ_EVENTS_FINISHED or \
notification.phase == AcqNotification.Image.DATA_SINK_FINISHED:
return # ignore for now...
if isinstance(notification.id, list):
keys = [_axes_to_key(ax) for ax in notification.id]
if notification.milestone == AcqNotification.Acquisition.ACQ_EVENTS_FINISHED or \
notification.milestone == AcqNotification.Image.DATA_SINK_FINISHED:
with self._condition:
self._last_notification = notification
self._condition.notify_all()
return
if isinstance(notification.payload, list):
keys = [_axes_to_key(ax) for ax in notification.payload]
else:
keys = [_axes_to_key(notification.id)]
keys = [_axes_to_key(notification.payload)]
# check if any keys are present in the notification_recieved dict
if not any([key in self._notification_recieved.keys() for key in keys]):
return # ignore notifications that aren't relevant to this future
for key in keys:
self._notification_recieved[key][notification.phase] = True
self._notification_recieved[key][notification.milestone] = True
with self._condition:
self._last_notification = notification
self._condition.notify_all()

def await_execution(self, axes, phase):
def _monitor_axes(self, axes_or_axes_list):
"""
In the case where the acquisition future is constructed for a Generator, the events to be monitored
are not known until the generator is run. If user code awaits for an event and that event has already
passed, the future must be able to check if the event has already passed and return immediately.
So this function is called by the generator as events are created to add them to the list of events to
keep track of.
:param axes_or_axes_list: the axes of the event
"""
if self._generator_events:
self._add_notifications(axes_or_axes_list)
else:
raise ValueError("This future was not constructed with a generator")

def await_execution(self, milestone, axes=None):
"""
Block until the given milestone is executed for the given axes
:param axes: the axes to wait for
:param milestone: the milestone to wait for
"""
key = _axes_to_key(axes)
if key not in self._notification_recieved.keys() or phase not in self._notification_recieved[key].keys():
notification = AcqNotification(None, axes, phase)
raise ValueError("this future is not expecting a notification for: " + str(notification.to_json()))
if not self._generator_events:
if key not in self._notification_recieved.keys() or milestone not in self._notification_recieved[key].keys():
notification = AcqNotification(None, axes, milestone)
raise ValueError("this future is not expecting a notification for: " + str(notification.to_json()))
with self._condition:
while not self._notification_recieved[key][phase]:
while not self._notification_recieved[key][milestone]:
self._condition.wait()

def await_image_saved(self, axes, return_image=False):
if isinstance(axes, list):
keys = [_axes_to_key(ax) for ax in axes]
else:
keys = [_axes_to_key(axes)]
for key in keys:
def await_image_saved(self, axes=None, return_image=False, return_metadata=False):
"""
Block until the image with the given axes is saved. Return the image and/or metadata if requested.
:param axes: the axes of the image to wait for. In the case of None, wait for the next image
:param return_image: if True, return the image
:param return_metadata: if True, return the metadata
"""

if axes is None:
# wait for the next image to be saved
with self._condition:
while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]:
# wait until something happens
self._condition.wait()
while self._last_notification is None or \
(not self._last_notification.milestone == AcqNotification.Image.IMAGE_SAVED and \
not self._last_notification.milestone == AcqNotification.Image.DATA_SINK_FINISHED):
self._condition.wait()
axes = self._last_notification.payload
else:
if isinstance(axes, list):
keys = [_axes_to_key(ax) for ax in axes]
else:
keys = [_axes_to_key(axes)]
if not self._generator_events and axes is not None:
# make sure this is a valid axes to wait for associated with this Future
if any([key not in self._notification_recieved.keys() for key in keys]):
raise ValueError("This AcquisitionFuture is not expecting a notification for the given axes")
# wait until all images are saved
for key in keys:
with self._condition:
if not self._generator_events:
while not self._notification_recieved[key][AcqNotification.Image.IMAGE_SAVED]:
self._condition.wait()

if return_image:
if isinstance(axes, list):
return [self._acq.get_dataset().read_image(**ax) for ax in axes]
else:
return self._acq.get_dataset().read_image(**axes)
if return_metadata:
if isinstance(axes, list):
return [self._acq.get_dataset().read_metadata(**ax) for ax in axes]
else:
return self._acq.get_dataset().read_metadata(**axes)
if return_image and return_metadata:
if isinstance(axes, list):
return [(self._acq.get_dataset().read_image(**ax), self._acq.get_dataset().read_metadata(**ax)) for ax in axes]
else:
return self._acq.get_dataset().read_image(**axes), self._acq.get_dataset().read_metadata(**axes)


48 changes: 26 additions & 22 deletions pycromanager/acquisition/acq_eng_py/main/acq_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,29 @@ class Image:
def to_string():
return "image"

def __init__(self, type, id, phase=None):
def __init__(self, type, payload, milestone=None):
if type == AcqNotification.Acquisition or type == AcqNotification.Acquisition.to_string():
self.type = AcqNotification.Acquisition
self.id = id
self.phase = phase
self.payload = payload
self.milestone = milestone
elif (type == AcqNotification.Image or type == AcqNotification.Image.to_string()) and \
phase == AcqNotification.Image.DATA_SINK_FINISHED:
milestone == AcqNotification.Image.DATA_SINK_FINISHED:
self.type = AcqNotification.Image
self.id = id
self.phase = phase
elif phase in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE,
self.payload = payload
self.milestone = milestone
elif milestone in [AcqNotification.Camera.PRE_SNAP, AcqNotification.Camera.POST_EXPOSURE,
AcqNotification.Camera.PRE_SEQUENCE_STARTED]:
self.type = AcqNotification.Camera
self.id = json.loads(id) if isinstance(id, str) else id # convert from '{'time': 5}' to {'time': 5}
elif phase in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]:
self.payload = json.loads(payload) if isinstance(payload, str) else payload # convert from '{'time': 5}' to {'time': 5}
elif milestone in [AcqNotification.Hardware.PRE_HARDWARE, AcqNotification.Hardware.POST_HARDWARE]:
self.type = AcqNotification.Hardware
self.id = json.loads(id) if isinstance(id, str) else id # convert from '{'time': 5}' to {'time': 5}
elif phase == AcqNotification.Image.IMAGE_SAVED:
self.payload = json.loads(payload) if isinstance(payload, str) else payload # convert from '{'time': 5}' to {'time': 5}
elif milestone == AcqNotification.Image.IMAGE_SAVED:
self.type = AcqNotification.Image
self.id = id
self.payload = payload
else:
raise ValueError("Unknown phase")
self.phase = phase
raise ValueError("Unknown milestone")
self.milestone = milestone


@staticmethod
Expand All @@ -76,25 +76,29 @@ def create_data_sink_finished_notification():
def create_image_saved_notification(image_descriptor):
return AcqNotification(AcqNotification.Image, image_descriptor, AcqNotification.Image.IMAGE_SAVED)

def __repr__(self):
json = self.to_json()
return f"AcqNotification({json})"

def to_json(self):
n = {}
n['type'] = self.type
n['phase'] = self.phase
if self.id:
n['id'] = self.id
n['milestone'] = self.milestone
if self.payload:
n['payload'] = self.payload
return n

@staticmethod
def from_json(json):
return AcqNotification(json['type'],
json['id'] if 'id' in json else None,
json['phase'] if 'phase' in json else None)
json['payload'] if 'payload' in json else None,
json['milestone'] if 'milestone' in json else None)

def is_acquisition_finished_notification(self):
return self.phase == AcqNotification.Acquisition.ACQ_EVENTS_FINISHED
return self.milestone == AcqNotification.Acquisition.ACQ_EVENTS_FINISHED

def is_data_sink_finished_notification(self):
return self.phase == AcqNotification.Image.DATA_SINK_FINISHED
return self.milestone == AcqNotification.Image.DATA_SINK_FINISHED

def is_image_saved_notification(self):
return self.phase == AcqNotification.Image.IMAGE_SAVED
return self.milestone == AcqNotification.Image.IMAGE_SAVED
Loading

0 comments on commit 7fca760

Please sign in to comment.