diff --git a/pycromanager/_version.py b/pycromanager/_version.py index 42cd0936..7be1c6e2 100644 --- a/pycromanager/_version.py +++ b/pycromanager/_version.py @@ -1,2 +1,2 @@ -version_info = (0, 29, 9) +version_info = (0, 29, 10) __version__ = ".".join(map(str, version_info)) diff --git a/pycromanager/zmq_bridge/bridge.py b/pycromanager/zmq_bridge/bridge.py index 71036f71..fe153d66 100644 --- a/pycromanager/zmq_bridge/bridge.py +++ b/pycromanager/zmq_bridge/bridge.py @@ -12,9 +12,6 @@ import threading import weakref import atexit -import traceback -import pycromanager.logging as logging - class _DataSocket: """ @@ -22,8 +19,9 @@ class _DataSocket: Includes ZMQ client, push, and pull sockets """ - def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"): + def __init__(self, port, type, debug=False, ip_address="127.0.0.1"): # request reply socket + context = zmq.Context.instance() self._socket = context.socket(type) # if 1000 messages are queued up, queue indefinitely until they can be sent self._socket.setsockopt(zmq.SNDHWM, 1000) @@ -37,11 +35,11 @@ def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"): self._closed = False if type == zmq.PUSH: if debug: - logging.main_logger.debug("binding {}".format(port)) + print("binding {}".format(port)) self._socket.bind("tcp://{}:{}".format(ip_address, port)) else: if debug: - logging.main_logger.debug("connecting {}".format(port)) + print("connecting {}".format(port)) self._socket.connect("tcp://{}:{}".format(ip_address, port)) def _register_java_object(self, object): @@ -102,6 +100,8 @@ def _remove_bytes(self, bytes_data, structure): self._remove_bytes(bytes_data, structure[key]) def send(self, message, timeout=None, suppress_debug_message=False): + if zmq.Frame is None: + raise Exception("ZMQ has terminated, cannot send message") if message is None: message = {} # make sure any np types convert to python types so they can be json serialized @@ -111,7 +111,7 @@ def send(self, message, timeout=None, suppress_debug_message=False): self._remove_bytes(bytes_data, message) message_string = json.dumps(message) if self._debug and not suppress_debug_message: - logging.main_logger.debug("sending: {}".format(message)) + print("sending: {}".format(message)) # convert keys to byte array key_vals = [(identifier.tobytes(), value) for identifier, value in bytes_data] message_parts = [bytes(message_string, "iso-8859-1")] + [ @@ -180,7 +180,7 @@ def receive(self, timeout=None, suppress_debug_message=False): self._replace_bytes(message, identity_hash, value) if self._debug and not suppress_debug_message: - logging.main_logger.debug("received: {}".format(message)) + print("received: {}".format(message)) self._check_exception(message) return message @@ -203,7 +203,7 @@ def close(self): time.sleep(0.01) self._socket = None if self._debug: - logging.main_logger.debug('closed socket {}'.format(self._port)) + print('closed socket {}'.format(self._port)) self._closed = True def server_terminated(port): @@ -249,12 +249,12 @@ def create_or_get_existing_bridge(port: int=DEFAULT_PORT, convert_camel_case: bo raise Exception("Bridge for port {} and thread {} has been " "closed but not removed".format(port, threading.current_thread().name)) if debug: - logging.main_logger.debug("returning cached bridge for port {} thread {}".format( + print("returning cached bridge for port {} thread {}".format( port, threading.current_thread().name)) return bridge else: if debug: - logging.main_logger.debug("creating new bridge for port {} thread {}".format( + print("creating new bridge for port {} thread {}".format( port, threading.current_thread().name)) b = _Bridge(port, convert_camel_case, debug, ip_address, timeout, iterate) # store weak refs so that the existence of thread/port bridge caching doesn't prevent @@ -290,8 +290,7 @@ def __init__( self._debug = debug self._timeout = timeout self._iterate = iterate - self._main_socket = _DataSocket( - zmq.Context.instance(), port, zmq.REQ, debug=debug, ip_address=self._ip_address + self._main_socket = _DataSocket(port, zmq.REQ, debug=debug, ip_address=self._ip_address ) self._main_socket.send({"command": "connect", "debug": debug}) self._class_factory = _JavaClassFactory() @@ -322,9 +321,9 @@ def __del__(self): # than the one that created the bridge del _Bridge._cached_bridges_by_port_and_thread[self._port_thread_id] if self._debug: - logging.main_logger.debug("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format( + print("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format( str(self), self.port, threading.current_thread().name)) - logging.main_logger.debug("Running on thread {}".format(threading.current_thread().name)) + print("Running on thread {}".format(threading.current_thread().name)) @@ -553,17 +552,24 @@ def __init__(self, serialized_object, bridge: _Bridge): # # In case there's an exception rather than normal garbage collection, # # this makes sure cleanup occurs properly # # Need to use a wr to ensure that reference to close doesnt cause memeory leak - wr = weakref.ref(self._close) + wr = weakref.ref(self) def cleanup(): + if self._debug: + print('running cleanup for', self, 'on thread', threading.current_thread().name) if wr() is not None: + if self._debug: + print('executing cleanup for', self, 'on thread', threading.current_thread().name) # It hasn't already been garbage collected - wr()() + wr()._close() atexit.register(cleanup) + # cleanup_functions.append(cleanup) self._close_lock = Lock() def _close(self): with self._close_lock: if self._closed: + if self._debug: + print("Already closed") return if not hasattr(self, "_hash_code"): return # constructor didnt properly finish, nothing to clean up on java side @@ -636,7 +642,7 @@ def _get_bridge(self): # print('bridge just created', bridge_to_use, # 'cached', self._bridges_by_port_thread[combo_id]) if self._debug: - logging.main_logger.debug(self) + print(self) # print current call stack traceback.print_stack() warnings.warn("Duplicate bridges on port {} thread {}".format(port, thread_id)) @@ -649,15 +655,14 @@ def __del__(self): Tell java side this object is garbage collected so it can do the same if needed """ if self._debug: - logging.main_logger.debug('destructor for {} on thread {}'.format( + print('Java object shadow destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) - logging.main_logger.debug('Thread name: {}'.format(threading.current_thread().name)) try: self._close() except Exception as e: - traceback.print_exc() - logging.main_logger.error('Exception in destructor for {} on thread {}'.format( + print('Exception in destructor for {} on thread {}'.format( str(self), threading.current_thread().name)) + raise e def _access_field(self, name): """ diff --git a/pycromanager/zmq_bridge/wrappers.py b/pycromanager/zmq_bridge/wrappers.py index 4f92e668..1e9c1624 100644 --- a/pycromanager/zmq_bridge/wrappers.py +++ b/pycromanager/zmq_bridge/wrappers.py @@ -17,8 +17,7 @@ def __init__( debug=False, ip_address="127.0.0.1" ): - _DataSocket.__init__(self, - context=zmq.Context.instance(), port=port, type=zmq.PULL, debug=debug, ip_address=ip_address) + _DataSocket.__init__(self, port=port, type=zmq.PULL, debug=debug, ip_address=ip_address) class PushSocket(_DataSocket): @@ -31,8 +30,7 @@ def __init__( debug=False, ip_address="127.0.0.1" ): - _DataSocket.__init__(self, - context=zmq.Context.instance(), port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address) + _DataSocket.__init__(self, port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address)