Skip to content

refactor(NODE-4631): change_stream, gridfs to use maybeCallback #3406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 21, 2022
38 changes: 16 additions & 22 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, UpdateStatement } from '../operations/update';
import { PromiseProvider } from '../promise_provider';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
Expand All @@ -31,6 +30,7 @@ import {
Callback,
getTopology,
hasAtomicOperators,
maybeCallback,
MongoDBNamespace,
resolveOptions
} from '../utils';
Expand Down Expand Up @@ -1270,11 +1270,19 @@ export abstract class BulkOperationBase {
options?: BulkWriteOptions | Callback<BulkWriteResult>,
callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void {
if (typeof options === 'function') (callback = options), (options = {});
options = options ?? {};
callback =
typeof callback === 'function'
? callback
: typeof options === 'function'
? options
: undefined;
options = options != null && typeof options !== 'function' ? options : {};

if (this.s.executed) {
return handleEarlyError(new MongoBatchReExecutionError(), callback);
// eslint-disable-next-line @typescript-eslint/require-await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we calling maybePromise twice to handle these errors because executeOperation takes a callback? Why not just promisify execute operation?

maybeCallback(async () => {
  ...
  return promisify(execute_operation)(....)
})

This approach might be preferable as well, because once you make execute_operation return a promise, you won't need to refactor this method much except just to remove the call to promisify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping the execOp in maybeCallback (naively, so that's likely why) messes with session allocation because it makes the maybePromise logic run. We can add a note to the refactor exec operation refactor ticket to comeback here, or I can revert the changes here and we can do it altogether.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll tackle this once execute_operation has been refactored

return maybeCallback(async () => {
throw new MongoBatchReExecutionError();
}, callback);
}

const writeConcern = WriteConcern.fromOptions(options);
Expand All @@ -1292,10 +1300,10 @@ export abstract class BulkOperationBase {
}
// If we have no operations in the bulk raise an error
if (this.s.batches.length === 0) {
const emptyBatchError = new MongoInvalidArgumentError(
'Invalid BulkOperation, Batch cannot be empty'
);
return handleEarlyError(emptyBatchError, callback);
// eslint-disable-next-line @typescript-eslint/require-await
return maybeCallback(async () => {
throw new MongoInvalidArgumentError('Invalid BulkOperation, Batch cannot be empty');
}, callback);
}

this.s.executed = true;
Expand Down Expand Up @@ -1351,20 +1359,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
}
});

/** helper function to assist with promiseOrCallback behavior */
function handleEarlyError(
err?: AnyError,
callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void {
if (typeof callback === 'function') {
callback(err);
return;
}

const PromiseConstructor = PromiseProvider.get() ?? Promise;
return PromiseConstructor.reject(err);
}

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
return bulkOperation.s.options.forceServerObjectId;
Expand Down
99 changes: 44 additions & 55 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import type { ServerSessionId } from './sessions';
import { Callback, filterOptions, getTopology, maybePromise, MongoDBNamespace } from './utils';
import { Callback, filterOptions, getTopology, maybeCallback, MongoDBNamespace } from './utils';

/** @internal */
const kCursorStream = Symbol('cursorStream');
Expand Down Expand Up @@ -649,29 +649,25 @@ export class ChangeStream<
hasNext(callback: Callback<boolean>): void;
hasNext(callback?: Callback): Promise<boolean> | void {
this._setIsIterator();
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
// Shadowing is intentional here. We want to override the `callback` variable
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
return maybePromise(callback, callback => {
(async () => {
return maybeCallback(async () => {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
await this.close().catch(err => err);
throw error;
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
})().then(
hasNext => callback(undefined, hasNext),
error => callback(error)
);
});
}
}, callback);
}

/** Get the next available document from the Change Stream. */
Expand All @@ -680,31 +676,27 @@ export class ChangeStream<
next(callback: Callback<TChange>): void;
next(callback?: Callback<TChange>): Promise<TChange> | void {
this._setIsIterator();
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
// Shadowing is intentional here. We want to override the `callback` variable
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
return maybePromise(callback, callback => {
(async () => {
return maybeCallback(async () => {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
await this.close().catch(err => err);
throw error;
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
})().then(
change => callback(undefined, change),
error => callback(error)
);
});
}
}, callback);
}

/**
Expand All @@ -715,29 +707,25 @@ export class ChangeStream<
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
// TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing
// Shadowing is intentional here. We want to override the `callback` variable
// from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
return maybePromise(callback, callback => {
(async () => {
return maybeCallback(async () => {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
await this.close().catch(err => err);
throw error;
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
})().then(
change => callback(undefined, change),
error => callback(error)
);
});
}
}, callback);
}

/** Is the cursor closed */
Expand All @@ -752,13 +740,14 @@ export class ChangeStream<
close(callback?: Callback): Promise<void> | void {
this[kClosed] = true;

return maybePromise(callback, cb => {
return maybeCallback(async () => {
const cursor = this.cursor;
return cursor.close(err => {
try {
await cursor.close();
} finally {
this._endStream();
return cb(err);
});
});
}
}, callback);
}

/**
Expand Down
69 changes: 21 additions & 48 deletions src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Logger } from '../logger';
import { Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { Callback, maybePromise } from '../utils';
import { Callback, maybeCallback } from '../utils';
import { WriteConcern, WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -144,28 +144,18 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
delete(id: ObjectId, callback: Callback<void>): void;
delete(id: ObjectId, callback?: Callback<void>): Promise<void> | void {
return maybePromise(callback, callback => {
return this.s._filesCollection.deleteOne({ _id: id }, (error, res) => {
if (error) {
return callback(error);
}
return maybeCallback(async () => {
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });

return this.s._chunksCollection.deleteMany({ files_id: id }, error => {
if (error) {
return callback(error);
}
// Delete orphaned chunks before returning FileNotFound
await this.s._chunksCollection.deleteMany({ files_id: id });

// Delete orphaned chunks before returning FileNotFound
if (!res?.deletedCount) {
// TODO(NODE-3483): Replace with more appropriate error
// Consider creating new error MongoGridFSFileNotFoundError
return callback(new MongoRuntimeError(`File not found for id ${id}`));
}

return callback();
});
});
});
if (deletedCount === 0) {
// TODO(NODE-3483): Replace with more appropriate error
// Consider creating new error MongoGridFSFileNotFoundError
throw new MongoRuntimeError(`File not found for id ${id}`);
}
}, callback);
}

/** Convenience wrapper around find on the files collection */
Expand Down Expand Up @@ -215,42 +205,25 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
rename(id: ObjectId, filename: string, callback: Callback<void>): void;
rename(id: ObjectId, filename: string, callback?: Callback<void>): Promise<void> | void {
return maybePromise(callback, callback => {
return maybeCallback(async () => {
const filter = { _id: id };
const update = { $set: { filename } };
return this.s._filesCollection.updateOne(filter, update, (error?, res?) => {
if (error) {
return callback(error);
}

if (!res?.matchedCount) {
return callback(new MongoRuntimeError(`File with id ${id} not found`));
}

return callback();
});
});
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
if (matchedCount === 0) {
throw new MongoRuntimeError(`File with id ${id} not found`);
}
}, callback);
}

/** Removes this bucket's files collection, followed by its chunks collection. */
drop(): Promise<void>;
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
drop(callback: Callback<void>): void;
drop(callback?: Callback<void>): Promise<void> | void {
return maybePromise(callback, callback => {
return this.s._filesCollection.drop(error => {
if (error) {
return callback(error);
}
return this.s._chunksCollection.drop(error => {
if (error) {
return callback(error);
}

return callback();
});
});
});
return maybeCallback(async () => {
await this.s._filesCollection.drop();
await this.s._chunksCollection.drop();
}, callback);
}

/** Get the Db scoped logger. */
Expand Down
12 changes: 6 additions & 6 deletions src/gridfs/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Document } from '../bson';
import { ObjectId } from '../bson';
import type { Collection } from '../collection';
import { AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
import { Callback, maybePromise } from '../utils';
import { Callback, maybeCallback } from '../utils';
import type { WriteConcernOptions } from '../write_concern';
import { WriteConcern } from './../write_concern';
import type { GridFSFile } from './download';
Expand Down Expand Up @@ -149,20 +149,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
abort(callback: Callback<void>): void;
abort(callback?: Callback<void>): Promise<void> | void {
return maybePromise(callback, callback => {
return maybeCallback(async () => {
if (this.state.streamEnd) {
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
return callback(new MongoAPIError('Cannot abort a stream that has already completed'));
throw new MongoAPIError('Cannot abort a stream that has already completed');
}

if (this.state.aborted) {
// TODO(NODE-3485): Replace with MongoGridFSStreamClosed
return callback(new MongoAPIError('Cannot call abort() on a stream twice'));
throw new MongoAPIError('Cannot call abort() on a stream twice');
}

this.state.aborted = true;
this.chunks.deleteMany({ files_id: this.id }, error => callback(error));
});
await this.chunks.deleteMany({ files_id: this.id });
}, callback);
}

/**
Expand Down
Loading