Skip to content

Commit

Permalink
Use selectors module instead of poll directly
Browse files Browse the repository at this point in the history
selectors provides implementation-indepandent interface to interact with
various IO multiplexing methods. Using poll directly means that the code
would fail if select.poll was missing (for example with gevent or Eventlet
monkey patching enabled) while selectors.DefaultSelector can be depended
on to be present.

Related: seb-m#78
  • Loading branch information
jstasiak authored and Jakub Stasiak committed Jun 13, 2016
1 parent c1c35fa commit fe83134
Showing 1 changed file with 17 additions and 28 deletions.
45 changes: 17 additions & 28 deletions pyinotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, version):
# Import directives
import threading
import os
import select
import selectors
import struct
import fcntl
import errno
Expand Down Expand Up @@ -1107,7 +1107,7 @@ def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
if read_freq is > 0, this thread sleeps
max(0, read_freq - (timeout / 1000)) seconds. But if
timeout is None it may be different because
poll is blocking waiting for something to read.
selector is blocking waiting for something to read.
@type read_freq: int
@param threshold: File descriptor will be read only if the accumulated
size to read becomes >= threshold. If != 0, you likely
Expand All @@ -1118,17 +1118,16 @@ def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
At least with read_freq set you might sleep.
@type threshold: int
@param timeout: see read_freq above. If provided, it must be set in
milliseconds. See
https://docs.python.org/3/library/select.html#select.poll.poll
milliseconds.
@type timeout: int
"""
# Watch Manager instance
self._watch_manager = watch_manager
# File descriptor
self._fd = self._watch_manager.get_fd()
# Poll object and registration
self._pollobj = select.poll()
self._pollobj.register(self._fd, select.POLLIN)
# Selector object and registration
self._selector = selectors.DefaultSelector()
self._selector.register(self._fd, selectors.EVENT_READ)
# This pipe is correctely initialized and used by ThreadedNotifier
self._pipe = (-1, -1)
# Event queue
Expand Down Expand Up @@ -1192,24 +1191,15 @@ def check_events(self, timeout=None):
@return: New events to read.
@rtype: bool
"""
while True:
try:
# blocks up to 'timeout' milliseconds
if timeout is None:
timeout = self._timeout
ret = self._pollobj.poll(timeout)
except select.error as err:
if err.args[0] == errno.EINTR:
continue # interrupted, retry
else:
raise
else:
break
# blocks up to 'timeout' milliseconds
if timeout is None:
timeout = self._timeout
ret = self._selector.select(timeout / 1000.0 if timeout is not None else None)

if not ret or (self._pipe[0] == ret[0][0]):
if not ret or (self._pipe[0] == ret[0][0].fileobj):
return False
# only one fd is polled
return ret[0][1] & select.POLLIN
return ret[0][1] & selectors.EVENT_READ

def read_events(self):
"""
Expand Down Expand Up @@ -1405,7 +1395,7 @@ def stop(self):
Afterward it is invalid to access this instance.
"""
if self._fd is not None:
self._pollobj.unregister(self._fd)
self._selector.unregister(self._fd)
os.close(self._fd)
self._fd = None
self._sys_proc_fun = None
Expand Down Expand Up @@ -1443,8 +1433,7 @@ def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
least with read_freq you might sleep.
@type threshold: int
@param timeout: see read_freq above. If provided, it must be set in
milliseconds. See
https://docs.python.org/3/library/select.html#select.poll.poll
milliseconds.
@type timeout: int
"""
# Init threading base class
Expand All @@ -1456,7 +1445,7 @@ def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
threshold, timeout)
# Create a new pipe used for thread termination
self._pipe = os.pipe()
self._pollobj.register(self._pipe[0], select.POLLIN)
self._selector.register(self._pipe[0], selectors.EVENT_READ)

def stop(self):
"""
Expand All @@ -1466,7 +1455,7 @@ def stop(self):
os.write(self._pipe[1], b'stop')
threading.Thread.join(self)
Notifier.stop(self)
self._pollobj.unregister(self._pipe[0])
self._selector.unregister(self._pipe[0])
os.close(self._pipe[0])
os.close(self._pipe[1])

Expand All @@ -1479,7 +1468,7 @@ def loop(self):
seconds at best and only if the size of events to read is >= threshold.
"""
# When the loop must be terminated .stop() is called, 'stop'
# is written to pipe fd so poll() returns and .check_events()
# is written to pipe fd so selector returns and .check_events()
# returns False which make evaluate the While's stop condition
# ._stop_event.isSet() wich put an end to the thread's execution.
while not self._stop_event.isSet():
Expand Down

0 comments on commit fe83134

Please sign in to comment.