-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PYTHON-4667 Handle $clusterTime from error responses in client Bulk Write #1822
Conversation
pymongo/asynchronous/client_bulk.py
Outdated
@@ -312,6 +312,9 @@ async def write_command( | |||
bwc._fail(request_id, failure, duration) | |||
# Top-level error will be embedded in ClientBulkWriteException. | |||
reply = {"error": exc} | |||
# Propagate $clusterTime to the caller. | |||
if "$clusterTime" in cmd: | |||
reply["$clusterTime"] = cmd["$clusterTime"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$clusterTime needs to be parsed from the (potential) server error response, not the command. We also need to do the same for operationTime.
This is the method that handles it, it probably makes more sense to call this method at a lower level if we can (perhaps in write_command):
client._process_response(command_response, session)
Or another alternative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test to test/mockupdb/test_cluster_time.py
? Something like:
def test_bulk_error(self):
def callback(client: MongoClient[dict]) -> None:
client.db.collection.bulk_write(
[InsertOne({}), InsertOne({})]
)
self.cluster_time_conversation(
callback,
[{"ok": 0, "errmsg": "mock error"}],
)
pymongo/asynchronous/bulk.py
Outdated
if isinstance(exc, OperationFailure): | ||
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] | ||
else: | ||
await client._process_response({}, bwc.session) # type: ignore[arg-type] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need an else
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
pymongo/asynchronous/bulk.py
Outdated
@@ -308,6 +309,11 @@ async def write_command( | |||
|
|||
if bwc.publish: | |||
bwc._fail(request_id, failure, duration) | |||
# Process the response from the server. | |||
if isinstance(exc, OperationFailure): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isinstance(exc, (NotPrimaryError, OperationFailure))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Done |
test/mockupdb/test_cluster_time.py
Outdated
callback, | ||
[{"ok": 0, "errmsg": "mock error"}], | ||
) | ||
self.assertIn("$clusterTime", str(context.exception)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is skipping the clusterTime asserts in cluster_time_conversation. We need to do:
def callback(client: MongoClient[dict]) -> None:
with self.assertRaises(OperationFailure):
client.db.collection.bulk_write([InsertOne({}), InsertOne({})])
self.cluster_time_conversation(
callback,
[{"ok": 0, "errmsg": "mock error"}],
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also add a similar test for client.bulk_write?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done and done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once the tests pass!
No description provided.