Skip to content

fix(NODE-4788)!: use implementer Writable methods for GridFSBucketWriteStream #3808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 16 additions & 56 deletions src/gridfs/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,35 +78,15 @@ export interface GridFSBucketReadStreamPrivate {
* Do not instantiate this class directly. Use `openDownloadStream()` instead.
* @public
*/
export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableStream {
export class GridFSBucketReadStream extends Readable {
/** @internal */
s: GridFSBucketReadStreamPrivate;

/**
* An error occurred
* @event
*/
static readonly ERROR = 'error' as const;
/**
* Fires when the stream loaded the file document corresponding to the provided id.
* @event
*/
static readonly FILE = 'file' as const;
/**
* Emitted when a chunk of data is available to be consumed.
* @event
*/
static readonly DATA = 'data' as const;
/**
* Fired when the stream is exhausted (no more data events).
* @event
*/
static readonly END = 'end' as const;
/**
* Fired when the stream is exhausted and the underlying cursor is killed
* @event
*/
static readonly CLOSE = 'close' as const;

/**
* @param chunks - Handle for chunks collection
Expand All @@ -122,7 +102,7 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
filter: Document,
options?: GridFSBucketReadStreamOptions
) {
super();
super({ emitClose: true });
this.s = {
bytesToTrim: 0,
bytesToSkip: 0,
Expand Down Expand Up @@ -185,20 +165,8 @@ export class GridFSBucketReadStream extends Readable implements NodeJS.ReadableS
*/
async abort(): Promise<void> {
this.push(null);
this.destroyed = true;
if (this.s.cursor) {
try {
await this.s.cursor.close();
} finally {
this.emit(GridFSBucketReadStream.CLOSE);
}
} else {
if (!this.s.init) {
// If not initialized, fire close event because we will never
// get a cursor
this.emit(GridFSBucketReadStream.CLOSE);
}
}
this.destroy();
await this.s.cursor?.close();
}
}

Expand All @@ -221,19 +189,15 @@ function doRead(stream: GridFSBucketReadStream): void {
return;
}
if (error) {
stream.emit(GridFSBucketReadStream.ERROR, error);
stream.destroy(error);
return;
}
if (!doc) {
stream.push(null);

stream.s.cursor?.close().then(
() => {
stream.emit(GridFSBucketReadStream.CLOSE);
},
error => {
stream.emit(GridFSBucketReadStream.ERROR, error);
}
() => null,
error => stream.destroy(error)
);
return;
}
Expand All @@ -244,17 +208,15 @@ function doRead(stream: GridFSBucketReadStream): void {
const expectedN = stream.s.expected++;
const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
if (doc.n > expectedN) {
return stream.emit(
GridFSBucketReadStream.ERROR,
return stream.destroy(
new MongoGridFSChunkError(
`ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`
)
);
}

if (doc.n < expectedN) {
return stream.emit(
GridFSBucketReadStream.ERROR,
return stream.destroy(
new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`)
);
}
Expand All @@ -263,16 +225,14 @@ function doRead(stream: GridFSBucketReadStream): void {

if (buf.byteLength !== expectedLength) {
if (bytesRemaining <= 0) {
return stream.emit(
GridFSBucketReadStream.ERROR,
return stream.destroy(
new MongoGridFSChunkError(
`ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`
)
);
}

return stream.emit(
GridFSBucketReadStream.ERROR,
return stream.destroy(
new MongoGridFSChunkError(
`ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`
)
Expand Down Expand Up @@ -332,7 +292,7 @@ function init(stream: GridFSBucketReadStream): void {
doc
}: { error: Error; doc: null } | { error: null; doc: any }) => {
if (error) {
return stream.emit(GridFSBucketReadStream.ERROR, error);
return stream.destroy(error);
}

if (!doc) {
Expand All @@ -343,7 +303,7 @@ function init(stream: GridFSBucketReadStream): void {
// TODO(NODE-3483)
const err = new MongoRuntimeError(errmsg);
err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
return stream.emit(GridFSBucketReadStream.ERROR, err);
return stream.destroy(err);
}

// If document is empty, kill the stream immediately and don't
Expand All @@ -357,14 +317,14 @@ function init(stream: GridFSBucketReadStream): void {
// If user destroys the stream before we have a cursor, wait
// until the query is done to say we're 'closed' because we can't
// cancel a query.
stream.emit(GridFSBucketReadStream.CLOSE);
stream.destroy();
return;
}

try {
stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
} catch (error) {
return stream.emit(GridFSBucketReadStream.ERROR, error);
return stream.destroy(error);
}

const filter: Document = { files_id: doc._id };
Expand All @@ -390,7 +350,7 @@ function init(stream: GridFSBucketReadStream): void {
try {
stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
} catch (error) {
return stream.emit(GridFSBucketReadStream.ERROR, error);
return stream.destroy(error);
}

stream.emit(GridFSBucketReadStream.FILE, doc);
Expand Down
Loading