- Sponsor
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pg-query-stream): pg-query-stream typescript #2376
Changes from all commits
f2bebb9
ae49613
ad19876
ef1c5bf
38a8a7d
300fee0
46e2ae7
f80c49b
1849a6a
b266b5d
66d4bb4
9935c92
76a5316
9463877
3b569f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
import QueryStream from '../src' | ||
import pg from 'pg' | ||
import assert from 'assert' | ||
|
||
const queryText = 'SELECT * FROM generate_series(0, 200) num' | ||
|
||
// node v8 do not support async iteration | ||
if (!process.version.startsWith('v8')) { | ||
describe('Async iterator', () => { | ||
it('works', async () => { | ||
const stream = new QueryStream(queryText, []) | ||
const client = new pg.Client() | ||
await client.connect() | ||
const query = client.query(stream) | ||
const rows = [] | ||
for await (const row of query) { | ||
rows.push(row) | ||
} | ||
assert.equal(rows.length, 201) | ||
await client.end() | ||
}) | ||
|
||
it('can async iterate and then do a query afterwards', async () => { | ||
const stream = new QueryStream(queryText, []) | ||
const client = new pg.Client() | ||
await client.connect() | ||
const query = client.query(stream) | ||
const iteratorRows = [] | ||
for await (const row of query) { | ||
iteratorRows.push(row) | ||
} | ||
assert.equal(iteratorRows.length, 201) | ||
const { rows } = await client.query('SELECT NOW()') | ||
assert.equal(rows.length, 1) | ||
await client.end() | ||
}) | ||
|
||
it('can async iterate multiple times with a pool', async () => { | ||
const pool = new pg.Pool({ max: 1 }) | ||
|
||
const allRows = [] | ||
const run = async () => { | ||
// get the client | ||
const client = await pool.connect() | ||
// stream some rows | ||
const stream = new QueryStream(queryText, []) | ||
const iteratorRows = [] | ||
client.query(stream) | ||
for await (const row of stream) { | ||
iteratorRows.push(row) | ||
allRows.push(row) | ||
} | ||
assert.equal(iteratorRows.length, 201) | ||
client.release() | ||
} | ||
await Promise.all([run(), run(), run()]) | ||
assert.equal(allRows.length, 603) | ||
await pool.end() | ||
}) | ||
|
||
it('can break out of iteration early', async () => { | ||
const pool = new pg.Pool({ max: 1 }) | ||
const client = await pool.connect() | ||
const rows = [] | ||
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) { | ||
rows.push(row) | ||
break | ||
} | ||
for await (const row of client.query(new QueryStream(queryText, []))) { | ||
rows.push(row) | ||
break | ||
} | ||
for await (const row of client.query(new QueryStream(queryText, []))) { | ||
rows.push(row) | ||
break | ||
} | ||
assert.strictEqual(rows.length, 3) | ||
client.release() | ||
await pool.end() | ||
}) | ||
|
||
it('only returns rows on first iteration', async () => { | ||
const pool = new pg.Pool({ max: 1 }) | ||
const client = await pool.connect() | ||
const rows = [] | ||
const stream = client.query(new QueryStream(queryText, [])) | ||
for await (const row of stream) { | ||
rows.push(row) | ||
break | ||
} | ||
for await (const row of stream) { | ||
rows.push(row) | ||
} | ||
for await (const row of stream) { | ||
rows.push(row) | ||
} | ||
assert.strictEqual(rows.length, 1) | ||
client.release() | ||
await pool.end() | ||
}) | ||
|
||
it('can read with delays', async () => { | ||
const pool = new pg.Pool({ max: 1 }) | ||
const client = await pool.connect() | ||
const rows = [] | ||
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 })) | ||
for await (const row of stream) { | ||
rows.push(row) | ||
await new Promise((resolve) => setTimeout(resolve, 1)) | ||
} | ||
assert.strictEqual(rows.length, 201) | ||
client.release() | ||
await pool.end() | ||
}) | ||
}) | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import assert from 'assert' | ||
import QueryStream from '../src' | ||
|
||
describe('stream config options', () => { | ||
// this is mostly for backwards compatibility. | ||
it('sets readable.highWaterMark based on batch size', () => { | ||
const stream = new QueryStream('SELECT NOW()', [], { | ||
batchSize: 88, | ||
}) | ||
assert.equal(stream.readableHighWaterMark, 88) | ||
}) | ||
|
||
it('sets readable.highWaterMark based on highWaterMark config', () => { | ||
const stream = new QueryStream('SELECT NOW()', [], { | ||
highWaterMark: 88, | ||
}) | ||
|
||
assert.equal(stream.readableHighWaterMark, 88) | ||
}) | ||
|
||
it('defaults to 100 for highWaterMark', () => { | ||
const stream = new QueryStream('SELECT NOW()', []) | ||
|
||
assert.equal(stream.readableHighWaterMark, 100) | ||
}) | ||
}) |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import helper from './helper' | ||
import assert from 'assert' | ||
import concat from 'concat-stream' | ||
import QueryStream from '../src' | ||
|
||
helper('instant', function (client) { | ||
it('instant', function (done) { | ||
const query = new QueryStream('SELECT pg_sleep(1)', []) | ||
const stream = client.query(query) | ||
stream.pipe( | ||
concat(function (res) { | ||
assert.equal(res.length, 1) | ||
done() | ||
}) | ||
) | ||
}) | ||
}) |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import helper from './helper'
import concat from 'concat-stream'
import tester from 'stream-tester'
import JSONStream from 'JSONStream'
import QueryStream from '../src'

helper('pauses', function (client) {
  it('pauses', function (done) {
    this.timeout(5000)
    // Tiny batch size and high-water mark force frequent backpressure.
    const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {
      batchSize: 2,
      highWaterMark: 2,
    })
    const query = client.query(stream)
    // Randomly pause/resume the pipeline to verify the stream survives flow-control churn.
    const pauser = tester.createPauseStream(0.1, 100)
    const sink = concat(function (json) {
      // If the stream mishandled pauses the concatenated output would not parse.
      JSON.parse(json)
      done()
    })
    query.pipe(JSONStream.stringify()).pipe(pauser).pipe(sink)
  })
})
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import helper from './helper' | ||
import QueryStream from '../src' | ||
import spec from 'stream-spec' | ||
import assert from 'assert' | ||
|
||
helper('stream tester timestamp', function (client) { | ||
it('should not warn about max listeners', function (done) { | ||
const sql = "SELECT * FROM generate_series('1983-12-30 00:00'::timestamp, '2013-12-30 00:00', '1 years')" | ||
const stream = new QueryStream(sql, []) | ||
let ended = false | ||
const query = client.query(stream) | ||
query.on('end', function () { | ||
ended = true | ||
}) | ||
spec(query).readable().pausable({ strict: true }).validateOnExit() | ||
const checkListeners = function () { | ||
assert(stream.listeners('end').length < 10) | ||
if (!ended) { | ||
setImmediate(checkListeners) | ||
} else { | ||
done() | ||
} | ||
} | ||
checkListeners() | ||
}) | ||
}) |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
import spec from 'stream-spec'
import helper from './helper'
import QueryStream from '../src'

helper('stream tester', function (client) {
  it('passes stream spec', function (done) {
    // Validate the query stream against the generic readable/pausable stream contract.
    const stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
    const query = client.query(stream)
    spec(query).readable().pausable({ strict: true }).validateOnExit()
    stream.on('end', done)
  })
})
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
{
  "compilerOptions": {
    "module": "commonjs",
    "esModuleInterop": true,
    "allowSyntheticDefaultImports": true,
    "strict": false,
    "target": "es6",
    "noImplicitAny": false,
    "moduleResolution": "node",
    "sourceMap": true,
    "pretty": true,
    "outDir": "dist",
    "incremental": true,
    "baseUrl": ".",
    "declaration": true,
    "types": [
      "node",
      "pg",
      "mocha",
      "chai"
    ]
  },
  "include": [
    "src/**/*"
  ]
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{
  "compilerOptions": {
    "strict": true,
    "incremental": true,
    "composite": true
  },
  "include": [],
  "references": [
    { "path": "./packages/pg-query-stream" },
    { "path": "./packages/pg-protocol" }
  ]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: why was `emitClose` removed here?