From 3c9288f0c81eebaec5048f7d5d148387d5371d10 Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Mon, 1 Apr 2024 10:09:00 -0700 Subject: [PATCH 1/2] fixed cleanup of java objects in bridge due to weakrefs immediately disappearing --- pycromanager/_version.py | 2 +- pycromanager/zmq_bridge/bridge.py | 64 +++++++++++++++++++---------- pycromanager/zmq_bridge/wrappers.py | 6 +-- 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pycromanager/_version.py b/pycromanager/_version.py index 42cd0936..7be1c6e2 100644 --- a/pycromanager/_version.py +++ b/pycromanager/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 29, 9) +version_info = (0, 29, 10) __version__ = ".".join(map(str, version_info)) diff --git a/pycromanager/zmq_bridge/bridge.py b/pycromanager/zmq_bridge/bridge.py index 71036f71..272ba0aa 100644 --- a/pycromanager/zmq_bridge/bridge.py +++ b/pycromanager/zmq_bridge/bridge.py @@ -12,8 +12,17 @@ import threading import weakref import atexit -import traceback -import pycromanager.logging as logging + +# global map from threads to zmq contexts +ZMQ_CONTEXTS = {} + +# this is the last thing that should happen, so we register it first +def context_cleanup(contexts): + for context in contexts.values(): + context.term() + del contexts + +atexit.register(context_cleanup, ZMQ_CONTEXTS) class _DataSocket: @@ -22,8 +31,14 @@ class _DataSocket: Includes ZMQ client, push, and pull sockets """ - def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"): + def __init__(self, port, type, debug=False, ip_address="127.0.0.1"): # request reply socket + # check if context already exists for this thread + if threading.current_thread().ident in ZMQ_CONTEXTS: + context = ZMQ_CONTEXTS[threading.current_thread().ident] + else: + context = zmq.Context() + ZMQ_CONTEXTS[threading.current_thread().ident] = context self._socket = context.socket(type) # if 1000 messages are queued up, queue indefinitely until they can be sent self._socket.setsockopt(zmq.SNDHWM, 1000) @@ -37,11 +52,11 @@ def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"): self._closed = False if type == zmq.PUSH: if debug: - logging.main_logger.debug("binding {}".format(port)) + print("binding {}".format(port)) self._socket.bind("tcp://{}:{}".format(ip_address, port)) else: if debug: - logging.main_logger.debug("connecting {}".format(port)) + print("connecting {}".format(port)) self._socket.connect("tcp://{}:{}".format(ip_address, port)) def _register_java_object(self, object): @@ -102,6 +117,8 @@ def _remove_bytes(self, bytes_data, structure): self._remove_bytes(bytes_data, structure[key]) def send(self, message, timeout=None, suppress_debug_message=False): + if zmq.Frame is None: + raise Exception("ZMQ has terminated, cannot send message") if message is None: message = {} # make sure any np types convert to python types so they can be json serialized @@ -111,7 +128,7 @@ def send(self, message, timeout=None, suppress_debug_message=False): self._remove_bytes(bytes_data, message) message_string = json.dumps(message) if self._debug and not suppress_debug_message: - logging.main_logger.debug("sending: {}".format(message)) + print("sending: {}".format(message)) # convert keys to byte array key_vals = [(identifier.tobytes(), value) for identifier, value in bytes_data] message_parts = [bytes(message_string, "iso-8859-1")] + [ @@ -180,7 +197,7 @@ def receive(self, timeout=None, suppress_debug_message=False): self._replace_bytes(message, identity_hash, value) if self._debug and not suppress_debug_message: - logging.main_logger.debug("received: {}".format(message)) + print("received: {}".format(message)) self._check_exception(message) return message @@ -203,7 +220,7 @@ def close(self): time.sleep(0.01) self._socket = None if self._debug: - logging.main_logger.debug('closed socket {}'.format(self._port)) + print('closed socket {}'.format(self._port)) self._closed = True def server_terminated(port): @@ -249,12 +266,12 @@ def create_or_get_existing_bridge(port: int=DEFAULT_PORT, convert_camel_case: bo raise Exception("Bridge for port {} and thread {} has been " "closed but not removed".format(port, threading.current_thread().name)) if debug: - logging.main_logger.debug("returning cached bridge for port {} thread {}".format( + print("returning cached bridge for port {} thread {}".format( port, threading.current_thread().name)) return bridge else: if debug: - logging.main_logger.debug("creating new bridge for port {} thread {}".format( + print("creating new bridge for port {} thread {}".format( port, threading.current_thread().name)) b = _Bridge(port, convert_camel_case, debug, ip_address, timeout, iterate) # store weak refs so that the existence of thread/port bridge caching doesn't prevent @@ -290,8 +307,7 @@ def __init__( self._debug = debug self._timeout = timeout self._iterate = iterate - self._main_socket = _DataSocket( - zmq.Context.instance(), port, zmq.REQ, debug=debug, ip_address=self._ip_address + self._main_socket = _DataSocket(port, zmq.REQ, debug=debug, ip_address=self._ip_address ) self._main_socket.send({"command": "connect", "debug": debug}) self._class_factory = _JavaClassFactory() @@ -322,9 +338,9 @@ def __del__(self): # than the one that created the bridge del _Bridge._cached_bridges_by_port_and_thread[self._port_thread_id] if self._debug: - logging.main_logger.debug("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format( + print("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format( str(self), self.port, threading.current_thread().name)) - logging.main_logger.debug("Running on thread {}".format(threading.current_thread().name)) + print("Running on thread {}".format(threading.current_thread().name)) @@ -553,17 +569,24 @@ def __init__(self, serialized_object, bridge: _Bridge): # # In case there's an exception rather than normal garbage collection, # # this makes sure cleanup occurs properly # # Need to use a wr to ensure that reference to close doesnt cause memeory leak - wr = weakref.ref(self._close) + wr = weakref.ref(self) def cleanup(): + if self._debug: + print('running cleanup for', self, 'on thread', threading.current_thread().name) if wr() is not None: + if self._debug: + print('executing cleanup for', self, 'on thread', threading.current_thread().name) # It hasn't already been garbage collected - wr()() + wr()._close() atexit.register(cleanup) + # cleanup_functions.append(cleanup) self._close_lock = Lock() def _close(self): with self._close_lock: if self._closed: + if self._debug: + print("Already closed") return if not hasattr(self, "_hash_code"): return # constructor didnt properly finish, nothing to clean up on java side @@ -636,7 +659,7 @@ def _get_bridge(self): # print('bridge just created', bridge_to_use, # 'cached', self._bridges_by_port_thread[combo_id]) if self._debug: - logging.main_logger.debug(self) + print(self) # print current call stack traceback.print_stack() warnings.warn("Duplicate bridges on port {} thread {}".format(port, thread_id)) @@ -649,15 +672,14 @@ def __del__(self): Tell java side this object is garbage collected so it can do the same if needed """ if self._debug: - logging.main_logger.debug('destructor for {} on thread {}'.format( + print('Java object shadow destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) - logging.main_logger.debug('Thread name: {}'.format(threading.current_thread().name)) try: self._close() except Exception as e: - traceback.print_exc() - logging.main_logger.error('Exception in destructor for {} on thread {}'.format( + print('Exception in destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) + raise e def _access_field(self, name): """ diff --git a/pycromanager/zmq_bridge/wrappers.py b/pycromanager/zmq_bridge/wrappers.py index 4f92e668..1e9c1624 100644 --- a/pycromanager/zmq_bridge/wrappers.py +++ b/pycromanager/zmq_bridge/wrappers.py @@ -17,8 +17,7 @@ def __init__( debug=False, ip_address="127.0.0.1" ): - _DataSocket.__init__(self, - context=zmq.Context.instance(), port=port, type=zmq.PULL, debug=debug, ip_address=ip_address) + _DataSocket.__init__(self, port=port, type=zmq.PULL, debug=debug, ip_address=ip_address) class PushSocket(_DataSocket): @@ -31,8 +30,7 @@ def __init__( debug=False, ip_address="127.0.0.1" ): - _DataSocket.__init__(self, - context=zmq.Context.instance(), port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address) + _DataSocket.__init__(self, port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address) From a6ef1623d2b8d69e19f5c84556e84a3535617d6f Mon Sep 17 00:00:00 2001 From: Henry Pinkard <7969470+henrypinkard@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:32:30 -0700 Subject: [PATCH 2/2] simplify ZMQ context --- pycromanager/zmq_bridge/bridge.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pycromanager/zmq_bridge/bridge.py b/pycromanager/zmq_bridge/bridge.py index 272ba0aa..fe153d66 100644 --- a/pycromanager/zmq_bridge/bridge.py +++ b/pycromanager/zmq_bridge/bridge.py @@ -13,18 +13,6 @@ import weakref import atexit -# global map from threads to zmq contexts -ZMQ_CONTEXTS = {} - -# this is the last thing that should happen, so we register it first -def context_cleanup(contexts): - for context in contexts.values(): - context.term() - del contexts - -atexit.register(context_cleanup, ZMQ_CONTEXTS) - - class _DataSocket: """ Wrapper for ZMQ socket that sends and recieves dictionaries @@ -33,12 +21,7 @@ class _DataSocket: def __init__(self, port, type, debug=False, ip_address="127.0.0.1"): # request reply socket - # check if context already exists for this thread - if threading.current_thread().ident in ZMQ_CONTEXTS: - context = ZMQ_CONTEXTS[threading.current_thread().ident] - else: - context = zmq.Context() - ZMQ_CONTEXTS[threading.current_thread().ident] = context + context = zmq.Context.instance() self._socket = context.socket(type) # if 1000 messages are queued up, queue indefinitely until they can be sent self._socket.setsockopt(zmq.SNDHWM, 1000)