Skip to content

feat: reintroduce clone and rewind for cursors #2647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ export class ChangeStreamCursor extends AbstractCursor {
}
}

clone(): ChangeStreamCursor {
return new ChangeStreamCursor(this.topology, this.namespace, this.pipeline, {
...this.cursorOptions
});
}

_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(
{ s: { namespace: this.namespace } },
Expand Down
34 changes: 33 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,38 @@ export abstract class AbstractCursor extends EventEmitter {
return this;
}

/**
* Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
* remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
* if the resultant data has already been retrieved by this cursor.
*/
rewind(): void {
if (!this[kInitialized]) {
return;
}

this[kId] = undefined;
this[kDocuments] = [];
this[kClosed] = false;
this[kKilled] = false;
this[kInitialized] = false;

const session = this[kSession];
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false && !session.hasEnded) {
session.endSession();
}

this[kSession] = undefined;
}
}

/**
* Returns a new uninitialized copy of this cursor, with options matching those that have been set on the current instance
*/
abstract clone(): AbstractCursor;

/* @internal */
abstract _initialize(
session: ClientSession | undefined,
Expand Down Expand Up @@ -552,7 +584,7 @@ function next(
if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
if (cursor[kSession] == null && cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: true });
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
}

cursor._initialize(cursor[kSession], (err, state) => {
Expand Down
7 changes: 7 additions & 0 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ export class AggregationCursor extends AbstractCursor {
return this[kPipeline];
}

clone(): AggregationCursor {
return new AggregationCursor(this[kParent], this.topology, this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(this[kParent], this[kPipeline], {
Expand Down
7 changes: 7 additions & 0 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ export class FindCursor extends AbstractCursor {
}
}

clone(): FindCursor {
return new FindCursor(this.topology, this.namespace, this[kFilter], {
...this[kBuiltOptions],
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], {
Expand Down
7 changes: 7 additions & 0 deletions src/operations/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ export class ListIndexesCursor extends AbstractCursor {
this.options = options;
}

clone(): ListIndexesCursor {
return new ListIndexesCursor(this.parent, {
...this.options,
...this.cursorOptions
});
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const operation = new ListIndexesOperation(this.parent, {
Expand Down
7 changes: 7 additions & 0 deletions src/operations/list_collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ export class ListCollectionsCursor extends AbstractCursor {
this.options = options;
}

clone(): ListCollectionsCursor {
return new ListCollectionsCursor(this.parent, this.filter, {
...this.options,
...this.cursorOptions
});
}

_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
const operation = new ListCollectionsOperation(this.parent, this.filter, {
...this.cursorOptions,
Expand Down
131 changes: 125 additions & 6 deletions test/functional/abstract_cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('AbstractCursor', function () {
before(
withClientV2((client, done) => {
const docs = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }, { a: 6 }];
const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const tryNextColl = client.db().collection('try_next');
coll.drop(() => tryNextColl.drop(() => coll.insertMany(docs, done)));
})
Expand All @@ -35,7 +35,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['getMore'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
this.defer(() => cursor.close());

Expand All @@ -56,7 +56,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.next(err => {
expect(err).to.not.exist;
Expand All @@ -75,7 +75,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.toArray(err => {
expect(err).to.not.exist;
Expand All @@ -95,7 +95,7 @@ describe('AbstractCursor', function () {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));

const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.close(err => {
expect(err).to.not.exist;
Expand All @@ -110,7 +110,7 @@ describe('AbstractCursor', function () {
it(
'should iterate each document in a cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('find_cursor');
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });

const bag = [];
Expand Down Expand Up @@ -159,4 +159,123 @@ describe('AbstractCursor', function () {
})
);
});

context('#clone', function () {
it(
'should clone a find cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(cursor).property('closed').to.be.true;

const clonedCursor = cursor.clone();
this.defer(() => clonedCursor.close());

clonedCursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(clonedCursor).property('closed').to.be.true;
done();
});
});
})
);

it(
'should clone an aggregate cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.aggregate([{ $match: {} }]);
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(cursor).property('closed').to.be.true;

const clonedCursor = cursor.clone();
this.defer(() => clonedCursor.close());

clonedCursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);
expect(clonedCursor).property('closed').to.be.true;
done();
});
});
})
);
});

context('#rewind', function () {
it(
'should rewind a cursor',
withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({});
this.defer(() => cursor.close());

cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);

cursor.rewind();
cursor.toArray((err, docs) => {
expect(err).to.not.exist;
expect(docs).to.have.length(6);

done();
});
});
})
);

it('should end an implicit session on rewind', {
metadata: { requires: { mongodb: '>=3.6' } },
test: withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 1 });
this.defer(() => cursor.close());

cursor.next((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

const session = cursor.session;
expect(session).property('hasEnded').to.be.false;
cursor.rewind();
expect(session).property('hasEnded').to.be.true;
done();
});
})
});

it('should not end an explicit session on rewind', {
metadata: { requires: { mongodb: '>=3.6' } },
test: withClientV2(function (client, done) {
const coll = client.db().collection('abstract_cursor');
const session = client.startSession();

const cursor = coll.find({}, { batchSize: 1, session });
this.defer(() => cursor.close());

cursor.next((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

const session = cursor.session;
expect(session).property('hasEnded').to.be.false;
cursor.rewind();
expect(session).property('hasEnded').to.be.false;

session.endSession(done);
});
})
});
});
});
3 changes: 1 addition & 2 deletions test/functional/operation_example.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4908,8 +4908,7 @@ describe('Operation Examples', function () {
* @example-class Cursor
* @example-method rewind
*/
// NOTE: unclear whether we should continue to support `rewind`
it.skip('Should correctly rewind and restart cursor', {
it('Should correctly rewind and restart cursor', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
Expand Down