Skip to content

Add pg-query-stream module #2035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 80 commits into from
Dec 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
722296f
Initial commit
brianc Oct 22, 2013
5400dfe
Remove bad code
brianc Oct 22, 2013
cc20d98
Bump version
brianc Oct 22, 2013
48687fc
Fix relative load path for pg.js
brianc Oct 22, 2013
aec85ce
Bump version
brianc Oct 22, 2013
1b249e9
Add in proper error handling
brianc Oct 22, 2013
5888135
Bump version
brianc Oct 22, 2013
278c5ce
Update npm test command
brianc Oct 22, 2013
33be525
Add ability to configure highWaterMark and batchSize
brianc Oct 23, 2013
0ebd4c3
Bump version
brianc Oct 23, 2013
e111715
Emit 'close' events when query completes
grncdr Dec 25, 2013
0df516c
Add cleanup to the example
brianc Jan 31, 2014
0ed940c
Merge pull request #1 from grncdr/patch-1
brianc Feb 26, 2014
2909062
Port tests to use mocha
brianc Feb 26, 2014
f8f2a92
Remove Makefile
brianc Feb 26, 2014
b66be5e
Add test for stream close & satisfy stream contract
brianc Feb 26, 2014
122bcfb
Bump version
brianc Feb 26, 2014
37de9c2
Rebase code on top of pg-cursor
brianc Feb 26, 2014
0a7da37
Bump version
brianc Feb 26, 2014
1961125
Update pg-cursor
brianc Mar 21, 2014
0f13c80
Bump version
brianc Mar 21, 2014
87b52f9
adds failing test
tbuchok Apr 10, 2014
cab956b
passes `stream-tester-timestamp`
tbuchok Apr 10, 2014
d82386e
Merge pull request #4 from tbuchok/master
brianc Apr 11, 2014
7593a44
Bump version
brianc Apr 11, 2014
41b7d7d
fix up tests
calvinmetcalf May 14, 2014
fec0909
clean ups
calvinmetcalf May 14, 2014
e242b94
Update version of pg-cursor
brianc May 22, 2014
99fe666
Bump version
brianc May 22, 2014
6763e09
merge it
calvinmetcalf May 22, 2014
adf86b8
deps
calvinmetcalf May 22, 2014
7fb1f50
Merge pull request #5 from calvinmetcalf/fixes
brianc May 22, 2014
df63cbb
Bump version
brianc May 22, 2014
6ab80d3
Add close method & supporting tests
brianc Oct 30, 2014
0b45eda
Merge pull request #7 from brianc/add-close-method
brianc Nov 3, 2014
1dd2d3a
Add infrastructure files
brianc Nov 3, 2014
02ff00a
Bump version
brianc Nov 3, 2014
aa61055
Update README.md
brianc Nov 3, 2014
b38d092
Update travis file
brianc Nov 3, 2014
52f5c70
Merge branch 'master' of github.com:brianc/node-pg-query-stream
brianc Nov 3, 2014
e9d1872
Test new travis config
brianc Nov 3, 2014
68819df
Avoid race when stream closed while fetching
Nov 5, 2015
d1ac31c
Support a close callback when closing the stream
Nov 5, 2015
27bba8d
Conform to readable stream spec
brianc Nov 13, 2015
edfe1aa
Merge pull request #16 from brianc/conform-to-readable-stream-spec
brianc Nov 13, 2015
ca21462
Merge pull request #15 from slickmb/bug/close_race
brianc Nov 13, 2015
9aca077
Add more versions of node to the travis matrix
brianc Nov 13, 2015
df8acf0
Bump version
brianc Nov 13, 2015
aa72d9b
Merge pull request #17 from brianc/more-travis-versions
brianc Nov 13, 2015
802616b
Update README.md
brianc Nov 13, 2015
2a46c8a
Add node LTS and current versions to travis matrix
abenhamdine Mar 1, 2017
c9e21f4
Merge pull request #22 from abenhamdine/master
brianc Apr 24, 2017
e517b8c
WIP
brianc Aug 5, 2017
b1f8f8d
Eslint
brianc Aug 6, 2017
57f62df
Merge pull request #29 from brianc/upgrade-pg-cursor
brianc Aug 6, 2017
465ac5c
Bump version
brianc Aug 6, 2017
17e19e5
Move eslint to dev dependencies
brianc Aug 9, 2017
090b759
Merge pull request #33 from brianc/eslint-deps
brianc Aug 9, 2017
e762b48
Bump version
brianc Aug 9, 2017
2446fdb
Fix for [email protected] (#47)
brianc Oct 8, 2018
6177ff9
Bump version
brianc Oct 8, 2018
c999aae
Updated readme examples to es6 (#36)
amilajack Jan 8, 2019
d822fc8
Added --save flags to installation steps (#35)
amilajack Jan 8, 2019
28e43b8
Create LICENSE (#34)
amilajack Jan 8, 2019
5b2816a
Bump version of pg-cursor
brianc Jan 8, 2019
7d27bd2
Bump version. Drop version of pg.js via pg-cursor.
brianc Jan 8, 2019
c95a650
Upgrade dependencies (#59)
brianc Oct 28, 2019
e153e3f
fixed typo `cumbersom` (#49)
mandyreal Oct 28, 2019
fb52c52
Bump js-yaml from 3.9.1 to 3.13.1 (#60)
dependabot[bot] Oct 28, 2019
a756ee3
Bump debug from 2.6.8 to 2.6.9 (#61)
dependabot[bot] Oct 28, 2019
a84db5f
Add async iterator tests (#62)
brianc Oct 28, 2019
3e59f28
Bump version of pg-cursor (#64)
brianc Oct 30, 2019
05b4c57
Bump eslint from 4.4.0 to 4.18.2 (#63)
dependabot[bot] Oct 30, 2019
08072a9
Pass options to cursor (#65)
brianc Oct 30, 2019
9ced05e
Bump version
brianc Oct 30, 2019
db1b95e
Add 'packages/pg-query-stream/' from commit '9ced05e8aab65f3fdf1a67ad…
brianc Dec 20, 2019
8607302
Run gitsubtree merge
brianc Dec 20, 2019
ef2f2d2
Unify lint
brianc Dec 20, 2019
6b7b8d1
Do not run tests in parallel
brianc Dec 20, 2019
fdae851
Delete files which are no longer needed as they exist in the monorepo
brianc Dec 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"packages/*"
],
"scripts": {
"test": "yarn lerna exec --parallel yarn test",
"test": "yarn lerna exec yarn test",
"lint": "yarn lerna exec --parallel yarn lint"
},
"devDependencies": {
Expand Down
1 change: 1 addition & 0 deletions packages/pg-query-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
9 changes: 9 additions & 0 deletions packages/pg-query-stream/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
68 changes: 68 additions & 0 deletions packages/pg-query-stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# pg-query-stream

[![Build Status](https://travis-ci.org/brianc/node-pg-query-stream.svg)](https://travis-ci.org/brianc/node-pg-query-stream)

Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream.


## installation

```bash
$ npm install pg --save
$ npm install pg-query-stream --save
```

_requires pg>=2.8.1_


## use

```js
const pg = require('pg')
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

//pipe 1,000,000 rows to stdout without blowing up your memory usage
pg.connect((err, client, done) => {
if (err) throw err;
const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
const stream = client.query(query)
//release the client when the stream is finished
stream.on('end', done)
stream.pipe(JSONStream.stringify()).pipe(process.stdout)
})
```

The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory.

This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async itteration through your data is cumbersome, and _way way way_ slower than using a cursor.

_note: this module only works with the JavaScript client, and does not work with the native bindings. libpq doesn't expose the protocol at a level where a cursor can be manipulated directly_

## contribution

I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome?

## license

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
61 changes: 61 additions & 0 deletions packages/pg-query-stream/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable

class PgQueryStream extends Readable {
constructor (text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values, options)
this._reading = false
this._closed = false
this.batchSize = (options || {}).batchSize || 100

// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
this.handleError = this.cursor.handleError.bind(this.cursor)
}

submit (connection) {
this.cursor.submit(connection)
}

close (callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
}

_read (size) {
if (this._reading || this._closed) {
return false
}
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

// push each row into the stream
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])
}
})
}
}

module.exports = PgQueryStream
38 changes: 38 additions & 0 deletions packages/pg-query-stream/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "pg-query-stream",
"version": "2.0.1",
"description": "Postgres query result returned as readable stream",
"main": "index.js",
"scripts": {
"test": "mocha",
"lint": "eslint ."
},
"repository": {
"type": "git",
"url": "git://github.com/brianc/node-postgres.git"
},
"keywords": [
"postgres",
"pg",
"query",
"stream"
],
"author": "Brian M. Carlson",
"license": "MIT",
"bugs": {
"url": "https://github.com/brianc/node-postgres/issues"
},
"devDependencies": {
"JSONStream": "~0.7.1",
"concat-stream": "~1.0.1",
"eslint-plugin-promise": "^3.5.0",
"mocha": "^6.2.2",
"pg": "^7.5.0",
"stream-spec": "~0.3.5",
"stream-tester": "0.0.5",
"through": "~2.3.4"
},
"dependencies": {
"pg-cursor": "^2.0.1"
}
}
57 changes: 57 additions & 0 deletions packages/pg-query-stream/test/async-iterator.es6
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const QueryStream = require('../')
const pg = require('pg')
const assert = require('assert')

const queryText = 'SELECT * FROM generate_series(0, 200) num'
describe('Async iterator', () => {
it('works', async () => {
const stream = new QueryStream(queryText, [])
const client = new pg.Client()
await client.connect()
const query = client.query(stream)
const rows = []
for await (const row of query) {
rows.push(row)
}
assert.equal(rows.length, 201)
await client.end()
})

it('can async iterate and then do a query afterwards', async () => {
const stream = new QueryStream(queryText, [])
const client = new pg.Client()
await client.connect()
const query = client.query(stream)
const iteratorRows = []
for await (const row of query) {
iteratorRows.push(row)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should make:

  1. a test that breaks in the first iteration
  2. a test that triggers two breaking loops
  3. a test that await new Promise(r => setTimeout(r)) in the loop body

These tests will fail due to the poor implementation of pg-query

Copy link
Contributor

@matthieusieben matthieusieben Dec 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to brianc/node-pg-query-stream#52 (comment) for an alternative implementation

}
assert.equal(iteratorRows.length, 201)
const { rows } = await client.query('SELECT NOW()')
assert.equal(rows.length, 1)
await client.end()
})

it('can async iterate multiple times with a pool', async () => {
const pool = new pg.Pool({ max: 1 })

const allRows = []
const run = async () => {
// get the client
const client = await pool.connect()
// stream some rows
const stream = new QueryStream(queryText, [])
const iteratorRows = []
client.query(stream)
for await (const row of stream) {
iteratorRows.push(row)
allRows.push(row)
}
assert.equal(iteratorRows.length, 201)
client.release()
}
await Promise.all([run(), run(), run()])
assert.equal(allRows.length, 603)
await pool.end()
})
})
4 changes: 4 additions & 0 deletions packages/pg-query-stream/test/async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// only newer versions of node support async iterator
if (!process.version.startsWith('v8')) {
require('./async-iterator.es6')
}
52 changes: 52 additions & 0 deletions packages/pg-query-stream/test/close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
var assert = require('assert')
var concat = require('concat-stream')

var QueryStream = require('../')
var helper = require('./helper')

helper('close', function (client) {
it('emits close', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
query.pipe(concat(function () {}))
query.on('close', done)
})
})

helper('early close', function (client) {
it('can be closed early', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
var readCount = 0
query.on('readable', function () {
readCount++
query.read()
})
query.once('readable', function () {
query.close()
})
query.on('close', function () {
assert(readCount < 10, 'should not have read more than 10 rows')
done()
})
})
})

helper('close callback', function (client) {
it('notifies an optional callback when the conneciton is closed', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
query.once('readable', function () { // only reading once
query.read()
})
query.once('readable', function () {
query.close(function () {
// nothing to assert. This test will time out if the callback does not work.
done()
})
})
query.on('close', function () {
assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
})
})
})
22 changes: 22 additions & 0 deletions packages/pg-query-stream/test/concat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
var assert = require('assert')
var concat = require('concat-stream')
var through = require('through')
var helper = require('./helper')

var QueryStream = require('../')

helper('concat', function (client) {
it('concats correctly', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
var query = client.query(stream)
query.pipe(through(function (row) {
this.push(row.num)
})).pipe(concat(function (result) {
var total = result.reduce(function (prev, cur) {
return prev + cur
})
assert.equal(total, 20100)
}))
stream.on('end', done)
})
})
10 changes: 10 additions & 0 deletions packages/pg-query-stream/test/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
var assert = require('assert')
var QueryStream = require('../')

var stream = new QueryStream('SELECT NOW()', [], {
highWaterMark: 999,
batchSize: 88
})

assert.equal(stream._readableState.highWaterMark, 999)
assert.equal(stream.batchSize, 88)
22 changes: 22 additions & 0 deletions packages/pg-query-stream/test/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
var assert = require('assert')
var helper = require('./helper')

var QueryStream = require('../')

helper('error', function (client) {
it('receives error on stream', function (done) {
var stream = new QueryStream('SELECT * FROM asdf num', [])
var query = client.query(stream)
query.on('error', function (err) {
assert(err)
assert.equal(err.code, '42P01')
done()
}).on('data', function () {
// noop to kick of reading
})
})

it('continues to function after stream', function (done) {
client.query('SELECT NOW()', done)
})
})
35 changes: 35 additions & 0 deletions packages/pg-query-stream/test/fast-reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
var assert = require('assert')
var helper = require('./helper')
var QueryStream = require('../')

helper('fast reader', function (client) {
it('works', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
var query = client.query(stream)
var result = []
stream.on('readable', function () {
var res = stream.read()
while (res) {
if (result.length !== 201) {
assert(res, 'should not return null on evented reader')
} else {
// a readable stream will emit a null datum when it finishes being readable
// https://nodejs.org/api/stream.html#stream_event_readable
assert.equal(res, null)
}
if (res) {
result.push(res.num)
}
res = stream.read()
}
})
stream.on('end', function () {
var total = result.reduce(function (prev, cur) {
return prev + cur
})
assert.equal(total, 20100)
done()
})
assert.strictEqual(query.read(2), null)
})
})
Loading