Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed cleanup of java objects in bridge due to weakrefs immediately disappearing #740

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pycromanager/_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version_info = (0, 29, 9)
version_info = (0, 29, 10)
__version__ = ".".join(map(str, version_info))
49 changes: 27 additions & 22 deletions pycromanager/zmq_bridge/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@
import threading
import weakref
import atexit
import traceback
import pycromanager.logging as logging


class _DataSocket:
"""
Wrapper for ZMQ socket that sends and recieves dictionaries
Includes ZMQ client, push, and pull sockets
"""

def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"):
def __init__(self, port, type, debug=False, ip_address="127.0.0.1"):
# request reply socket
context = zmq.Context.instance()
self._socket = context.socket(type)
# if 1000 messages are queued up, queue indefinitely until they can be sent
self._socket.setsockopt(zmq.SNDHWM, 1000)
Expand All @@ -37,11 +35,11 @@ def __init__(self, context, port, type, debug=False, ip_address="127.0.0.1"):
self._closed = False
if type == zmq.PUSH:
if debug:
logging.main_logger.debug("binding {}".format(port))
print("binding {}".format(port))
self._socket.bind("tcp://{}:{}".format(ip_address, port))
else:
if debug:
logging.main_logger.debug("connecting {}".format(port))
print("connecting {}".format(port))
self._socket.connect("tcp://{}:{}".format(ip_address, port))

def _register_java_object(self, object):
Expand Down Expand Up @@ -102,6 +100,8 @@ def _remove_bytes(self, bytes_data, structure):
self._remove_bytes(bytes_data, structure[key])

def send(self, message, timeout=None, suppress_debug_message=False):
if zmq.Frame is None:
raise Exception("ZMQ has terminated, cannot send message")
if message is None:
message = {}
# make sure any np types convert to python types so they can be json serialized
Expand All @@ -111,7 +111,7 @@ def send(self, message, timeout=None, suppress_debug_message=False):
self._remove_bytes(bytes_data, message)
message_string = json.dumps(message)
if self._debug and not suppress_debug_message:
logging.main_logger.debug("sending: {}".format(message))
print("sending: {}".format(message))
# convert keys to byte array
key_vals = [(identifier.tobytes(), value) for identifier, value in bytes_data]
message_parts = [bytes(message_string, "iso-8859-1")] + [
Expand Down Expand Up @@ -180,7 +180,7 @@ def receive(self, timeout=None, suppress_debug_message=False):
self._replace_bytes(message, identity_hash, value)

if self._debug and not suppress_debug_message:
logging.main_logger.debug("received: {}".format(message))
print("received: {}".format(message))
self._check_exception(message)
return message

Expand All @@ -203,7 +203,7 @@ def close(self):
time.sleep(0.01)
self._socket = None
if self._debug:
logging.main_logger.debug('closed socket {}'.format(self._port))
print('closed socket {}'.format(self._port))
self._closed = True

def server_terminated(port):
Expand Down Expand Up @@ -249,12 +249,12 @@ def create_or_get_existing_bridge(port: int=DEFAULT_PORT, convert_camel_case: bo
raise Exception("Bridge for port {} and thread {} has been "
"closed but not removed".format(port, threading.current_thread().name))
if debug:
logging.main_logger.debug("returning cached bridge for port {} thread {}".format(
print("returning cached bridge for port {} thread {}".format(
port, threading.current_thread().name))
return bridge
else:
if debug:
logging.main_logger.debug("creating new bridge for port {} thread {}".format(
print("creating new bridge for port {} thread {}".format(
port, threading.current_thread().name))
b = _Bridge(port, convert_camel_case, debug, ip_address, timeout, iterate)
# store weak refs so that the existence of thread/port bridge caching doesn't prevent
Expand Down Expand Up @@ -290,8 +290,7 @@ def __init__(
self._debug = debug
self._timeout = timeout
self._iterate = iterate
self._main_socket = _DataSocket(
zmq.Context.instance(), port, zmq.REQ, debug=debug, ip_address=self._ip_address
self._main_socket = _DataSocket(port, zmq.REQ, debug=debug, ip_address=self._ip_address
)
self._main_socket.send({"command": "connect", "debug": debug})
self._class_factory = _JavaClassFactory()
Expand Down Expand Up @@ -322,9 +321,9 @@ def __del__(self):
# than the one that created the bridge
del _Bridge._cached_bridges_by_port_and_thread[self._port_thread_id]
if self._debug:
logging.main_logger.debug("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format(
print("BRIDGE DESCTRUCTOR for {} on port {} thread {}".format(
str(self), self.port, threading.current_thread().name))
logging.main_logger.debug("Running on thread {}".format(threading.current_thread().name))
print("Running on thread {}".format(threading.current_thread().name))



Expand Down Expand Up @@ -553,17 +552,24 @@ def __init__(self, serialized_object, bridge: _Bridge):
# # In case there's an exception rather than normal garbage collection,
# # this makes sure cleanup occurs properly
# # Need to use a wr to ensure that reference to close doesnt cause memeory leak
wr = weakref.ref(self._close)
wr = weakref.ref(self)
def cleanup():
if self._debug:
print('running cleanup for', self, 'on thread', threading.current_thread().name)
if wr() is not None:
if self._debug:
print('executing cleanup for', self, 'on thread', threading.current_thread().name)
# It hasn't already been garbage collected
wr()()
wr()._close()
atexit.register(cleanup)
# cleanup_functions.append(cleanup)
self._close_lock = Lock()

def _close(self):
with self._close_lock:
if self._closed:
if self._debug:
print("Already closed")
return
if not hasattr(self, "_hash_code"):
return # constructor didnt properly finish, nothing to clean up on java side
Expand Down Expand Up @@ -636,7 +642,7 @@ def _get_bridge(self):
# print('bridge just created', bridge_to_use,
# 'cached', self._bridges_by_port_thread[combo_id])
if self._debug:
logging.main_logger.debug(self)
print(self)
# print current call stack
traceback.print_stack()
warnings.warn("Duplicate bridges on port {} thread {}".format(port, thread_id))
Expand All @@ -649,15 +655,14 @@ def __del__(self):
Tell java side this object is garbage collected so it can do the same if needed
"""
if self._debug:
logging.main_logger.debug('destructor for {} on thread {}'.format(
print('Java object shadow destructor for {} on thread {}'.format(
str(self), threading.current_thread().name))
logging.main_logger.debug('Thread name: {}'.format(threading.current_thread().name))
try:
self._close()
except Exception as e:
traceback.print_exc()
logging.main_logger.error('Exception in destructor for {} on thread {}'.format(
print('Exception in destructor for {} on thread {}'.format(
str(self), threading.current_thread().name))
raise e

def _access_field(self, name):
"""
Expand Down
6 changes: 2 additions & 4 deletions pycromanager/zmq_bridge/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ def __init__(
debug=False,
ip_address="127.0.0.1"
):
_DataSocket.__init__(self,
context=zmq.Context.instance(), port=port, type=zmq.PULL, debug=debug, ip_address=ip_address)
_DataSocket.__init__(self, port=port, type=zmq.PULL, debug=debug, ip_address=ip_address)


class PushSocket(_DataSocket):
Expand All @@ -31,8 +30,7 @@ def __init__(
debug=False,
ip_address="127.0.0.1"
):
_DataSocket.__init__(self,
context=zmq.Context.instance(), port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address)
_DataSocket.__init__(self, port=port, type=zmq.PUSH, debug=debug, ip_address=ip_address)



Expand Down
Loading