Skip to content

Commit b1e477e

Browse files
authored
refactor(deps): remove stream-events dependency (#2022)
* refactor(deps): remove stream-events dependency * implement final, fix _write to invoke callback on nextTick * remove unused variable * invoke callback directly in _final
1 parent 3ecb731 commit b1e477e

File tree

3 files changed

+58
-11
lines changed

3 files changed

+58
-11
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
"p-limit": "^3.0.1",
6868
"pumpify": "^2.0.0",
6969
"retry-request": "^5.0.0",
70-
"stream-events": "^1.0.4",
7170
"teeny-request": "^8.0.0",
7271
"uuid": "^8.0.0"
7372
},

src/file.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ import * as mime from 'mime';
3131
// eslint-disable-next-line @typescript-eslint/no-var-requires
3232
const pumpify = require('pumpify');
3333
import * as resumableUpload from './resumable-upload';
34-
import {Duplex, Writable, Readable, PassThrough} from 'stream';
35-
import * as streamEvents from 'stream-events';
34+
import {Writable, Readable, PassThrough} from 'stream';
3635
import * as zlib from 'zlib';
3736
import * as http from 'http';
3837

@@ -65,6 +64,7 @@ import {
6564
objectKeyToLowercase,
6665
unicodeJSONStringify,
6766
formatAsUTCISO,
67+
PassThroughShim,
6868
} from './util';
6969
import {CRC32CValidatorGenerator} from './crc32c';
7070
import {HashStreamValidator} from './hash-stream-validator';
@@ -1363,7 +1363,7 @@ class File extends ServiceObject<File> {
13631363

13641364
let validateStream: HashStreamValidator | undefined = undefined;
13651365

1366-
const throughStream = streamEvents(new PassThrough());
1366+
const throughStream = new PassThroughShim();
13671367

13681368
let isServedCompressed = true;
13691369
let crc32c = true;
@@ -1938,13 +1938,18 @@ class File extends ServiceObject<File> {
19381938
stream.emit('progress', evt);
19391939
});
19401940

1941-
const stream = streamEvents(
1942-
pumpify([
1943-
gzip ? zlib.createGzip() : new PassThrough(),
1944-
validateStream,
1945-
fileWriteStream,
1946-
])
1947-
) as Duplex;
1941+
const passThroughShim = new PassThroughShim();
1942+
1943+
passThroughShim.on('writing', () => {
1944+
stream.emit('writing');
1945+
});
1946+
1947+
const stream = pumpify([
1948+
passThroughShim,
1949+
gzip ? zlib.createGzip() : new PassThrough(),
1950+
validateStream,
1951+
fileWriteStream,
1952+
]);
19481953

19491954
// Wait until we've received data to determine what upload technique to use.
19501955
stream.on('writing', () => {

src/util.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
import * as querystring from 'querystring';
16+
import {PassThrough} from 'stream';
1617

1718
export function normalize<T = {}, U = Function>(
1819
optionsOrCallback?: T | U,
@@ -166,3 +167,45 @@ export function formatAsUTCISO(
166167

167168
return resultString;
168169
}
170+
171+
export class PassThroughShim extends PassThrough {
172+
private shouldEmitReading = true;
173+
private shouldEmitWriting = true;
174+
175+
_read(size: number): void {
176+
if (this.shouldEmitReading) {
177+
this.emit('reading');
178+
this.shouldEmitReading = false;
179+
}
180+
super._read(size);
181+
}
182+
183+
_write(
184+
chunk: never,
185+
encoding: BufferEncoding,
186+
callback: (error?: Error | null | undefined) => void
187+
): void {
188+
if (this.shouldEmitWriting) {
189+
this.emit('writing');
190+
this.shouldEmitWriting = false;
191+
}
192+
// Per the nodejs documention, callback must be invoked on the next tick
193+
process.nextTick(() => {
194+
super._write(chunk, encoding, callback);
195+
});
196+
}
197+
198+
_final(callback: (error?: Error | null | undefined) => void): void {
199+
// If the stream is empty (i.e. empty file) final will be invoked before _read / _write
200+
// and we should still emit the proper events.
201+
if (this.shouldEmitReading) {
202+
this.emit('reading');
203+
this.shouldEmitReading = false;
204+
}
205+
if (this.shouldEmitWriting) {
206+
this.emit('writing');
207+
this.shouldEmitWriting = false;
208+
}
209+
callback(null);
210+
}
211+
}

0 commit comments

Comments
 (0)