Skip to content

Commit

Permalink
feat(NODE-6403): add CSOT support to client bulk write (#4261)
Browse files Browse the repository at this point in the history
Co-authored-by: Warren James <[email protected]>
  • Loading branch information
baileympearson and W-A-James authored Oct 14, 2024
1 parent c0d6ec9 commit 7df1a70
Show file tree
Hide file tree
Showing 13 changed files with 535 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
} finally {
timeout.clear();
}
}
return await drainEvent;
Expand Down
1 change: 1 addition & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export function onData(
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
finished = true;
timeoutForSocketRead?.clear();
const doneResult = { value: undefined, done: finished } as const;

for (const promise of unconsumedPromises) {
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export abstract class AbstractCursor<
options.timeoutMode ??
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
} else {
if (options.timeoutMode != null)
if (options.timeoutMode != null && options.timeoutContext == null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}

Expand Down
8 changes: 6 additions & 2 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
options: ClientBulkWriteCursorOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

Expand Down Expand Up @@ -72,7 +72,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
const response = await executeOperation(
this.client,
clientBulkWriteOperation,
this.timeoutContext
);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
Expand Down
16 changes: 14 additions & 2 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import {
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoServerError
} from '../../error';
import { type MongoClient } from '../../mongo_client';
import { TimeoutContext } from '../../timeout';
import { resolveTimeoutOptions } from '../../utils';
import { WriteConcern } from '../../write_concern';
import { executeOperation } from '../execute_operation';
import { ClientBulkWriteOperation } from './client_bulk_write';
Expand Down Expand Up @@ -70,17 +73,26 @@ export class ClientBulkWriteExecutor {
pkFactory
);
// Unacknowledged writes need to execute all batches and return { ok: 1}
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
const context = TimeoutContext.create(resolvedOptions);

if (this.options.writeConcern?.w === 0) {
while (commandBuilder.hasNextBatch()) {
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, context);
}
return { ok: 1 };
} else {
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
while (commandBuilder.hasNextBatch()) {
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
const cursorContext = new CursorTimeoutContext(context, Symbol());
const options = {
...this.options,
timeoutContext: cursorContext,
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
};
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
try {
await resultsMerger.merge(cursor);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export type ServerEvents = {
EventEmitterWithState;

/** @internal */
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
timeoutContext: TimeoutContext;
};

Expand Down
13 changes: 13 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ServerType } from './sdam/common';
import type { Server } from './sdam/server';
import type { Topology } from './sdam/topology';
import type { ClientSession } from './sessions';
import { type TimeoutContextOptions } from './timeout';
import { WriteConcern } from './write_concern';

/**
Expand Down Expand Up @@ -514,6 +515,18 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
return keys.length > 0 && keys[0][0] === '$';
}

export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
client: MongoClient,
options: T
): T &
Pick<
MongoClient['s']['options'],
'timeoutMS' | 'serverSelectionTimeoutMS' | 'waitQueueTimeoutMS' | 'socketTimeoutMS'
> {
const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
client.s.options;
return { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS, ...options };
}
/**
* Merge inherited properties from parent into options, prioritizing values from options,
* then values from parent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
promiseWithResolvers,
squashError
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';
import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils';
import { filterForCommands } from '../shared';

// TODO(NODE-5824): Implement CSOT prose tests
describe('CSOT spec prose tests', function () {
Expand Down Expand Up @@ -1183,9 +1184,9 @@ describe('CSOT spec prose tests', function () {
});
});

describe.skip(
describe(
'11. Multi-batch bulkWrites',
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
{ requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } },
function () {
/**
* ### 11. Multi-batch bulkWrites
Expand Down Expand Up @@ -1245,9 +1246,6 @@ describe('CSOT spec prose tests', function () {
}
};

let maxBsonObjectSize: number;
let maxMessageSizeBytes: number;

beforeEach(async function () {
await internalClient
.db('db')
Expand All @@ -1256,29 +1254,20 @@ describe('CSOT spec prose tests', function () {
.catch(() => null);
await internalClient.db('admin').command(failpoint);

const hello = await internalClient.db('admin').command({ hello: 1 });
maxBsonObjectSize = hello.maxBsonObjectSize;
maxMessageSizeBytes = hello.maxMessageSizeBytes;

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
it('performs two bulkWrites which fail to complete before 2000 ms', async function () {
const writes = [];
client.on('commandStarted', ev => writes.push(ev));
client.on('commandStarted', filterForCommands('bulkWrite', writes));

const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
const models = Array.from({ length }, () => ({
namespace: 'db.coll',
name: 'insertOne' as const,
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
}));
const models = await makeMultiBatchWrite(this.configuration);

const error = await client.bulkWrite(models).catch(error => error);

expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
expect(writes).to.have.lengthOf(2);
});
}
);
});
16 changes: 10 additions & 6 deletions test/integration/client-side-operations-timeout/node_csot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,16 @@ describe('CSOT driver tests', metadata, () => {
.stub(Connection.prototype, 'readMany')
.callsFake(async function* (...args) {
const realIterator = readManyStub.wrappedMethod.call(this, ...args);
const cmd = commandSpy.lastCall.args.at(1);
if ('giveMeWriteErrors' in cmd) {
await realIterator.next().catch(() => null); // dismiss response
yield { parse: () => writeErrorsReply };
} else {
yield (await realIterator.next()).value;
try {
const cmd = commandSpy.lastCall.args.at(1);
if ('giveMeWriteErrors' in cmd) {
await realIterator.next().catch(() => null); // dismiss response
yield { parse: () => writeErrorsReply };
} else {
yield (await realIterator.next()).value;
}
} finally {
realIterator.return();
}
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';

import { Collection, type Db, type MongoClient } from '../../mongodb';
import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb';

describe('Collection Management and Db Management', function () {
let client: MongoClient;
Expand All @@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () {
});

it('returns a collection object after calling createCollection', async function () {
const collection = await db.createCollection('collection');
const collection = await db.createCollection(new ObjectId().toHexString());
expect(collection).to.be.instanceOf(Collection);
});

Expand Down
Loading

0 comments on commit 7df1a70

Please sign in to comment.