Skip to content

Commit

Permalink
fixed DummyConnection issue
Browse files Browse the repository at this point in the history
Signed-off-by: Margulanz <[email protected]>
  • Loading branch information
margulanz committed Apr 16, 2023
1 parent 03b72c3 commit d79a368
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
8 changes: 5 additions & 3 deletions opensearchpy/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def mark_dead(self, connection, now=None):
"""
# allow inject for testing purposes
now = now if now else time.time()
print(id(connection), now)
try:
self.connections.remove(connection)
except ValueError:
Expand All @@ -187,7 +188,8 @@ def mark_dead(self, connection, now=None):
dead_count = self.dead_count.get(connection, 0) + 1
self.dead_count[connection] = dead_count
timeout = self.dead_timeout * 2 ** min(dead_count - 1, self.timeout_cutoff)
self.dead.put((now + timeout, connection))
#time.sleep(.00001)
self.dead.put((now + timeout,id(connection),connection))
logger.warning(
"Connection %r has failed for %i times in a row, putting on %i second timeout.",
connection,
Expand Down Expand Up @@ -232,7 +234,7 @@ def resurrect(self, force=False):

try:
# retrieve a connection to check
timeout, connection = self.dead.get(block=False)
timeout,connection_id, connection = self.dead.get(block=False)
except Empty:
# other thread has been faster and the queue is now empty. If we
# are forced, return a connection at random again.
Expand All @@ -242,7 +244,7 @@ def resurrect(self, force=False):

if not force and timeout > time.time():
# return it back if not eligible and not forced
self.dead.put((timeout, connection))
self.dead.put((timeout,id(connection), connection))
return

# either we were forced or the connection is elligible to be retried
Expand Down
6 changes: 3 additions & 3 deletions test_opensearchpy/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_dead_nodes_are_removed_from_active_connections(self):
pool.mark_dead(42, now=now)
self.assertEqual(99, len(pool.connections))
self.assertEqual(1, pool.dead.qsize())
self.assertEqual((now + 60, 42), pool.dead.get())
self.assertEqual((now + 60,id(42), 42), pool.dead.get())

def test_connection_is_skipped_when_dead(self):
pool = ConnectionPool([(x, {}) for x in range(2)])
Expand Down Expand Up @@ -145,7 +145,7 @@ def test_already_failed_connection_has_longer_timeout(self):
pool.mark_dead(42, now=now)

self.assertEqual(3, pool.dead_count[42])
self.assertEqual((now + 4 * 60, 42), pool.dead.get())
self.assertEqual((now + 4 * 60,id(42), 42), pool.dead.get())

def test_timeout_for_failed_connections_is_limitted(self):
pool = ConnectionPool([(x, {}) for x in range(100)])
Expand All @@ -154,7 +154,7 @@ def test_timeout_for_failed_connections_is_limitted(self):
pool.mark_dead(42, now=now)

self.assertEqual(246, pool.dead_count[42])
self.assertEqual((now + 32 * 60, 42), pool.dead.get())
self.assertEqual((now + 32 * 60,id(42), 42), pool.dead.get())

def test_dead_count_is_wiped_clean_for_connection_if_marked_live(self):
pool = ConnectionPool([(x, {}) for x in range(100)])
Expand Down
2 changes: 1 addition & 1 deletion test_opensearchpy/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_kwargs_passed_on_to_connections(self):
self.assertEqual("http://google.com:123", t.connection_pool.connections[0].host)

def test_kwargs_passed_on_to_connection_pool(self):
dt = object()
dt = 60
t = Transport([{}, {}], dead_timeout=dt)
self.assertIs(dt, t.connection_pool.dead_timeout)

Expand Down

0 comments on commit d79a368

Please sign in to comment.