Skip to content

Commit

Permalink
Revert "Fix test failures (mongodb#1970)"
Browse files Browse the repository at this point in the history
This reverts commit cd69b36.
  • Loading branch information
NoahStapp committed Nov 4, 2024
1 parent 05d2abb commit 966eb4e
Show file tree
Hide file tree
Showing 18 changed files with 2,756 additions and 623 deletions.
2,747 changes: 2,655 additions & 92 deletions .evergreen/config.yml

Large diffs are not rendered by default.

547 changes: 67 additions & 480 deletions .evergreen/scripts/generate_config.py

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"""
from __future__ import annotations

import asyncio
import contextlib
import os
import warnings
Expand Down Expand Up @@ -2037,8 +2036,6 @@ async def _process_kill_cursors(self) -> None:
for address, cursor_id, conn_mgr in pinned_cursors:
try:
await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it
Expand All @@ -2053,8 +2050,6 @@ async def _process_kill_cursors(self) -> None:
for address, cursor_ids in address_to_cursor_ids.items():
try:
await self._kill_cursors(cursor_ids, address, topology, session=None)
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
raise
Expand All @@ -2069,8 +2064,6 @@ async def _process_periodic_tasks(self) -> None:
try:
await self._process_kill_cursors()
await self._topology.update_pool()
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
return
Expand Down
29 changes: 12 additions & 17 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from __future__ import annotations

import asyncio
import atexit
import logging
import time
Expand All @@ -27,7 +26,7 @@
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _async_create_lock
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
from pymongo.periodic_executor import _shutdown_executors
from pymongo.pool_options import _is_faas
Expand Down Expand Up @@ -277,7 +276,7 @@ async def _check_server(self) -> ServerDescription:
await self._reset_connection()
if isinstance(error, _OperationCancelled):
raise
await self._rtt_monitor.reset()
self._rtt_monitor.reset()
# Server type defaults to Unknown.
return ServerDescription(address, error=error)

Expand Down Expand Up @@ -316,9 +315,9 @@ async def _check_once(self) -> ServerDescription:
self._cancel_context = conn.cancel_context
response, round_trip_time = await self._check_with_socket(conn)
if not response.awaitable:
await self._rtt_monitor.add_sample(round_trip_time)
self._rtt_monitor.add_sample(round_trip_time)

avg_rtt, min_rtt = await self._rtt_monitor.get()
avg_rtt, min_rtt = self._rtt_monitor.get()
sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt)
if self._publish:
assert self._listeners is not None
Expand Down Expand Up @@ -414,8 +413,6 @@ def _get_seedlist(self) -> Optional[list[tuple[str, Any]]]:
if len(seedlist) == 0:
# As per the spec: this should be treated as a failure.
raise Exception
except asyncio.CancelledError:
raise
except Exception:
# As per the spec, upon encountering an error:
# - An error must not be raised
Expand Down Expand Up @@ -444,28 +441,28 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool
self._pool = pool
self._moving_average = MovingAverage()
self._moving_min = MovingMinimum()
self._lock = _async_create_lock()
self._lock = _create_lock()

async def close(self) -> None:
self.gc_safe_close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
await self._pool.reset()

async def add_sample(self, sample: float) -> None:
def add_sample(self, sample: float) -> None:
"""Add a RTT sample."""
async with self._lock:
with self._lock:
self._moving_average.add_sample(sample)
self._moving_min.add_sample(sample)

async def get(self) -> tuple[Optional[float], float]:
def get(self) -> tuple[Optional[float], float]:
"""Get the calculated average, or None if no samples yet and the min."""
async with self._lock:
with self._lock:
return self._moving_average.get(), self._moving_min.get()

async def reset(self) -> None:
def reset(self) -> None:
"""Reset the average RTT."""
async with self._lock:
with self._lock:
self._moving_average.reset()
self._moving_min.reset()

Expand All @@ -475,12 +472,10 @@ async def _run(self) -> None:
# heartbeat protocol (MongoDB 4.4+).
# XXX: Skip check if the server is unknown?
rtt = await self._ping()
await self.add_sample(rtt)
self.add_sample(rtt)
except ReferenceError:
# Topology was garbage-collected.
await self.close()
except asyncio.CancelledError:
raise
except Exception:
await self._pool.reset()

Expand Down
2 changes: 0 additions & 2 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,6 @@ def _close_conn(self) -> None:
# shutdown.
try:
self.conn.close()
except asyncio.CancelledError:
raise
except Exception: # noqa: S110
pass

Expand Down
3 changes: 1 addition & 2 deletions pymongo/network_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,7 @@ async def async_receive_data(
)
for task in pending:
task.cancel()
if pending:
await asyncio.wait(pending)
await asyncio.wait(pending)
if len(done) == 0:
raise socket.timeout("timed out")
if read_task in done:
Expand Down
7 changes: 0 additions & 7 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"""
from __future__ import annotations

import asyncio
import contextlib
import os
import warnings
Expand Down Expand Up @@ -2031,8 +2030,6 @@ def _process_kill_cursors(self) -> None:
for address, cursor_id, conn_mgr in pinned_cursors:
try:
self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it
Expand All @@ -2047,8 +2044,6 @@ def _process_kill_cursors(self) -> None:
for address, cursor_ids in address_to_cursor_ids.items():
try:
self._kill_cursors(cursor_ids, address, topology, session=None)
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
raise
Expand All @@ -2063,8 +2058,6 @@ def _process_periodic_tasks(self) -> None:
try:
self._process_kill_cursors()
self._topology.update_pool()
except asyncio.CancelledError:
raise
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
return
Expand Down
5 changes: 0 additions & 5 deletions pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from __future__ import annotations

import asyncio
import atexit
import logging
import time
Expand Down Expand Up @@ -414,8 +413,6 @@ def _get_seedlist(self) -> Optional[list[tuple[str, Any]]]:
if len(seedlist) == 0:
# As per the spec: this should be treated as a failure.
raise Exception
except asyncio.CancelledError:
raise
except Exception:
# As per the spec, upon encountering an error:
# - An error must not be raised
Expand Down Expand Up @@ -479,8 +476,6 @@ def _run(self) -> None:
except ReferenceError:
# Topology was garbage-collected.
self.close()
except asyncio.CancelledError:
raise
except Exception:
self._pool.reset()

Expand Down
2 changes: 0 additions & 2 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,6 @@ def _close_conn(self) -> None:
# shutdown.
try:
self.conn.close()
except asyncio.CancelledError:
raise
except Exception: # noqa: S110
pass

Expand Down
4 changes: 0 additions & 4 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import asyncio
import gc
import logging
import multiprocessing
import os
import signal
Expand All @@ -26,7 +25,6 @@
import sys
import threading
import time
import traceback
import unittest
import warnings
from asyncio import iscoroutinefunction
Expand Down Expand Up @@ -193,8 +191,6 @@ def _connect(self, host, port, **kwargs):
client.close()

def _init_client(self):
self.mongoses = []
self.connection_attempts = []
self.client = self._connect(host, port)
if self.client is not None:
# Return early when connected to dataLake as mongohoused does not
Expand Down
4 changes: 0 additions & 4 deletions test/asynchronous/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import asyncio
import gc
import logging
import multiprocessing
import os
import signal
Expand All @@ -26,7 +25,6 @@
import sys
import threading
import time
import traceback
import unittest
import warnings
from asyncio import iscoroutinefunction
Expand Down Expand Up @@ -193,8 +191,6 @@ async def _connect(self, host, port, **kwargs):
await client.close()

async def _init_client(self):
self.mongoses = []
self.connection_attempts = []
self.client = await self._connect(host, port)
if self.client is not None:
# Return early when connected to dataLake as mongohoused does not
Expand Down
2 changes: 1 addition & 1 deletion test/asynchronous/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2580,7 +2580,7 @@ async def test_direct_client_maintains_pool_to_arbiter(self):
await async_wait_until(lambda: len(c.nodes) == 1, "connect")
self.assertEqual(await c.address, ("c", 3))
# Assert that we create 1 pooled connection.
await listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 1)
listener.wait_for_event(monitoring.ConnectionReadyEvent, 1)
self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1)
arbiter = c._topology.get_server_by_address(("c", 3))
self.assertEqual(len(arbiter.pool.conns), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest):
listener: CMAPListener
coll: AsyncCollection

async def asyncTearDown(self):
await reset_client_context()

@async_client_context.require_replica_set
async def asyncSetUp(self):
self.listener = CMAPListener()
Expand Down
5 changes: 5 additions & 0 deletions test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ async def asyncSetUp(self):
# initialize internals
self.match_evaluator = MatchEvaluatorUtil(self)

async def asyncTearDown(self):
for client in self.mongos_clients:
await client.close()
await super().asyncTearDown()

def maybe_skip_test(self, spec):
# add any special-casing for skipping tests here
if async_client_context.storage_engine == "mmapv1":
Expand Down
2 changes: 2 additions & 0 deletions test/asynchronous/utils_spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ async def asyncSetUp(self) -> None:

async def asyncTearDown(self) -> None:
self.knobs.disable()
for client in self.mongos_clients:
await client.close()

async def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
Expand Down
3 changes: 3 additions & 0 deletions test/test_connections_survive_primary_stepdown_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class TestConnectionsSurvivePrimaryStepDown(IntegrationTest):
listener: CMAPListener
coll: Collection

def tearDown(self):
reset_client_context()

@client_context.require_replica_set
def setUp(self):
self.listener = CMAPListener()
Expand Down
5 changes: 5 additions & 0 deletions test/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ def setUp(self):
# initialize internals
self.match_evaluator = MatchEvaluatorUtil(self)

def tearDown(self):
for client in self.mongos_clients:
client.close()
super().tearDown()

def maybe_skip_test(self, spec):
# add any special-casing for skipping tests here
if client_context.storage_engine == "mmapv1":
Expand Down
2 changes: 2 additions & 0 deletions test/utils_spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ def setUp(self) -> None:

def tearDown(self) -> None:
self.knobs.disable()
for client in self.mongos_clients:
client.close()

def _set_fail_point(self, client, command_args):
cmd = SON([("configureFailPoint", "failCommand")])
Expand Down

0 comments on commit 966eb4e

Please sign in to comment.