Skip to content

Commit e83ae11

Browse files
committed
refactor(NODE-4631): change_stream, gridfs to use maybeCallback
1 parent b8b765b commit e83ae11

13 files changed

+254
-133
lines changed

src/bulk/common.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1346,13 +1346,13 @@ function handleEarlyError(
13461346
err?: AnyError,
13471347
callback?: Callback<BulkWriteResult>
13481348
): Promise<BulkWriteResult> | void {
1349-
const Promise = PromiseProvider.get();
13501349
if (typeof callback === 'function') {
13511350
callback(err);
13521351
return;
13531352
}
13541353

1355-
return Promise.reject(err);
1354+
const PromiseConstructor = PromiseProvider.get() ?? Promise;
1355+
return PromiseConstructor.reject(err);
13561356
}
13571357

13581358
function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {

src/change_stream.ts

+42-57
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
2020
import type { CollationOptions, OperationParent } from './operations/command';
2121
import type { ReadPreference } from './read_preference';
2222
import type { ServerSessionId } from './sessions';
23-
import { Callback, filterOptions, getTopology, maybePromise, MongoDBNamespace } from './utils';
23+
import { Callback, filterOptions, getTopology, maybeCallback, MongoDBNamespace } from './utils';
2424

2525
/** @internal */
2626
const kCursorStream = Symbol('cursorStream');
@@ -649,29 +649,25 @@ export class ChangeStream<
649649
hasNext(callback: Callback<boolean>): void;
650650
hasNext(callback?: Callback): Promise<boolean> | void {
651651
this._setIsIterator();
652-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
653-
// Shadowing is intentional here. We want to override the `callback` variable
654-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
655-
return maybePromise(callback, callback => {
656-
(async () => {
652+
return maybeCallback(async () => {
653+
try {
654+
const hasNext = await this.cursor.hasNext();
655+
return hasNext;
656+
} catch (error) {
657657
try {
658+
await this._processErrorIteratorMode(error);
658659
const hasNext = await this.cursor.hasNext();
659660
return hasNext;
660661
} catch (error) {
661662
try {
662-
await this._processErrorIteratorMode(error);
663-
const hasNext = await this.cursor.hasNext();
664-
return hasNext;
665-
} catch (error) {
666-
await this.close().catch(err => err);
667-
throw error;
663+
await this.close();
664+
} catch {
665+
// We are not concerned with errors from close()
668666
}
667+
throw error;
669668
}
670-
})().then(
671-
hasNext => callback(undefined, hasNext),
672-
error => callback(error)
673-
);
674-
});
669+
}
670+
}, callback);
675671
}
676672

677673
/** Get the next available document from the Change Stream. */
@@ -680,31 +676,27 @@ export class ChangeStream<
680676
next(callback: Callback<TChange>): void;
681677
next(callback?: Callback<TChange>): Promise<TChange> | void {
682678
this._setIsIterator();
683-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
684-
// Shadowing is intentional here. We want to override the `callback` variable
685-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
686-
return maybePromise(callback, callback => {
687-
(async () => {
679+
return maybeCallback(async () => {
680+
try {
681+
const change = await this.cursor.next();
682+
const processedChange = this._processChange(change ?? null);
683+
return processedChange;
684+
} catch (error) {
688685
try {
686+
await this._processErrorIteratorMode(error);
689687
const change = await this.cursor.next();
690688
const processedChange = this._processChange(change ?? null);
691689
return processedChange;
692690
} catch (error) {
693691
try {
694-
await this._processErrorIteratorMode(error);
695-
const change = await this.cursor.next();
696-
const processedChange = this._processChange(change ?? null);
697-
return processedChange;
698-
} catch (error) {
699-
await this.close().catch(err => err);
700-
throw error;
692+
await this.close();
693+
} catch {
694+
// We are not concerned with errors from close()
701695
}
696+
throw error;
702697
}
703-
})().then(
704-
change => callback(undefined, change),
705-
error => callback(error)
706-
);
707-
});
698+
}
699+
}, callback);
708700
}
709701

710702
/**
@@ -715,29 +707,21 @@ export class ChangeStream<
715707
tryNext(callback: Callback<Document | null>): void;
716708
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
717709
this._setIsIterator();
718-
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
719-
// Shadowing is intentional here. We want to override the `callback` variable
720-
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
721-
return maybePromise(callback, callback => {
722-
(async () => {
710+
return maybeCallback(async () => {
711+
try {
712+
const change = await this.cursor.tryNext();
713+
return change ?? null;
714+
} catch (error) {
723715
try {
716+
await this._processErrorIteratorMode(error);
724717
const change = await this.cursor.tryNext();
725718
return change ?? null;
726719
} catch (error) {
727-
try {
728-
await this._processErrorIteratorMode(error);
729-
const change = await this.cursor.tryNext();
730-
return change ?? null;
731-
} catch (error) {
732-
await this.close().catch(err => err);
733-
throw error;
734-
}
720+
await this.close().catch(err => err);
721+
throw error;
735722
}
736-
})().then(
737-
change => callback(undefined, change),
738-
error => callback(error)
739-
);
740-
});
723+
}
724+
}, callback);
741725
}
742726

743727
/** Is the cursor closed */
@@ -752,13 +736,14 @@ export class ChangeStream<
752736
close(callback?: Callback): Promise<void> | void {
753737
this[kClosed] = true;
754738

755-
return maybePromise(callback, cb => {
739+
return maybeCallback(async () => {
756740
const cursor = this.cursor;
757-
return cursor.close(err => {
741+
try {
742+
await cursor.close();
743+
} finally {
758744
this._endStream();
759-
return cb(err);
760-
});
761-
});
745+
}
746+
}, callback);
762747
}
763748

764749
/**

src/gridfs/index.ts

+22-50
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { Logger } from '../logger';
77
import { Filter, TypedEventEmitter } from '../mongo_types';
88
import type { ReadPreference } from '../read_preference';
99
import type { Sort } from '../sort';
10-
import { Callback, maybePromise } from '../utils';
10+
import { Callback, maybeCallback } from '../utils';
1111
import { WriteConcern, WriteConcernOptions } from '../write_concern';
1212
import type { FindOptions } from './../operations/find';
1313
import {
@@ -144,28 +144,17 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
144144
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
145145
delete(id: ObjectId, callback: Callback<void>): void;
146146
delete(id: ObjectId, callback?: Callback<void>): Promise<void> | void {
147-
return maybePromise(callback, callback => {
148-
return this.s._filesCollection.deleteOne({ _id: id }, (error, res) => {
149-
if (error) {
150-
return callback(error);
151-
}
152-
153-
return this.s._chunksCollection.deleteMany({ files_id: id }, error => {
154-
if (error) {
155-
return callback(error);
156-
}
157-
158-
// Delete orphaned chunks before returning FileNotFound
159-
if (!res?.deletedCount) {
160-
// TODO(NODE-3483): Replace with more appropriate error
161-
// Consider creating new error MongoGridFSFileNotFoundError
162-
return callback(new MongoRuntimeError(`File not found for id ${id}`));
163-
}
164-
165-
return callback();
166-
});
167-
});
168-
});
147+
return maybeCallback(async () => {
148+
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });
149+
await this.s._chunksCollection.deleteMany({ files_id: id });
150+
151+
// Delete orphaned chunks before returning FileNotFound
152+
if (deletedCount === 0) {
153+
// TODO(NODE-3483): Replace with more appropriate error
154+
// Consider creating new error MongoGridFSFileNotFoundError
155+
throw new MongoRuntimeError(`File not found for id ${id}`);
156+
}
157+
}, callback);
169158
}
170159

171160
/** Convenience wrapper around find on the files collection */
@@ -215,42 +204,25 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
215204
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
216205
rename(id: ObjectId, filename: string, callback: Callback<void>): void;
217206
rename(id: ObjectId, filename: string, callback?: Callback<void>): Promise<void> | void {
218-
return maybePromise(callback, callback => {
207+
return maybeCallback(async () => {
219208
const filter = { _id: id };
220209
const update = { $set: { filename } };
221-
return this.s._filesCollection.updateOne(filter, update, (error?, res?) => {
222-
if (error) {
223-
return callback(error);
224-
}
225-
226-
if (!res?.matchedCount) {
227-
return callback(new MongoRuntimeError(`File with id ${id} not found`));
228-
}
229-
230-
return callback();
231-
});
232-
});
210+
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
211+
if (matchedCount === 0) {
212+
throw new MongoRuntimeError(`File with id ${id} not found`);
213+
}
214+
}, callback);
233215
}
234216

235217
/** Removes this bucket's files collection, followed by its chunks collection. */
236218
drop(): Promise<void>;
237219
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
238220
drop(callback: Callback<void>): void;
239221
drop(callback?: Callback<void>): Promise<void> | void {
240-
return maybePromise(callback, callback => {
241-
return this.s._filesCollection.drop(error => {
242-
if (error) {
243-
return callback(error);
244-
}
245-
return this.s._chunksCollection.drop(error => {
246-
if (error) {
247-
return callback(error);
248-
}
249-
250-
return callback();
251-
});
252-
});
253-
});
222+
return maybeCallback(async () => {
223+
await this.s._filesCollection.drop();
224+
await this.s._chunksCollection.drop();
225+
}, callback);
254226
}
255227

256228
/** Get the Db scoped logger. */

src/gridfs/upload.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { Document } from '../bson';
44
import { ObjectId } from '../bson';
55
import type { Collection } from '../collection';
66
import { AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
7-
import { Callback, maybePromise } from '../utils';
7+
import { Callback, maybeCallback } from '../utils';
88
import type { WriteConcernOptions } from '../write_concern';
99
import { WriteConcern } from './../write_concern';
1010
import type { GridFSFile } from './download';
@@ -149,20 +149,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
149149
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
150150
abort(callback: Callback<void>): void;
151151
abort(callback?: Callback<void>): Promise<void> | void {
152-
return maybePromise(callback, callback => {
152+
return maybeCallback(async () => {
153153
if (this.state.streamEnd) {
154154
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
155-
return callback(new MongoAPIError('Cannot abort a stream that has already completed'));
155+
throw new MongoAPIError('Cannot abort a stream that has already completed');
156156
}
157157

158158
if (this.state.aborted) {
159159
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
160-
return callback(new MongoAPIError('Cannot call abort() on a stream twice'));
160+
throw new MongoAPIError('Cannot call abort() on a stream twice');
161161
}
162162

163163
this.state.aborted = true;
164-
this.chunks.deleteMany({ files_id: this.id }, error => callback(error));
165-
});
164+
await this.chunks.deleteMany({ files_id: this.id });
165+
}, callback);
166166
}
167167

168168
/**

src/mongo_client.ts

+8-4
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
593593
return mongoClient.connect();
594594
}
595595
} catch (error) {
596-
if (callback) return callback(error);
597-
else return PromiseProvider.get().reject(error);
596+
if (callback) {
597+
return callback(error);
598+
}
599+
600+
const PromiseConstructor = PromiseProvider.get() ?? Promise;
601+
return PromiseConstructor.reject(error);
598602
}
599603
}
600604

@@ -645,9 +649,9 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
645649
}
646650

647651
const session = this.startSession(options);
648-
const Promise = PromiseProvider.get();
652+
const PromiseConstructor = PromiseProvider.get() ?? Promise;
649653

650-
return Promise.resolve()
654+
return PromiseConstructor.resolve()
651655
.then(() => withSessionCallback(session))
652656
.then(() => {
653657
// Do not return the result of callback

src/promise_provider.ts

+11-7
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import { MongoInvalidArgumentError } from './error';
44
const kPromise = Symbol('promise');
55

66
interface PromiseStore {
7-
[kPromise]?: PromiseConstructor;
7+
[kPromise]: PromiseConstructor | null;
88
}
99

1010
const store: PromiseStore = {
11-
[kPromise]: undefined
11+
[kPromise]: null
1212
};
1313

1414
/**
@@ -31,7 +31,13 @@ export class PromiseProvider {
3131
* Sets the promise library
3232
* @deprecated Setting a custom promise library is deprecated the next major version will use the global Promise constructor only.
3333
*/
34-
static set(lib: PromiseConstructor): void {
34+
static set(lib: PromiseConstructor | null): void {
35+
// eslint-disable-next-line no-restricted-syntax
36+
if (lib === null) {
37+
// Check explicitly against null since `.set()` (no args) should fall through to validate
38+
store[kPromise] = null;
39+
return;
40+
}
3541
if (!PromiseProvider.validate(lib)) {
3642
// validate
3743
return;
@@ -43,9 +49,7 @@ export class PromiseProvider {
4349
* Get the stored promise library, or resolves passed in
4450
* @deprecated Setting a custom promise library is deprecated the next major version will use the global Promise constructor only.
4551
*/
46-
static get(): PromiseConstructor {
47-
return store[kPromise] as PromiseConstructor;
52+
static get(): PromiseConstructor | null {
53+
return store[kPromise];
4854
}
4955
}
50-
51-
PromiseProvider.set(global.Promise);

src/sessions.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -609,14 +609,14 @@ function attemptTransaction<TSchema>(
609609
fn: WithTransactionCallback<TSchema>,
610610
options?: TransactionOptions
611611
): Promise<any> {
612-
const Promise = PromiseProvider.get();
613612
session.startTransaction(options);
614613

615614
let promise;
616615
try {
617616
promise = fn(session);
618617
} catch (err) {
619-
promise = Promise.reject(err);
618+
const PromiseConstructor = PromiseProvider.get() ?? Promise;
619+
promise = PromiseConstructor.reject(err);
620620
}
621621

622622
if (!isPromiseLike(promise)) {

0 commit comments

Comments
 (0)