Skip to content

Commit 3b2b877

Browse files
authored
perf: Improve Multiple Chunk Upload Performance (#2185)
* perf: Improve Multiple Chunk Upload Performance * perf: Keep Buffers in Arrays * doc: `prependLocalBufferToUpstream` * refactor: Remove `bufferEncoding` * perf: Request Data From Upstream Immediately Upon Consuming
1 parent 5a53804 commit 3b2b877

File tree

3 files changed

+284
-166
lines changed

3 files changed

+284
-166
lines changed

src/file.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ export type PredefinedAcl =
199199

200200
export interface CreateResumableUploadOptions {
201201
chunkSize?: number;
202+
highWaterMark?: number;
202203
metadata?: Metadata;
203204
origin?: string;
204205
offset?: number;
@@ -3853,6 +3854,7 @@ class File extends ServiceObject<File> {
38533854
retryOptions: {...retryOptions},
38543855
params: options?.preconditionOpts || this.instancePreconditionOpts,
38553856
chunkSize: options?.chunkSize,
3857+
highWaterMark: options?.highWaterMark,
38563858
});
38573859

38583860
uploadStream

src/resumable-upload.ts

Lines changed: 124 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
} from 'gaxios';
2424
import * as gaxios from 'gaxios';
2525
import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
26-
import {Readable, Writable} from 'stream';
26+
import {Readable, Writable, WritableOptions} from 'stream';
2727
import retry = require('async-retry');
2828
import {RetryOptions, PreconditionOptions} from './storage';
2929
import * as uuid from 'uuid';
@@ -62,7 +62,7 @@ export interface QueryParameters extends PreconditionOptions {
6262
userProject?: string;
6363
}
6464

65-
export interface UploadConfig {
65+
export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
6666
/**
6767
* The API endpoint used for the request.
6868
* Defaults to `storage.googleapis.com`.
@@ -260,20 +260,22 @@ export class Upload extends Writable {
260260
uri: uuid.v4(),
261261
offset: uuid.v4(),
262262
};
263-
private upstreamChunkBuffer: Buffer = Buffer.alloc(0);
264-
private chunkBufferEncoding?: BufferEncoding = undefined;
263+
/**
264+
* A cache of buffers written to this instance, ready for consuming
265+
*/
266+
private writeBuffers: Buffer[] = [];
265267
private numChunksReadInRequest = 0;
266268
/**
267-
* A chunk used for caching the most recent upload chunk.
269+
* An array of buffers used for caching the most recent upload chunk.
268270
* We should not assume that the server received all bytes sent in the request.
269271
* - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
270272
*/
271-
private lastChunkSent = Buffer.alloc(0);
273+
private localWriteCache: Buffer[] = [];
274+
private localWriteCacheByteLength = 0;
272275
private upstreamEnded = false;
273276

274277
constructor(cfg: UploadConfig) {
275-
super();
276-
278+
super(cfg);
277279
cfg = cfg || {};
278280

279281
if (!cfg.bucket || !cfg.file) {
@@ -391,24 +393,73 @@ export class Upload extends Writable {
391393
// Backwards-compatible event
392394
this.emit('writing');
393395

394-
this.upstreamChunkBuffer = Buffer.concat([
395-
this.upstreamChunkBuffer,
396-
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk,
397-
]);
398-
this.chunkBufferEncoding = encoding;
396+
this.writeBuffers.push(
397+
typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk
398+
);
399399

400400
this.once('readFromChunkBuffer', readCallback);
401401

402402
process.nextTick(() => this.emit('wroteToChunkBuffer'));
403403
}
404404

405+
#resetLocalBuffersCache() {
406+
this.localWriteCache = [];
407+
this.localWriteCacheByteLength = 0;
408+
}
409+
410+
#addLocalBufferCache(buf: Buffer) {
411+
this.localWriteCache.push(buf);
412+
this.localWriteCacheByteLength += buf.byteLength;
413+
}
414+
405415
/**
406-
* Prepends data back to the upstream chunk buffer.
416+
* Prepends the local buffer to write buffer and resets it.
407417
*
408-
* @param chunk The data to prepend
418+
* @param keepLastBytes number of bytes to keep from the end of the local buffer.
409419
*/
410-
private unshiftChunkBuffer(chunk: Buffer) {
411-
this.upstreamChunkBuffer = Buffer.concat([chunk, this.upstreamChunkBuffer]);
420+
private prependLocalBufferToUpstream(keepLastBytes?: number) {
421+
// Typically, the upstream write buffers should be smaller than the local
422+
// cache, so we can save time by setting the local cache as the new
423+
// upstream write buffer array and appending the old array to it
424+
let initialBuffers: Buffer[] = [];
425+
426+
if (keepLastBytes) {
427+
// we only want the last X bytes
428+
let bytesKept = 0;
429+
430+
while (keepLastBytes > bytesKept) {
431+
// load backwards because we want the last X bytes
432+
// note: `localWriteCacheByteLength` is reset below
433+
let buf = this.localWriteCache.pop();
434+
if (!buf) break;
435+
436+
bytesKept += buf.byteLength;
437+
438+
if (bytesKept > keepLastBytes) {
439+
// we have gone over the amount desired, let's keep the last X bytes
440+
// of this buffer
441+
const diff = bytesKept - keepLastBytes;
442+
buf = buf.subarray(diff);
443+
bytesKept -= diff;
444+
}
445+
446+
initialBuffers.unshift(buf);
447+
}
448+
} else {
449+
// we're keeping all of the local cache, simply use it as the initial buffer
450+
initialBuffers = this.localWriteCache;
451+
}
452+
453+
// Append the old upstream to the new
454+
const append = this.writeBuffers;
455+
this.writeBuffers = initialBuffers;
456+
457+
for (const buf of append) {
458+
this.writeBuffers.push(buf);
459+
}
460+
461+
// reset last buffers sent
462+
this.#resetLocalBuffersCache();
412463
}
413464

414465
/**
@@ -417,15 +468,28 @@ export class Upload extends Writable {
417468
* @param limit The maximum amount to return from the buffer.
418469
* @returns The data requested.
419470
*/
420-
private pullFromChunkBuffer(limit: number) {
421-
const chunk = this.upstreamChunkBuffer.slice(0, limit);
422-
this.upstreamChunkBuffer = this.upstreamChunkBuffer.slice(limit);
471+
private *pullFromChunkBuffer(limit: number) {
472+
while (limit) {
473+
const buf = this.writeBuffers.shift();
474+
if (!buf) break;
475+
476+
let bufToYield = buf;
423477

424-
// notify upstream we've read from the buffer so it can potentially
425-
// send more data down.
426-
process.nextTick(() => this.emit('readFromChunkBuffer'));
478+
if (buf.byteLength > limit) {
479+
bufToYield = buf.subarray(0, limit);
480+
this.writeBuffers.unshift(buf.subarray(limit));
481+
limit = 0;
482+
} else {
483+
limit -= buf.byteLength;
484+
}
485+
486+
yield bufToYield;
427487

428-
return chunk;
488+
// Notify upstream we've read from the buffer and we're able to consume
489+
// more. It can also potentially send more data down as we're currently
490+
// iterating.
491+
this.emit('readFromChunkBuffer');
492+
}
429493
}
430494

431495
/**
@@ -436,7 +500,7 @@ export class Upload extends Writable {
436500
private async waitForNextChunk(): Promise<boolean> {
437501
const willBeMoreChunks = await new Promise<boolean>(resolve => {
438502
// There's data available - it should be digested
439-
if (this.upstreamChunkBuffer.byteLength) {
503+
if (this.writeBuffers.length) {
440504
return resolve(true);
441505
}
442506

@@ -457,7 +521,7 @@ export class Upload extends Writable {
457521
removeListeners();
458522

459523
// this should be the last chunk, if there's anything there
460-
if (this.upstreamChunkBuffer.length) return resolve(true);
524+
if (this.writeBuffers.length) return resolve(true);
461525

462526
return resolve(false);
463527
};
@@ -483,35 +547,16 @@ export class Upload extends Writable {
483547
* Ends when the limit has reached or no data is expected to be pushed from upstream.
484548
*
485549
* @param limit The most amount of data this iterator should return. `Infinity` by default.
486-
* @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator
487550
*/
488-
private async *upstreamIterator(limit = Infinity, oneChunkMode?: boolean) {
489-
let completeChunk = Buffer.alloc(0);
490-
551+
private async *upstreamIterator(limit = Infinity) {
491552
// read from upstream chunk buffer
492553
while (limit && (await this.waitForNextChunk())) {
493554
// read until end or limit has been reached
494-
const chunk = this.pullFromChunkBuffer(limit);
495-
496-
limit -= chunk.byteLength;
497-
if (oneChunkMode) {
498-
// return 1 chunk at the end of iteration
499-
completeChunk = Buffer.concat([completeChunk, chunk]);
500-
} else {
501-
// return many chunks throughout iteration
502-
yield {
503-
chunk,
504-
encoding: this.chunkBufferEncoding,
505-
};
555+
for (const chunk of this.pullFromChunkBuffer(limit)) {
556+
limit -= chunk.byteLength;
557+
yield chunk;
506558
}
507559
}
508-
509-
if (oneChunkMode) {
510-
yield {
511-
chunk: completeChunk,
512-
encoding: this.chunkBufferEncoding,
513-
};
514-
}
515560
}
516561

517562
createURI(): Promise<string>;
@@ -680,10 +725,7 @@ export class Upload extends Writable {
680725
}
681726

682727
// A queue for the upstream data
683-
const upstreamQueue = this.upstreamIterator(
684-
expectedUploadSize,
685-
multiChunkMode // multi-chunk mode should return 1 chunk per request
686-
);
728+
const upstreamQueue = this.upstreamIterator(expectedUploadSize);
687729

688730
// The primary read stream for this request. This stream retrieves no more
689731
// than the exact requested amount from upstream.
@@ -696,15 +738,23 @@ export class Upload extends Writable {
696738

697739
if (result.value) {
698740
this.numChunksReadInRequest++;
699-
this.lastChunkSent = result.value.chunk;
700-
this.numBytesWritten += result.value.chunk.byteLength;
741+
742+
if (multiChunkMode) {
743+
// save every buffer used in the request in multi-chunk mode
744+
this.#addLocalBufferCache(result.value);
745+
} else {
746+
this.#resetLocalBuffersCache();
747+
this.#addLocalBufferCache(result.value);
748+
}
749+
750+
this.numBytesWritten += result.value.byteLength;
701751

702752
this.emit('progress', {
703753
bytesWritten: this.numBytesWritten,
704754
contentLength: this.contentLength,
705755
});
706756

707-
requestStream.push(result.value.chunk, result.value.encoding);
757+
requestStream.push(result.value);
708758
}
709759

710760
if (result.done) {
@@ -720,17 +770,21 @@ export class Upload extends Writable {
720770
// If using multiple chunk upload, set appropriate header
721771
if (multiChunkMode) {
722772
// We need to know how much data is available upstream to set the `Content-Range` header.
723-
const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true);
724-
const {value} = await oneChunkIterator.next();
773+
// https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
774+
for await (const chunk of this.upstreamIterator(expectedUploadSize)) {
775+
// This will conveniently track and keep the size of the buffers
776+
this.#addLocalBufferCache(chunk);
777+
}
725778

726-
const bytesToUpload = value!.chunk.byteLength;
779+
// We hit either the expected upload size or the remainder
780+
const bytesToUpload = this.localWriteCacheByteLength;
727781

728782
// Important: we want to know if the upstream has ended and the queue is empty before
729783
// unshifting data back into the queue. This way we will know if this is the last request or not.
730784
const isLastChunkOfUpload = !(await this.waitForNextChunk());
731785

732-
// Important: put the data back in the queue for the actual upload iterator
733-
this.unshiftChunkBuffer(value!.chunk);
786+
// Important: put the data back in the queue for the actual upload
787+
this.prependLocalBufferToUpstream();
734788

735789
let totalObjectSize = this.contentLength;
736790

@@ -808,15 +862,14 @@ export class Upload extends Writable {
808862
// - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
809863
const missingBytes = this.numBytesWritten - this.offset;
810864
if (missingBytes) {
811-
const dataToPrependForResending = this.lastChunkSent.slice(
812-
-missingBytes
813-
);
814865
// As multi-chunk uploads send one chunk per request and pulls one
815866
// chunk into the pipeline, prepending the missing bytes back should
816867
// be fine for the next request.
817-
this.unshiftChunkBuffer(dataToPrependForResending);
868+
this.prependLocalBufferToUpstream(missingBytes);
818869
this.numBytesWritten -= missingBytes;
819-
this.lastChunkSent = Buffer.alloc(0);
870+
} else {
871+
// No bytes missing - no need to keep the local cache
872+
this.#resetLocalBuffersCache();
820873
}
821874

822875
// continue uploading next chunk
@@ -831,8 +884,8 @@ export class Upload extends Writable {
831884

832885
this.destroy(err);
833886
} else {
834-
// remove the last chunk sent to free memory
835-
this.lastChunkSent = Buffer.alloc(0);
887+
// no need to keep the cache
888+
this.#resetLocalBuffersCache();
836889

837890
if (resp && resp.data) {
838891
resp.data.size = Number(resp.data.size);
@@ -983,11 +1036,9 @@ export class Upload extends Writable {
9831036
return;
9841037
}
9851038

986-
// Unshift the most recent chunk back in case it's needed for the next
987-
// request.
988-
this.numBytesWritten -= this.lastChunkSent.byteLength;
989-
this.unshiftChunkBuffer(this.lastChunkSent);
990-
this.lastChunkSent = Buffer.alloc(0);
1039+
// Unshift the local cache back in case it's needed for the next request.
1040+
this.numBytesWritten -= this.localWriteCacheByteLength;
1041+
this.prependLocalBufferToUpstream();
9911042

9921043
// We don't know how much data has been received by the server.
9931044
// `continueUploading` will recheck the offset via `getAndSetOffset`.

0 commit comments

Comments
 (0)