-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathquery.js
243 lines (214 loc) · 6.93 KB
/
query.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/**
* Copyright (c) 2010-2016 Brian Carlson ([email protected])
* All rights reserved.
*
* This source code is licensed under the MIT license found in the
* README.md file in the root directory of this source tree.
*/
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Result = require('./result');
var utils = require('./utils');
var configGlobal;
var Query = function (config, values, callback, multipleStatementResult) {
// use of "new" optional
if (!(this instanceof Query)) { return new Query(config, values, callback); }
config = utils.normalizeQueryConfig(config, values, callback);
this.text = config.text;
this.values = config.values;
this.rows = config.rows;
this.types = config.types;
this.name = config.name;
this.binary = config.binary;
this.stream = config.stream;
//use unique portal name each time
this.portal = config.portal || "";
this.callback = config.callback;
if (process.domain && config.callback) {
this.callback = process.domain.bind(config.callback);
}
this.multipleStatementResult = multipleStatementResult;
if (this.multipleStatementResult == true) {
this._result = new Array();
}
else {
this._result = new Result(config.rowMode, config.types);
}
this.isPreparedStatement = false;
this._canceledDueToError = false;
this._promise = null;
configGlobal = config;
EventEmitter.call(this);
};
util.inherits(Query, EventEmitter);
Query.prototype.then = function (onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};
Query.prototype.catch = function (callback) {
return this.promise().catch(callback);
};
Query.prototype.promise = function () {
if (this._promise) return this._promise;
this._promise = new Promise(function (resolve, reject) {
this.once('end', resolve);
this.once('error', reject);
}.bind(this));
return this._promise;
};
Query.prototype.requiresPreparation = function () {
//named queries must always be prepared
if (this.name) { return true; }
//always prepare if there are max number of rows expected per
//portal execution
if (this.rows) { return true; }
//don't prepare empty text queries
if (!this.text) { return false; }
//binary should be prepared to specify results should be in binary
//unless there are no parameters
if (this.binary && !this.values) { return false; }
//prepare if there are values
return (this.values || 0).length > 0;
};
//associates row metadata from the supplied
//message with this query object
//metadata used when parsing row results
Query.prototype.handleRowDescription = function (msg) {
this._accumulateRows = this.callback || !this.listeners('row').length;
if (this.multipleStatementResult == true) {
this.currentResult = new Result(configGlobal.rowMode, configGlobal.types);
this.currentResult.completed = false;
this.currentResult.addFields(msg.fields);
}
else {
this._result.addFields(msg.fields);
}
};
Query.prototype.handleDataRow = function (msg) {
if (this.multipleStatementResult == true) {
var row = this.currentResult.parseRow(msg.fields);
this.emit('row', row, this.currentResult);
if (this._accumulateRows) {
this.currentResult.addRow(row);
}
}
else {
var row = this._result.parseRow(msg.fields);
this.emit('row', row, this._result);
if (this._accumulateRows) {
this._result.addRow(row);
}
}
};
Query.prototype.handleCommandComplete = function (msg, con) {
if (this.multipleStatementResult == true) {
if(this.currentResult == null || this.currentResult.completed == true){
this.currentResult = new Result(configGlobal.rowMode, configGlobal.types);
}
this.currentResult.addCommandComplete(msg);
this._result.push(this.currentResult);
this.currentResult.completed = true;
}
else {
this._result.addCommandComplete(msg);
}
//need to sync after each command complete of a prepared statement
if (this.isPreparedStatement) {
con.sync();
}
};
//if a named prepared statement is created with empty query text
//the backend will send an emptyQuery message but *not* a command complete message
//execution on the connection will hang until the backend receives a sync message
Query.prototype.handleEmptyQuery = function (con) {
if (this.isPreparedStatement) {
con.sync();
}
};
Query.prototype.handleReadyForQuery = function () {
if (this._canceledDueToError) {
return this.handleError(this._canceledDueToError);
}
if (this.callback) {
this.callback(null, this._result);
}
this.emit('end', this._result);
};
Query.prototype.handleError = function (err, connection) {
//need to sync after error during a prepared statement
if (this.isPreparedStatement) {
connection.sync();
}
if (this._canceledDueToError) {
err = this._canceledDueToError;
this._canceledDueToError = false;
}
//if callback supplied do not emit error event as uncaught error
//events will bubble up to node process
if (this.callback) {
return this.callback(err);
}
this.emit('error', err);
};
Query.prototype.submit = function (connection) {
if (this.requiresPreparation()) {
this.prepare(connection);
} else {
connection.query(this.text);
}
};
Query.prototype.hasBeenParsed = function (connection) {
return this.name && connection.parsedStatements[this.name];
};
Query.prototype.handlePortalSuspended = function (connection) {
this._getRows(connection, this.rows);
};
Query.prototype._getRows = function (connection, rows) {
connection.execute({
portal: this.portalName,
rows: rows
}, true);
connection.flush();
};
Query.prototype.prepare = function (connection) {
var self = this;
//prepared statements need sync to be called after each command
//complete or when an error is encountered
this.isPreparedStatement = true;
//TODO refactor this poor encapsulation
if (!this.hasBeenParsed(connection)) {
connection.parse({
text: self.text,
name: self.name,
types: self.types
}, true);
}
if (self.values) {
self.values = self.values.map(utils.prepareValue);
}
//http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
connection.bind({
portal: self.portalName,
statement: self.name,
values: self.values,
binary: self.binary
}, true);
connection.describe({
type: 'P',
name: self.portalName || ""
}, true);
this._getRows(connection, this.rows);
};
Query.prototype.handleCopyInResponse = function (connection) {
if (this.stream) this.stream.startStreamingToConnection(connection);
else connection.sendCopyFail('No source stream defined');
};
Query.prototype.handleCopyData = function (msg, connection) {
var chunk = msg.chunk;
if (this.stream) {
this.stream.handleChunk(chunk);
}
//if there are no stream (for example when copy to query was sent by
//query method instead of copyTo) error will be handled
//on copyOutResponse event, so silently ignore this error here
};
module.exports = Query;