Skip to content

fix(NODE-4788)!: use implementer Writable methods for GridFSBucketWriteStream #3808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 90 additions & 156 deletions src/gridfs/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Writable } from 'stream';
import type { Document } from '../bson';
import { ObjectId } from '../bson';
import type { Collection } from '../collection';
import { type AnyError, MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
import { MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error';
import type { Callback } from '../utils';
import type { WriteConcernOptions } from '../write_concern';
import { WriteConcern } from './../write_concern';
Expand Down Expand Up @@ -38,36 +38,58 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions {
* Do not instantiate this class directly. Use `openUploadStream()` instead.
* @public
*/
export class GridFSBucketWriteStream extends Writable implements NodeJS.WritableStream {
export class GridFSBucketWriteStream extends Writable {
bucket: GridFSBucket;
/** A Collection instance where the file's chunks are stored */
chunks: Collection<GridFSChunk>;
filename: string;
/** A Collection instance where the file's GridFSFile document is stored */
files: Collection<GridFSFile>;
/** The name of the file */
filename: string;
/** Options controlling the metadata inserted along with the file */
options: GridFSBucketWriteStreamOptions;
/** Indicates the stream is finished uploading */
done: boolean;
/** The ObjectId used for the `_id` field on the GridFSFile document */
id: ObjectId;
/** The number of bytes that each chunk will be limited to */
chunkSizeBytes: number;
/** Space used to store a chunk currently being inserted */
bufToStore: Buffer;
/** Accumulates the number of bytes inserted as the stream uploads chunks */
length: number;
/** Accumulates the number of chunks inserted as the stream uploads file contents */
n: number;
/** Tracks the current offset into the buffered bytes being uploaded */
pos: number;
/** Contains a number of properties indicating the current state of the stream */
state: {
/** If set the stream has ended */
streamEnd: boolean;
/** Indicates the number of chunks that still need to be inserted to exhaust the current buffered data */
outstandingRequests: number;
/** If set an error occurred during insertion */
errored: boolean;
/** If set the stream was intentionally aborted */
aborted: boolean;
};
/** The write concern setting to be used with every insert operation */
writeConcern?: WriteConcern;

/** @event */
static readonly CLOSE = 'close';
/** @event */
static readonly ERROR = 'error';
/**
* `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
* @event
* The document containing information about the inserted file.
* This property is defined _after_ the finish event has been emitted.
* It will remain `null` if an error occurs.
*
* @example
* ```ts
* fs.createReadStream('file.txt')
* .pipe(bucket.openUploadStream('file.txt'))
* .on('finish', function () {
* console.log(this.gridFSFile)
* })
* ```
*/
static readonly FINISH = 'finish';
gridFSFile: GridFSFile | null = null;

/**
* @param bucket - Handle for this stream's corresponding bucket
Expand Down Expand Up @@ -115,6 +137,16 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
}
}

/**
* The stream is considered constructed when the indexes ßare done being created
*/
override _construct(callback: (error?: Error | null) => void): void {
if (this.bucket.s.checkedIndexes) {
return process.nextTick(callback);
}
this.bucket.once('index', callback);
}

/**
* Write a buffer to the stream.
*
Expand All @@ -123,22 +155,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
* @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
* @returns False if this write required flushing a chunk to MongoDB. True otherwise.
*/
override write(chunk: Buffer | string): boolean;
override write(chunk: Buffer | string, callback: Callback<void>): boolean;
override write(chunk: Buffer | string, encoding: BufferEncoding | undefined): boolean;
override write(
override _write(
chunk: Buffer | string,
encoding: BufferEncoding | undefined,
encoding: BufferEncoding,
callback: Callback<void>
): boolean;
override write(
chunk: Buffer | string,
encodingOrCallback?: Callback<void> | BufferEncoding,
callback?: Callback<void>
): boolean {
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback;
return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback));
): void {
doWrite(this, chunk, encoding, callback);
}

override _final(callback: (error?: Error | null) => void): void {
if (this.state.streamEnd) {
return process.nextTick(callback);
}
this.state.streamEnd = true;
writeRemnant(this, callback);
}

/**
Expand All @@ -159,76 +189,15 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
this.state.aborted = true;
await this.chunks.deleteMany({ files_id: this.id });
}

/**
* Tells the stream that no more data will be coming in. The stream will
* persist the remaining data to MongoDB, write the files document, and
* then emit a 'finish' event.
*
* @param chunk - Buffer to write
* @param encoding - Optional encoding for the buffer
* @param callback - Function to call when all files and chunks have been persisted to MongoDB
*/
override end(): this;
override end(chunk: Buffer): this;
override end(callback: Callback<GridFSFile | void>): this;
override end(chunk: Buffer, callback: Callback<GridFSFile | void>): this;
override end(chunk: Buffer, encoding: BufferEncoding): this;
override end(
chunk: Buffer,
encoding: BufferEncoding | undefined,
callback: Callback<GridFSFile | void>
): this;
override end(
chunkOrCallback?: Buffer | Callback<GridFSFile | void>,
encodingOrCallback?: BufferEncoding | Callback<GridFSFile | void>,
callback?: Callback<GridFSFile | void>
): this {
const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback;
const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback;
callback =
typeof chunkOrCallback === 'function'
? chunkOrCallback
: typeof encodingOrCallback === 'function'
? encodingOrCallback
: callback;

if (this.state.streamEnd || checkAborted(this, callback)) return this;

this.state.streamEnd = true;

if (callback) {
this.once(GridFSBucketWriteStream.FINISH, (result: GridFSFile) => {
if (callback) callback(undefined, result);
});
}

if (!chunk) {
waitForIndexes(this, () => !!writeRemnant(this));
return this;
}

this.write(chunk, encoding, () => {
writeRemnant(this);
});

return this;
}
}

function __handleError(
stream: GridFSBucketWriteStream,
error: AnyError,
callback?: Callback
): void {
function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
if (stream.state.errored) {
process.nextTick(callback);
return;
}
stream.state.errored = true;
if (callback) {
return callback(error);
}
stream.emit(GridFSBucketWriteStream.ERROR, error);
process.nextTick(callback, error);
}

function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
Expand Down Expand Up @@ -271,13 +240,16 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
}
}

function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
if (stream.done) return true;
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
if (stream.done) {
return process.nextTick(callback);
}

if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
// Set done so we do not trigger duplicate createFilesDoc
stream.done = true;
// Create a new files doc
const filesDoc = createFilesDoc(
const gridFSFile = createFilesDoc(
stream.id,
stream.length,
stream.chunkSizeBytes,
Expand All @@ -287,24 +259,21 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
stream.options.metadata
);

if (checkAborted(stream, callback)) {
return false;
if (isAborted(stream, callback)) {
return;
}

stream.files.insertOne(filesDoc, { writeConcern: stream.writeConcern }).then(
stream.files.insertOne(gridFSFile, { writeConcern: stream.writeConcern }).then(
() => {
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
stream.emit(GridFSBucketWriteStream.CLOSE);
stream.gridFSFile = gridFSFile;
callback();
},
error => {
return __handleError(stream, error, callback);
}
error => handleError(stream, error, callback)
);

return true;
return;
}

return false;
process.nextTick(callback);
}

async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
Expand Down Expand Up @@ -377,11 +346,11 @@ function createFilesDoc(
function doWrite(
stream: GridFSBucketWriteStream,
chunk: Buffer | string,
encoding?: BufferEncoding,
callback?: Callback<void>
): boolean {
if (checkAborted(stream, callback)) {
return false;
encoding: BufferEncoding,
callback: Callback<void>
): void {
if (isAborted(stream, callback)) {
return;
}

const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
Expand All @@ -392,13 +361,8 @@ function doWrite(
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
inputBuf.copy(stream.bufToStore, stream.pos);
stream.pos += inputBuf.length;

callback && callback();

// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// True means client can keep writing.
return true;
process.nextTick(callback);
return;
}

// Otherwise, buffer is too big for current chunk, so we need to flush
Expand All @@ -418,8 +382,8 @@ function doWrite(
++stream.state.outstandingRequests;
++outstandingRequests;

if (checkAborted(stream, callback)) {
return false;
if (isAborted(stream, callback)) {
return;
}

stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
Expand All @@ -428,14 +392,10 @@ function doWrite(
--outstandingRequests;

if (!outstandingRequests) {
stream.emit('drain', doc);
callback && callback();
checkDone(stream);
checkDone(stream, callback);
}
},
error => {
return __handleError(stream, error);
}
error => handleError(stream, error, callback)
);

spaceRemaining = stream.chunkSizeBytes;
Expand All @@ -445,29 +405,9 @@ function doWrite(
inputBufRemaining -= numToCopy;
numToCopy = Math.min(spaceRemaining, inputBufRemaining);
}

// Note that we reverse the typical semantics of write's return value
// to be compatible with node's `.pipe()` function.
// False means the client should wait for the 'drain' event.
return false;
}

function waitForIndexes(
stream: GridFSBucketWriteStream,
callback: (res: boolean) => boolean
): boolean {
if (stream.bucket.s.checkedIndexes) {
return callback(false);
}

stream.bucket.once('index', () => {
callback(true);
});

return true;
}

function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boolean {
function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void {
// Buffer is empty, so don't bother to insert
if (stream.pos === 0) {
return checkDone(stream, callback);
Expand All @@ -482,28 +422,22 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
const doc = createChunkDoc(stream.id, stream.n, remnant);

// If the stream was aborted, do not write remnant
if (checkAborted(stream, callback)) {
return false;
if (isAborted(stream, callback)) {
return;
}

stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
() => {
--stream.state.outstandingRequests;
checkDone(stream);
checkDone(stream, callback);
},
error => {
return __handleError(stream, error);
}
error => handleError(stream, error, callback)
);
return true;
}

function checkAborted(stream: GridFSBucketWriteStream, callback?: Callback<void>): boolean {
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
if (stream.state.aborted) {
if (typeof callback === 'function') {
// TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
callback(new MongoAPIError('Stream has been aborted'));
}
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
return true;
}
return false;
Expand Down
Loading