diff --git a/packages/pg-query-stream/index.js b/packages/pg-query-stream/index.js index 9c34207ec..4fe231d4a 100644 --- a/packages/pg-query-stream/index.js +++ b/packages/pg-query-stream/index.js @@ -1,14 +1,16 @@ 'use strict' -var Cursor = require('pg-cursor') -var Readable = require('stream').Readable + +const { Readable } = require('stream') +const Cursor = require('pg-cursor') class PgQueryStream extends Readable { - constructor (text, values, options) { - super(Object.assign({ objectMode: true }, options)) - this.cursor = new Cursor(text, values, options) + constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) { + // https://nodejs.org/api/stream.html#stream_new_stream_readable_options + super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize }) + this.cursor = new Cursor(text, values, { rowMode, types }) + this._reading = false - this._closed = false - this.batchSize = (options || {}).batchSize || 100 + this._destroyCallback = undefined // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) @@ -19,40 +21,47 @@ class PgQueryStream extends Readable { this.handleError = this.cursor.handleError.bind(this.cursor) } - submit (connection) { + submit(connection) { this.cursor.submit(connection) } - close (callback) { - this._closed = true - const cb = callback || (() => this.emit('close')) - this.cursor.close(cb) + // Backwards compatibility. + // A stream should be 'closed' using destroy(). + close(callback) { + if (this.destroyed) { + if (callback) setImmediate(callback) + } else { + if (callback) this.once('close', callback) + this.destroy() + } } - _read (size) { - if (this._reading || this._closed) { - return false + _destroy(_err, callback) { + if (this._reading) { + this._destroyCallback = callback + } else { + this.cursor.close(callback) } + } + + // https://nodejs.org/api/stream.html#stream_readable_read_size_1 + _read(size) { + // Prevent _destroy() from closing while reading this._reading = true - const readAmount = Math.max(size, this.batchSize) - this.cursor.read(readAmount, (err, rows) => { - if (this._closed) { - return - } - if (err) { - return this.emit('error', err) - } - // if we get a 0 length array we've read to the end of the cursor - if (!rows.length) { - this._closed = true - setImmediate(() => this.emit('close')) - return this.push(null) - } - // push each row into the stream + this.cursor.read(size, (err, rows, result) => { this._reading = false - for (var i = 0; i < rows.length; i++) { - this.push(rows[i]) + + if (this.destroyed) { + // Destroyed while reading + this.cursor.close(this._destroyCallback) + this._destroyCallback = undefined + } else if (err) { + // https://nodejs.org/api/stream.html#stream_errors_while_reading + this.destroy(err) + } else { + for (const row of rows) this.push(row) + if (rows.length < size) this.push(null) } }) }