@@ -1,7 +1,6 @@
-import pg, { PoolConfig } from 'pg'
-import { DatabaseError } from 'pg-protocol'
+import pg from 'pg'
 import { parse as parseArray } from 'postgres-array'
-import { PostgresMetaResult } from './types.js'
+import { PostgresMetaResult, PoolConfig } from './types.js'
 
 pg.types.setTypeParser(pg.types.builtins.INT8, (x) => {
   const asNumber = Number(x)
@@ -21,6 +20,42 @@ pg.types.setTypeParser(1185, parseArray) // _timestamptz
 pg.types.setTypeParser(600, (x) => x) // point
 pg.types.setTypeParser(1017, (x) => x) // _point
 
+// Ensure every query has an error handler on the pool, so connection errors don't bubble up
+// the stack and eventually kill the server
+const poolerQueryHandleError = (pgpool: pg.Pool, sql: string): Promise<pg.QueryResult<any>> => {
+  return new Promise((resolve, reject) => {
+    let rejected = false
+    const connectionErrorHandler = (err: any) => {
+      // If the error hasn't already been propagated to the catch handler
+      if (!rejected) {
+        // Wait for the next tick, leaving a chance for handled errors such as
+        // RESULT_SIZE_LIMIT to take precedence over other stream errors such as `unexpected commandComplete message`
+        setTimeout(() => {
+          rejected = true
+          return reject(err)
+        })
+      }
+    }
+    // This listener avoids uncaught exceptions for errors raised at the connection level within the stream,
+    // such as parse or RESULT_SIZE_EXCEEDED errors; instead, the error is handled gracefully and bubbles up to the caller
+    pgpool.once('error', connectionErrorHandler)
+    pgpool
+      .query(sql)
+      .then((results: pg.QueryResult<any>) => {
+        if (!rejected) {
+          return resolve(results)
+        }
+      })
+      .catch((err: any) => {
+        // If the error hasn't already been handled by the error listener
+        if (!rejected) {
+          rejected = true
+          return reject(err)
+        }
+      })
+  })
+}
+
 export const init: (config: PoolConfig) => {
   query: (sql: string) => Promise<PostgresMetaResult<any>>
   end: () => Promise<void>
@@ -60,26 +95,27 @@ export const init: (config: PoolConfig) => {
   // compromise: if we run `query` after `pool.end()` is called (i.e. pool is
   // `null`), we temporarily create a pool and close it right after.
   let pool: pg.Pool | null = new pg.Pool(config)
+
   return {
     async query(sql) {
       try {
         if (!pool) {
           const pool = new pg.Pool(config)
-          let res = await pool.query(sql)
+          let res = await poolerQueryHandleError(pool, sql)
           if (Array.isArray(res)) {
             res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
           }
           await pool.end()
           return { data: res.rows, error: null }
         }
 
-        let res = await pool.query(sql)
+        let res = await poolerQueryHandleError(pool, sql)
         if (Array.isArray(res)) {
           res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
         }
         return { data: res.rows, error: null }
       } catch (error: any) {
-        if (error instanceof DatabaseError) {
+        if (error.constructor.name === 'DatabaseError') {
           // Roughly based on:
           // - https://github.com/postgres/postgres/blob/fc4089f3c65a5f1b413a3299ba02b66a8e5e37d0/src/interfaces/libpq/fe-protocol3.c#L1018
           // - https://github.com/brianc/node-postgres/blob/b1a8947738ce0af004cb926f79829bb2abc64aa6/packages/pg/lib/native/query.js#L33
@@ -146,17 +182,42 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
             },
           }
         }
-
-        return { data: null, error: { message: error.message } }
+        try {
+          // Handle stream errors and result size exceeded errors
+          if (error.code === 'RESULT_SIZE_EXCEEDED') {
+            // Force kill the connection without waiting for graceful shutdown
+            return {
+              data: null,
+              error: {
+                message: `Query result size (${error.resultSize} bytes) exceeded the configured limit (${error.maxResultSize} bytes)`,
+                code: error.code,
+                resultSize: error.resultSize,
+                maxResultSize: error.maxResultSize,
+              },
+            }
+          }
+          return { data: null, error: { code: error.code, message: error.message } }
+        } finally {
+          // If the error isn't a "DatabaseError", assume it's connection related and kill the connection
+          // to attempt a clean reconnect on the next try
+          await this.end()
+        }
       }
     },
 
     async end() {
-      const _pool = pool
-      pool = null
-      // Gracefully wait for active connections to be idle, then close all
-      // connections in the pool.
-      if (_pool) await _pool.end()
+      try {
+        const _pool = pool
+        pool = null
+        // Gracefully wait for active connections to be idle, then close all
+        // connections in the pool.
+        if (_pool) {
+          await _pool.end()
+        }
+      } catch (endError) {
+        // Ignore any errors during cleanup, just log them
+        console.error('Failed ending connection pool', endError)
+      }
     },
   }
 }
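
For reference, a stripped-down sketch of the pattern poolerQueryHandleError is built on: attach a one-shot 'error' listener to the pool for the duration of a query, so a connection-level error rejects the in-flight promise instead of surfacing as an unhandled 'error' event and killing the process. The connection string is a placeholder, and this sketch omits the `rejected` flag and next-tick deferral the real helper uses to let RESULT_SIZE_EXCEEDED errors take precedence over secondary stream errors.

import pg from 'pg'

const pool = new pg.Pool({ connectionString: 'postgresql://localhost:5432/postgres' }) // placeholder config

const queryWithErrorHandler = (sql: string): Promise<pg.QueryResult> =>
  new Promise((resolve, reject) => {
    // One-shot listener: the first connection-level error rejects this query's
    // promise instead of becoming an unhandled 'error' event on the pool.
    pool.once('error', reject)
    pool.query(sql).then(resolve).catch(reject)
  })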
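
And a minimal sketch of how a caller sees the new error shape, assuming this file is importable as './db.js' and using a placeholder connection string; `init`, `query`, `end`, and the RESULT_SIZE_EXCEEDED fields come from the diff above, everything else is illustrative and loosely typed for brevity.

import { init } from './db.js' // assumed module path

const { query, end } = init({ connectionString: 'postgresql://localhost:5432/postgres' }) // placeholder config

const { data, error } = await query('select * from some_big_table')
if (error?.code === 'RESULT_SIZE_EXCEEDED') {
  // Oversized results now come back as a structured error instead of an
  // uncaught stream error that would have crashed the server.
  console.error(`result too large: ${error.resultSize} of ${error.maxResultSize} bytes allowed`)
} else if (error) {
  console.error(error.message)
} else {
  console.log(data)
}

await end()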