Skip to content

Commit

Permalink
feat(shell-api): add options in stream processor start, stop, and drop
Browse files Browse the repository at this point in the history
  • Loading branch information
mongodb-matthew-normyle authored Nov 22, 2024
1 parent dce0b68 commit 3d751a2
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
13 changes: 8 additions & 5 deletions packages/shell-api/src/stream-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ export default class StreamProcessor extends ShellApiWithMongoClass {
}

@returnsPromise
async start() {
async start(options: Document = {}) {
return await this._streams._runStreamCommand({
startStreamProcessor: this.name,
...options,
});
}

@returnsPromise
async stop() {
async stop(options: Document = {}) {
return await this._streams._runStreamCommand({
stopStreamProcessor: this.name,
...options,
});
}

@returnsPromise
async drop() {
return this._drop();
async drop(options: Document = {}) {
return this._drop(options);
}

async _drop() {
async _drop(options: Document = {}) {
return await this._streams._runStreamCommand({
dropStreamProcessor: this.name,
...options,
});
}

Expand Down
82 changes: 61 additions & 21 deletions packages/shell-api/src/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,67 @@ describe('Streams', function () {
});
});

// Create a stream processor.
const createProcessor = async (name: string) => {
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
return { runCmdStub, processor };
};

// Validate supplying options in start,stop, and drop commands.
describe('options', function () {
it('supplies options in start, stop, and drop', async function () {
const name = 'testOptions';
const { runCmdStub, processor } = await createProcessor(name);

// Start the stream processor with an extra option.
await processor.start({ resumeFromCheckpoint: false });
expect(
runCmdStub.calledWithExactly(
'admin',
{ startStreamProcessor: name, resumeFromCheckpoint: false },
{}
)
).to.be.true;

// Stop the stream processor with an extra option.
await processor.stop({ force: true });
expect(
runCmdStub.calledWithExactly(
'admin',
{ stopStreamProcessor: name, force: true },
{}
)
).to.be.true;

// Drop the stream processor with a few extra options.
const opts = {
force: true,
ttl: { unit: 'day', size: 30 },
};
await processor.drop(opts);
expect(
runCmdStub.calledWithExactly(
'admin',
{
dropStreamProcessor: name,
...opts,
},
{}
)
).to.be.true;
});
});

describe('modify', function () {
it('throws with invalid parameters', async function () {
// Create the stream processor.
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });
const name = 'p1';
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
const { processor } = await createProcessor('testModify');

// No arguments to modify.
const caught = await processor
Expand Down Expand Up @@ -206,17 +255,8 @@ describe('Streams', function () {
});

it('works with pipeline and options arguments', async function () {
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });

// Create the stream processor.
const name = 'p1';
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
const name = 'testModify';
const { runCmdStub, processor } = await createProcessor(name);

// Start the stream processor.
await processor.start();
Expand Down

0 comments on commit 3d751a2

Please sign in to comment.