@@ -32,8 +32,10 @@ var Client = module.exports = function (config) {
32
32
} )
33
33
34
34
this . _queryQueue = [ ]
35
- this . _connected = false
35
+ this . _ending = false
36
36
this . _connecting = false
37
+ this . _connected = false
38
+ this . _queryable = true
37
39
38
40
// keep these on the object for legacy reasons
39
41
// for the time being. TODO: deprecate all this jazz
@@ -52,50 +54,48 @@ Client.Query = NativeQuery
52
54
53
55
util . inherits ( Client , EventEmitter )
54
56
57
+ Client . prototype . _errorAllQueries = function ( err ) {
58
+ const enqueueError = ( query ) => {
59
+ process . nextTick ( ( ) => {
60
+ query . native = this . native
61
+ query . handleError ( err )
62
+ } )
63
+ }
64
+
65
+ if ( this . _hasActiveQuery ( ) ) {
66
+ enqueueError ( this . _activeQuery )
67
+ this . _activeQuery = null
68
+ }
69
+
70
+ this . _queryQueue . forEach ( enqueueError )
71
+ this . _queryQueue . length = 0
72
+ }
73
+
55
74
// connect to the backend
56
75
// pass an optional callback to be called once connected
57
76
// or with an error if there was a connection error
58
- // if no callback is passed and there is a connection error
59
- // the client will emit an error event.
60
- Client . prototype . connect = function ( cb ) {
77
+ Client . prototype . _connect = function ( cb ) {
61
78
var self = this
62
79
63
- var onError = function ( err ) {
64
- if ( cb ) return cb ( err )
65
- return self . emit ( 'error' , err )
66
- }
67
-
68
- var result
69
- if ( ! cb ) {
70
- var resolveOut , rejectOut
71
- cb = ( err ) => err ? rejectOut ( err ) : resolveOut ( )
72
- result = new global . Promise ( function ( resolve , reject ) {
73
- resolveOut = resolve
74
- rejectOut = reject
75
- } )
76
- }
77
-
78
80
if ( this . _connecting ) {
79
81
process . nextTick ( ( ) => cb ( new Error ( 'Client has already been connected. You cannot reuse a client.' ) ) )
80
- return result
82
+ return
81
83
}
82
84
83
85
this . _connecting = true
84
86
85
87
this . connectionParameters . getLibpqConnectionString ( function ( err , conString ) {
86
- if ( err ) return onError ( err )
88
+ if ( err ) return cb ( err )
87
89
self . native . connect ( conString , function ( err ) {
88
- if ( err ) return onError ( err )
90
+ if ( err ) return cb ( err )
89
91
90
92
// set internal states to connected
91
93
self . _connected = true
92
94
93
95
// handle connection errors from the native layer
94
96
self . native . on ( 'error' , function ( err ) {
95
- // error will be handled by active query
96
- if ( self . _activeQuery && self . _activeQuery . state !== 'end' ) {
97
- return
98
- }
97
+ self . _queryable = false
98
+ self . _errorAllQueries ( err )
99
99
self . emit ( 'error' , err )
100
100
} )
101
101
@@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) {
110
110
self . emit ( 'connect' )
111
111
self . _pulseQueryQueue ( true )
112
112
113
- // possibly call the optional callback
114
- if ( cb ) cb ( )
113
+ cb ( )
115
114
} )
116
115
} )
116
+ }
117
117
118
- return result
118
+ Client . prototype . connect = function ( callback ) {
119
+ if ( callback ) {
120
+ this . _connect ( callback )
121
+ return
122
+ }
123
+
124
+ return new Promise ( ( resolve , reject ) => {
125
+ this . _connect ( ( error ) => {
126
+ if ( error ) {
127
+ reject ( error )
128
+ } else {
129
+ resolve ( )
130
+ }
131
+ } )
132
+ } )
119
133
}
120
134
121
135
// send a query to the server
@@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) {
129
143
// optional string rowMode = 'array' for an array of results
130
144
// }
131
145
Client . prototype . query = function ( config , values , callback ) {
146
+ var query
147
+ var result
148
+
132
149
if ( typeof config . submit === 'function' ) {
150
+ result = query = config
133
151
// accept query(new Query(...), (err, res) => { }) style
134
152
if ( typeof values === 'function' ) {
135
153
config . callback = values
136
154
}
137
- this . _queryQueue . push ( config )
138
- this . _pulseQueryQueue ( )
139
- return config
155
+ } else {
156
+ query = new NativeQuery ( config , values , callback )
157
+ if ( ! query . callback ) {
158
+ let resolveOut , rejectOut
159
+ result = new Promise ( ( resolve , reject ) => {
160
+ resolveOut = resolve
161
+ rejectOut = reject
162
+ } )
163
+ query . callback = ( err , res ) => err ? rejectOut ( err ) : resolveOut ( res )
164
+ }
140
165
}
141
166
142
- var query = new NativeQuery ( config , values , callback )
143
- var result
144
- if ( ! query . callback ) {
145
- let resolveOut , rejectOut
146
- result = new Promise ( ( resolve , reject ) => {
147
- resolveOut = resolve
148
- rejectOut = reject
167
+ if ( ! this . _queryable ) {
168
+ query . native = this . native
169
+ process . nextTick ( ( ) => {
170
+ query . handleError ( new Error ( 'Client has encountered a connection error and is not queryable' ) )
171
+ } )
172
+ return result
173
+ }
174
+
175
+ if ( this . _ending ) {
176
+ query . native = this . native
177
+ process . nextTick ( ( ) => {
178
+ query . handleError ( new Error ( 'Client was closed and is not queryable' ) )
149
179
} )
150
- query . callback = ( err , res ) => err ? rejectOut ( err ) : resolveOut ( res )
180
+ return result
151
181
}
182
+
152
183
this . _queryQueue . push ( query )
153
184
this . _pulseQueryQueue ( )
154
185
return result
@@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) {
157
188
// disconnect from the backend server
158
189
Client . prototype . end = function ( cb ) {
159
190
var self = this
191
+
192
+ this . _ending = true
193
+
160
194
if ( ! this . _connected ) {
161
195
this . once ( 'connect' , this . end . bind ( this , cb ) )
162
196
}
@@ -170,12 +204,7 @@ Client.prototype.end = function (cb) {
170
204
} )
171
205
}
172
206
this . native . end ( function ( ) {
173
- // send an error to the active query
174
- if ( self . _hasActiveQuery ( ) ) {
175
- var msg = 'Connection terminated'
176
- self . _queryQueue . length = 0
177
- self . _activeQuery . handleError ( new Error ( msg ) )
178
- }
207
+ self . _errorAllQueries ( new Error ( 'Connection terminated' ) )
179
208
self . emit ( 'end' )
180
209
if ( cb ) cb ( )
181
210
} )
0 commit comments