Skip to content

Commit

Permalink
Added channel lock (#324)
Browse files Browse the repository at this point in the history
* Added channel lock
* Updated changelog
  • Loading branch information
pkittenis authored Oct 31, 2021
1 parent a98114e commit 004379f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
1 change: 1 addition & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Fixes
-----

* ``HostOutput`` would have empty host on some exceptions when ``stop_on_errors`` is ``False`` - #297
* Race condition when forcefully closing channel via ``SSHClient.close_channel`` while channel data was left unread.

2.6.0
+++++
Expand Down
13 changes: 9 additions & 4 deletions pssh/clients/native/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from warnings import warn

from gevent import sleep, spawn, get_hub
from gevent.lock import RLock
from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN
from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \
Timeout as SSH2Timeout
Expand Down Expand Up @@ -127,6 +128,7 @@ def __init__(self, host,
identity_auth=identity_auth,
)
proxy_host = '127.0.0.1'
self._chan_lock = RLock()
super(SSHClient, self).__init__(
host, user=user, password=password, port=port, pkey=pkey,
num_retries=num_retries, retry_delay=retry_delay,
Expand Down Expand Up @@ -291,10 +293,12 @@ def execute(self, cmd, use_pty=False, channel=None):
def _read_output_to_buffer(self, read_func, _buffer):
try:
while True:
size, data = read_func()
with self._chan_lock:
size, data = read_func()
while size == LIBSSH2_ERROR_EAGAIN:
self.poll()
size, data = read_func()
with self._chan_lock:
size, data = read_func()
if size <= 0:
break
_buffer.write(data)
Expand Down Expand Up @@ -325,8 +329,9 @@ def wait_finished(self, host_output, timeout=None):
self.close_channel(channel)

def close_channel(self, channel):
logger.debug("Closing channel")
self._eagain(channel.close)
with self._chan_lock:
logger.debug("Closing channel")
self._eagain(channel.close)

def _eagain(self, func, *args, **kwargs):
return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs)
Expand Down

0 comments on commit 004379f

Please sign in to comment.