Skip to content

Fix pg-query-stream implementation #2051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pg-cursor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) {
}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')

Cursor.prototype.close = function (cb) {
if (this.state === 'done') {
if (!this.connection || this.state === 'done') {
if (cb) {
return setImmediate(cb)
} else {
Expand Down
5 changes: 5 additions & 0 deletions packages/pg-cursor/test/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ describe('close', function () {
})
})
})

it('is a no-op to "close" the cursor before submitting it', function (done) {
const cursor = new Cursor(text)
cursor.close(done)
})
})
2 changes: 1 addition & 1 deletion packages/pg-query-stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson
Copyright (c) 2013-2019 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
57 changes: 20 additions & 37 deletions packages/pg-query-stream/index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable
const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
constructor (text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values, options)
this._reading = false
this._closed = false
this.batchSize = (options || {}).batchSize || 100
constructor(text, values, config = {}) {
const { batchSize = 100 } = config;
// https://nodejs.org/api/stream.html#stream_new_stream_readable_options
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
this.cursor = new Cursor(text, values, config)

// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
Expand All @@ -19,40 +17,25 @@ class PgQueryStream extends Readable {
this.handleError = this.cursor.handleError.bind(this.cursor)
}

submit (connection) {
submit(connection) {
this.cursor.submit(connection)
}

close (callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
_destroy(_err, cb) {
this.cursor.close((err) => {
cb(err || _err)
})
}

_read (size) {
if (this._reading || this._closed) {
return false
}
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
// https://nodejs.org/api/stream.html#stream_readable_read_size_1
_read(size) {
this.cursor.read(size, (err, rows, result) => {
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

// push each row into the stream
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])
// https://nodejs.org/api/stream.html#stream_errors_while_reading
this.destroy(err)
} else {
for (const row of rows) this.push(row)
if (rows.length < size) this.push(null)
}
})
}
Expand Down
55 changes: 55 additions & 0 deletions packages/pg-query-stream/test/async-iterator.es6
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,59 @@ describe('Async iterator', () => {
assert.equal(allRows.length, 603)
await pool.end()
})

it('can break out of iteration early', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
assert.strictEqual(rows.length, 3)
client.release()
await pool.end()
})

it('only returns rows on first iteration', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, []))
for await (const row of stream) {
rows.push(row)
break;
}
for await (const row of stream) {
rows.push(row)
}
for await (const row of stream) {
rows.push(row)
}
assert.strictEqual(rows.length, 1)
client.release()
await pool.end()
})

it('can read with delays', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
for await (const row of stream) {
rows.push(row)
await new Promise((resolve) => setTimeout(resolve, 1))
}
assert.strictEqual(rows.length, 201)
client.release()
await pool.end()
})
})
74 changes: 55 additions & 19 deletions packages/pg-query-stream/test/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,85 @@ var concat = require('concat-stream')
var QueryStream = require('../')
var helper = require('./helper')

if (process.version.startsWith('v8.')) {
return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly');
}

helper('close', function (client) {
it('emits close', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
query.pipe(concat(function () {}))
query.pipe(concat(function () { }))
query.on('close', done)
})
})

helper('early close', function (client) {
it('can be closed early', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
var readCount = 0
query.on('readable', function () {
readCount++
query.read()
})
query.once('readable', function () {
query.close()
query.destroy()
})
query.on('close', function () {
assert(readCount < 10, 'should not have read more than 10 rows')
done()
})
})
})

helper('close callback', function (client) {
it('notifies an optional callback when the conneciton is closed', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
query.once('readable', function () { // only reading once
query.read()
})
query.once('readable', function () {
query.close(function () {
// nothing to assert. This test will time out if the callback does not work.
done()
it('can destroy stream while reading', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
setTimeout(() => {
stream.destroy()
stream.on('close', done)
}, 100)
})

it('emits an error when calling destroy with an error', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
setTimeout(() => {
stream.destroy(new Error('intentional error'))
stream.on('error', (err) => {
// make sure there's an error
assert(err);
assert.strictEqual(err.message, 'intentional error');
done();
})
}, 100)
})

it('can destroy stream while reading an error', function (done) {
var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.once('error', () => {
stream.destroy()
// wait a bit to let any other errors shake through
setTimeout(done, 100)
})
query.on('close', function () {
assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
})
})

it('does not crash when destroying the stream immediately after calling read', function (done) {
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.destroy()
stream.on('close', done)
})

it('does not crash when destroying the stream before its submitted', function (done) {
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.destroy()
stream.on('close', done)
})
})
4 changes: 1 addition & 3 deletions packages/pg-query-stream/test/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ var assert = require('assert')
var QueryStream = require('../')

var stream = new QueryStream('SELECT NOW()', [], {
highWaterMark: 999,
batchSize: 88
})

assert.equal(stream._readableState.highWaterMark, 999)
assert.equal(stream.batchSize, 88)
assert.equal(stream._readableState.highWaterMark, 88)