@@ -2,15 +2,17 @@ import { Readable } from 'stream'
2
2
import { Submittable , Connection } from 'pg'
3
3
import Cursor from 'pg-cursor'
4
4
5
- interface PgQueryStreamConfig {
5
+ interface QueryStreamConfig {
6
6
batchSize ?: number
7
7
highWaterMark ?: number
8
8
rowMode ?: 'array'
9
9
types ?: any
10
10
}
11
11
12
- class PgQueryStream extends Readable implements Submittable {
12
+ class QueryStream extends Readable implements Submittable {
13
13
cursor : any
14
+ _result : any
15
+
14
16
handleRowDescription : Function
15
17
handleDataRow : Function
16
18
handlePortalSuspended : Function
@@ -19,9 +21,7 @@ class PgQueryStream extends Readable implements Submittable {
19
21
handleError : Function
20
22
handleEmptyQuery : Function
21
23
22
- _result : any
23
-
24
- constructor ( text : string , values ?: any [ ] , config : PgQueryStreamConfig = { } ) {
24
+ public constructor ( text : string , values ?: any [ ] , config : QueryStreamConfig = { } ) {
25
25
const { batchSize, highWaterMark = 100 } = config
26
26
27
27
super ( { objectMode : true , autoDestroy : true , highWaterMark : batchSize || highWaterMark } )
@@ -40,20 +40,21 @@ class PgQueryStream extends Readable implements Submittable {
40
40
this . _result = this . cursor . _result
41
41
}
42
42
43
- submit ( connection : Connection ) : void {
43
+ public submit ( connection : Connection ) : void {
44
44
this . cursor . submit ( connection )
45
45
}
46
46
47
- _destroy ( _err : Error , cb : Function ) {
47
+ public _destroy ( _err : Error , cb : Function ) {
48
48
this . cursor . close ( ( err ?: Error ) => {
49
49
cb ( err || _err )
50
50
} )
51
51
}
52
52
53
53
// https://nodejs.org/api/stream.html#stream_readable_read_size_1
54
- _read ( size : number ) {
55
- this . cursor . read ( size , ( err : Error , rows : any [ ] , result : any ) => {
54
+ public _read ( size : number ) {
55
+ this . cursor . read ( size , ( err : Error , rows : any [ ] ) => {
56
56
if ( err ) {
57
+ // https://nodejs.org/api/stream.html#stream_errors_while_reading
57
58
this . destroy ( err )
58
59
} else {
59
60
for ( const row of rows ) this . push ( row )
@@ -63,4 +64,4 @@ class PgQueryStream extends Readable implements Submittable {
63
64
}
64
65
}
65
66
66
- export = PgQueryStream
67
+ export = QueryStream
0 commit comments