-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathpauses.ts
37 lines (34 loc) · 887 Bytes
/
pauses.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import helper from './helper'
import concat from 'concat-stream'
import JSONStream from 'JSONStream'
import QueryStream from '../src'
import { Transform, TransformCallback } from 'stream'
/**
 * Object-mode pass-through stream that simulates a slow consumer.
 *
 * Every chunk is forwarded unchanged as soon as it arrives, but the transform
 * callback is only invoked about 1ms later. Until that callback fires, the
 * stream does not process the next written chunk, so anything piped into it
 * is repeatedly made to wait.
 */
class PauseStream extends Transform {
  constructor() {
    // Object mode so arbitrary values (not only Buffers/strings) pass through.
    super({ objectMode: true })
  }

  /**
   * Forwards `chunk` downstream immediately and defers completion.
   *
   * @param chunk - Value to pass through untouched.
   * @param encoding - Encoding reported by the writable side; handed to `push` as-is.
   * @param callback - Signals that this chunk is done; called after a 1ms timer.
   */
  _transform(chunk: unknown, encoding: BufferEncoding, callback: TransformCallback): void {
    this.push(chunk, encoding)
    // Deliberately delay the acknowledgement: this is what makes the stream "pause".
    setTimeout(callback, 1)
  }
}
// Checks that a QueryStream still delivers every row when the downstream
// consumer is slow. NOTE(review): `helper` is defined in './helper'; it is
// presumably what sets up the suite and supplies a connected `client` — confirm.
helper('pauses', function (client) {
  // A regular function (not an arrow) is required so `this` is the mocha context.
  it('pauses', function (done) {
    this.timeout(5000)

    const lastValue = 200
    // Tiny batch size and high-water mark so the stream has to fetch and pause many times.
    const stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [lastValue], {
      batchSize: 2,
      highWaterMark: 2,
    })
    const query = client.query(stream)

    // `.pipe()` does not forward errors to later stages; without this handler a
    // failing query would only surface as a timeout instead of the real error.
    query.on('error', done)

    const pauser = new PauseStream()
    query
      .pipe(JSONStream.stringify())
      .pipe(pauser)
      .pipe(
        concat(function (json) {
          let failure: unknown
          try {
            const rows = JSON.parse(json)
            // generate_series is inclusive on both ends: 0..lastValue.
            const expected = lastValue + 1
            if (!Array.isArray(rows) || rows.length !== expected) {
              const received = Array.isArray(rows) ? String(rows.length) : typeof rows
              throw new Error(`expected ${expected} rows but received ${received}`)
            }
          } catch (err) {
            // Report parse/validation problems through mocha rather than
            // throwing from inside a stream callback.
            failure = err
          }
          done(failure)
        })
      )
  })
})