Skip to content

Commit

Permalink
TestClusterTime for async suite
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahStapp committed Jul 15, 2024
1 parent f5929a4 commit 71d5afb
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 172 deletions.
180 changes: 94 additions & 86 deletions test/asynchronous/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,92 +1157,100 @@ async def test_cluster_time_no_server_support(self):
self.assertIsNone(after_cluster_time)


# class TestClusterTime(AsyncIntegrationTest):
# def setUp(self):
# super().setUp()
# if "$clusterTime" not in async_client_context.hello:
# raise SkipTest("$clusterTime not supported")
#
# def test_cluster_time(self):
# listener = SessionTestListener()
# # Prevent heartbeats from updating $clusterTime between operations.
# client = rs_or_single_client(event_listeners=[listener], heartbeatFrequencyMS=999999)
# self.addCleanup(client.aclose)
# collection = client.pymongo_test.collection
# # Prepare for tests of find() and aggregate().
# collection.insert_many([{} for _ in range(10)])
# self.addCleanup(collection.drop)
# self.addCleanup(client.pymongo_test.collection2.drop)
#
# def rename_and_drop():
# # Ensure collection exists.
# collection.insert_one({})
# collection.rename("collection2")
# client.pymongo_test.collection2.drop()
#
# def insert_and_find():
# cursor = collection.find().batch_size(1)
# for _ in range(10):
# # Advance the cluster time.
# collection.insert_one({})
# next(cursor)
#
# cursor.close()
#
# def insert_and_aggregate():
# cursor = collection.aggregate([], batchSize=1).batch_size(1)
# for _ in range(5):
# # Advance the cluster time.
# collection.insert_one({})
# next(cursor)
#
# cursor.close()
#
# ops = [
# # Tests from Driver Sessions Spec.
# ("ping", lambda: client.admin.command("ping")),
# ("aggregate", lambda: list(collection.aggregate([]))),
# ("find", lambda: list(collection.find())),
# ("insert_one", lambda: collection.insert_one({})),
# # Additional PyMongo tests.
# ("insert_and_find", insert_and_find),
# ("insert_and_aggregate", insert_and_aggregate),
# ("update_one", lambda: collection.update_one({}, {"$set": {"x": 1}})),
# ("update_many", lambda: collection.update_many({}, {"$set": {"x": 1}})),
# ("delete_one", lambda: collection.delete_one({})),
# ("delete_many", lambda: collection.delete_many({})),
# ("bulk_write", lambda: collection.bulk_write([InsertOne({})])),
# ("rename_and_drop", rename_and_drop),
# ]
#
# for _name, f in ops:
# listener.reset()
# # Call f() twice, insert to advance clusterTime, call f() again.
# f()
# f()
# collection.insert_one({})
# f()
#
# self.assertGreaterEqual(len(listener.started_events), 1)
# for i, event in enumerate(listener.started_events):
# self.assertTrue(
# "$clusterTime" in event.command,
# f"{f.__name__} sent no $clusterTime with {event.command_name}",
# )
#
# if i > 0:
# succeeded = listener.succeeded_events[i - 1]
# self.assertTrue(
# "$clusterTime" in succeeded.reply,
# f"{f.__name__} received no $clusterTime with {succeeded.command_name}",
# )
#
# self.assertTrue(
# event.command["$clusterTime"]["clusterTime"]
# >= succeeded.reply["$clusterTime"]["clusterTime"],
# f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
# )
#
class TestClusterTime(AsyncIntegrationTest):
async def asyncSetUp(self):
await super().asyncSetUp()
if "$clusterTime" not in (await async_client_context.hello):
raise SkipTest("$clusterTime not supported")

async def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = await async_rs_or_single_client(
event_listeners=[listener], heartbeatFrequencyMS=999999
)
self.addAsyncCleanup(client.aclose)
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
await collection.insert_many([{} for _ in range(10)])
self.addAsyncCleanup(collection.drop)
self.addAsyncCleanup(client.pymongo_test.collection2.drop)

async def rename_and_drop():
# Ensure collection exists.
await collection.insert_one({})
await collection.rename("collection2")
await client.pymongo_test.collection2.drop()

async def insert_and_find():
cursor = (await collection.find()).batch_size(1)
for _ in range(10):
# Advance the cluster time.
await collection.insert_one({})
await anext(cursor)

await cursor.close()

async def insert_and_aggregate():
cursor = (await collection.aggregate([], batchSize=1)).batch_size(1)
for _ in range(5):
# Advance the cluster time.
await collection.insert_one({})
await anext(cursor)

await cursor.close()

async def lambda_aggregate():
await (await collection.aggregate([])).to_list()

async def lambda_find():
await (await collection.find()).to_list()

ops = [
# Tests from Driver Sessions Spec.
("ping", lambda: client.admin.command("ping")),
("aggregate", lambda: lambda_aggregate()),
("find", lambda: lambda_find()),
("insert_one", lambda: collection.insert_one({})),
# Additional PyMongo tests.
("insert_and_find", insert_and_find),
("insert_and_aggregate", insert_and_aggregate),
("update_one", lambda: collection.update_one({}, {"$set": {"x": 1}})),
("update_many", lambda: collection.update_many({}, {"$set": {"x": 1}})),
("delete_one", lambda: collection.delete_one({})),
("delete_many", lambda: collection.delete_many({})),
("bulk_write", lambda: collection.bulk_write([InsertOne({})])),
("rename_and_drop", rename_and_drop),
]

for _name, f in ops:
listener.reset()
# Call f() twice, insert to advance clusterTime, call f() again.
await f()
await f()
await collection.insert_one({})
await f()

self.assertGreaterEqual(len(listener.started_events), 1)
for i, event in enumerate(listener.started_events):
self.assertTrue(
"$clusterTime" in event.command,
f"{f.__name__} sent no $clusterTime with {event.command_name}",
)

if i > 0:
succeeded = listener.succeeded_events[i - 1]
self.assertTrue(
"$clusterTime" in succeeded.reply,
f"{f.__name__} received no $clusterTime with {succeeded.command_name}",
)

self.assertTrue(
event.command["$clusterTime"]["clusterTime"]
>= succeeded.reply["$clusterTime"]["clusterTime"],
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
)


if __name__ == "__main__":
unittest.main()
178 changes: 92 additions & 86 deletions test/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,92 +1143,98 @@ def test_cluster_time_no_server_support(self):
self.assertIsNone(after_cluster_time)


# class TestClusterTime(IntegrationTest):
# def setUp(self):
# super().setUp()
# if "$clusterTime" not in client_context.hello:
# raise SkipTest("$clusterTime not supported")
#
# def test_cluster_time(self):
# listener = SessionTestListener()
# # Prevent heartbeats from updating $clusterTime between operations.
# client = rs_or_single_client(event_listeners=[listener], heartbeatFrequencyMS=999999)
# self.addCleanup(client.close)
# collection = client.pymongo_test.collection
# # Prepare for tests of find() and aggregate().
# collection.insert_many([{} for _ in range(10)])
# self.addCleanup(collection.drop)
# self.addCleanup(client.pymongo_test.collection2.drop)
#
# def rename_and_drop():
# # Ensure collection exists.
# collection.insert_one({})
# collection.rename("collection2")
# client.pymongo_test.collection2.drop()
#
# def insert_and_find():
# cursor = collection.find().batch_size(1)
# for _ in range(10):
# # Advance the cluster time.
# collection.insert_one({})
# next(cursor)
#
# cursor.close()
#
# def insert_and_aggregate():
# cursor = collection.aggregate([], batchSize=1).batch_size(1)
# for _ in range(5):
# # Advance the cluster time.
# collection.insert_one({})
# next(cursor)
#
# cursor.close()
#
# ops = [
# # Tests from Driver Sessions Spec.
# ("ping", lambda: client.admin.command("ping")),
# ("aggregate", lambda: list(collection.aggregate([]))),
# ("find", lambda: list(collection.find())),
# ("insert_one", lambda: collection.insert_one({})),
# # Additional PyMongo tests.
# ("insert_and_find", insert_and_find),
# ("insert_and_aggregate", insert_and_aggregate),
# ("update_one", lambda: collection.update_one({}, {"$set": {"x": 1}})),
# ("update_many", lambda: collection.update_many({}, {"$set": {"x": 1}})),
# ("delete_one", lambda: collection.delete_one({})),
# ("delete_many", lambda: collection.delete_many({})),
# ("bulk_write", lambda: collection.bulk_write([InsertOne({})])),
# ("rename_and_drop", rename_and_drop),
# ]
#
# for _name, f in ops:
# listener.reset()
# # Call f() twice, insert to advance clusterTime, call f() again.
# f()
# f()
# collection.insert_one({})
# f()
#
# self.assertGreaterEqual(len(listener.started_events), 1)
# for i, event in enumerate(listener.started_events):
# self.assertTrue(
# "$clusterTime" in event.command,
# f"{f.__name__} sent no $clusterTime with {event.command_name}",
# )
#
# if i > 0:
# succeeded = listener.succeeded_events[i - 1]
# self.assertTrue(
# "$clusterTime" in succeeded.reply,
# f"{f.__name__} received no $clusterTime with {succeeded.command_name}",
# )
#
# self.assertTrue(
# event.command["$clusterTime"]["clusterTime"]
# >= succeeded.reply["$clusterTime"]["clusterTime"],
# f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
# )
#
class TestClusterTime(IntegrationTest):
def setUp(self):
super().setUp()
if "$clusterTime" not in (client_context.hello):
raise SkipTest("$clusterTime not supported")

def test_cluster_time(self):
listener = SessionTestListener()
# Prevent heartbeats from updating $clusterTime between operations.
client = rs_or_single_client(event_listeners=[listener], heartbeatFrequencyMS=999999)
self.addCleanup(client.close)
collection = client.pymongo_test.collection
# Prepare for tests of find() and aggregate().
collection.insert_many([{} for _ in range(10)])
self.addCleanup(collection.drop)
self.addCleanup(client.pymongo_test.collection2.drop)

def rename_and_drop():
# Ensure collection exists.
collection.insert_one({})
collection.rename("collection2")
client.pymongo_test.collection2.drop()

def insert_and_find():
cursor = (collection.find()).batch_size(1)
for _ in range(10):
# Advance the cluster time.
collection.insert_one({})
next(cursor)

cursor.close()

def insert_and_aggregate():
cursor = (collection.aggregate([], batchSize=1)).batch_size(1)
for _ in range(5):
# Advance the cluster time.
collection.insert_one({})
next(cursor)

cursor.close()

def lambda_aggregate():
(collection.aggregate([])).to_list()

def lambda_find():
(collection.find()).to_list()

ops = [
# Tests from Driver Sessions Spec.
("ping", lambda: client.admin.command("ping")),
("aggregate", lambda: lambda_aggregate()),
("find", lambda: lambda_find()),
("insert_one", lambda: collection.insert_one({})),
# Additional PyMongo tests.
("insert_and_find", insert_and_find),
("insert_and_aggregate", insert_and_aggregate),
("update_one", lambda: collection.update_one({}, {"$set": {"x": 1}})),
("update_many", lambda: collection.update_many({}, {"$set": {"x": 1}})),
("delete_one", lambda: collection.delete_one({})),
("delete_many", lambda: collection.delete_many({})),
("bulk_write", lambda: collection.bulk_write([InsertOne({})])),
("rename_and_drop", rename_and_drop),
]

for _name, f in ops:
listener.reset()
# Call f() twice, insert to advance clusterTime, call f() again.
f()
f()
collection.insert_one({})
f()

self.assertGreaterEqual(len(listener.started_events), 1)
for i, event in enumerate(listener.started_events):
self.assertTrue(
"$clusterTime" in event.command,
f"{f.__name__} sent no $clusterTime with {event.command_name}",
)

if i > 0:
succeeded = listener.succeeded_events[i - 1]
self.assertTrue(
"$clusterTime" in succeeded.reply,
f"{f.__name__} received no $clusterTime with {succeeded.command_name}",
)

self.assertTrue(
event.command["$clusterTime"]["clusterTime"]
>= succeeded.reply["$clusterTime"]["clusterTime"],
f"{f.__name__} sent wrong $clusterTime with {event.command_name}",
)


if __name__ == "__main__":
unittest.main()

0 comments on commit 71d5afb

Please sign in to comment.