Skip to content

Commit

Permalink
blocking client: fix connect and timeout (#499)
Browse files Browse the repository at this point in the history
* fix ipv6 connection issues
* Fix wait_for() in blocking client to raise internal TimeoutError
* Add manual name resolve in blocking client

   This will try to connect to all resolved addresses. Timeout handling is also improved.

---------

Co-authored-by: Zachary Juang <[email protected]>
  • Loading branch information
fantix and zachary822 authored Jun 19, 2024
1 parent d88187a commit 28a83fd
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
73 changes: 54 additions & 19 deletions edgedb/blocking_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,57 @@ async def connect_addr(self, addr, timeout):

if isinstance(addr, str):
# UNIX socket
sock = socket.socket(socket.AF_UNIX)
res_list = [(socket.AF_UNIX, socket.SOCK_STREAM, -1, None, addr)]
else:
sock = socket.socket(socket.AF_INET)

try:
sock.settimeout(timeout)
host, port = addr
try:
# getaddrinfo() doesn't take timeout!!
res_list = socket.getaddrinfo(
host, port, socket.AF_UNSPEC, socket.SOCK_STREAM
)
except socket.gaierror as e:
# All name resolution errors are considered temporary
err = errors.ClientConnectionFailedTemporarilyError(str(e))
raise err from e

for i, res in enumerate(res_list):
af, socktype, proto, _, sa = res
try:
sock.connect(addr)
sock = socket.socket(af, socktype, proto)
except OSError as e:
sock.close()
if i < len(res_list) - 1:
continue
else:
raise con_utils.wrap_error(e) from e
try:
await self._connect_addr(sock, addr, sa, deadline)
except TimeoutError:
raise
except Exception:
if i < len(res_list) - 1:
continue
else:
raise
else:
break

if not isinstance(addr, str):
time_left = deadline - time.monotonic()
if time_left <= 0:
raise TimeoutError
async def _connect_addr(self, sock, addr, sa, deadline):
try:
time_left = deadline - time.monotonic()
if time_left <= 0:
raise TimeoutError
try:
sock.settimeout(time_left)
sock.connect(sa)
except OSError as e:
raise con_utils.wrap_error(e) from e

if not isinstance(addr, str):
time_left = deadline - time.monotonic()
if time_left <= 0:
raise TimeoutError
try:
# Upgrade to TLS
sock.settimeout(time_left)
try:
Expand All @@ -74,12 +110,8 @@ async def connect_addr(self, addr, timeout):
raise con_utils.wrap_error(e) from e
else:
con_utils.check_alpn_protocol(sock)
except socket.gaierror as e:
# All name resolution errors are considered temporary
err = errors.ClientConnectionFailedTemporarilyError(str(e))
raise err from e
except OSError as e:
raise con_utils.wrap_error(e) from e
except OSError as e:
raise con_utils.wrap_error(e) from e

time_left = deadline - time.monotonic()
if time_left <= 0:
Expand All @@ -92,9 +124,9 @@ async def connect_addr(self, addr, timeout):
proto.set_connection(self)

try:
sock.settimeout(time_left)
await proto.connect()
sock.settimeout(None)
await proto.wait_for(proto.connect(), time_left)
except TimeoutError:
raise
except OSError as e:
raise con_utils.wrap_error(e) from e

Expand Down Expand Up @@ -133,6 +165,9 @@ async def close(self, timeout=None):
await self._protocol.wait_for(
self._protocol.wait_for_disconnect(), timeout
)
except TimeoutError:
self.terminate()
raise errors.QueryTimeoutError()
except Exception:
self.terminate()
raise
Expand Down
12 changes: 4 additions & 8 deletions edgedb/protocol/blocking_proto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,14 @@ cdef class BlockingIOProtocol(protocol.SansIOProtocolBackwardsCompatible):
async def wait_for_message(self):
cdef float timeout
if self.deadline > 0:
timeout = self.deadline - time.monotonic()
if timeout <= 0:
self.abort()
raise errors.QueryTimeoutError()
while not self.buffer.take_message():
timeout = self.deadline - time.monotonic()
if timeout <= 0:
self.abort()
raise TimeoutError
try:
self.sock.settimeout(timeout)
data = self.sock.recv(RECV_BUF)
timeout = self.deadline - time.monotonic()
if timeout <= 0:
self.abort()
raise TimeoutError
except OSError as e:
self._disconnect()
raise con_utils.wrap_error(e) from e
Expand Down

0 comments on commit 28a83fd

Please sign in to comment.