Skip to content

Multiple-statement Support #1110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ Client.prototype.copyTo = function (text) {
Client.prototype.query = function(config, values, callback) {
//can take in strings, config object or query object
var query = (typeof config.submit == 'function') ? config :
new Query(config, values, callback);
new Query(config, values, callback,this.connectionParameters.multipleStatementResult);
if(this.binary && !query.binary) {
query.binary = true;
}
Expand Down
1 change: 1 addition & 0 deletions lib/connection-parameters.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var ConnectionParameters = function(config) {

this.application_name = val('application_name', config, 'PGAPPNAME');
this.fallback_application_name = val('fallback_application_name', config, false);
this.multipleStatementResult = val('multipleStatementResult',config);
};

var add = function(params, config, paramName) {
Expand Down
119 changes: 77 additions & 42 deletions lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ var util = require('util');

var Result = require('./result');
var utils = require('./utils');

var Query = function(config, values, callback) {
var configGlobal;
var Query = function (config, values, callback, multipleStatementResult) {
// use of "new" optional
if(!(this instanceof Query)) { return new Query(config, values, callback); }
if (!(this instanceof Query)) { return new Query(config, values, callback); }

config = utils.normalizeQueryConfig(config, values, callback);

Expand All @@ -28,46 +28,53 @@ var Query = function(config, values, callback) {
//use unique portal name each time
this.portal = config.portal || "";
this.callback = config.callback;
if(process.domain && config.callback) {
if (process.domain && config.callback) {
this.callback = process.domain.bind(config.callback);
}
this._result = new Result(config.rowMode, config.types);
this.multipleStatementResult = multipleStatementResult;
if (this.multipleStatementResult == true) {
this._result = new Array();
}
else {
this._result = new Result(config.rowMode, config.types);
}
this.isPreparedStatement = false;
this._canceledDueToError = false;
this._promise = null;
configGlobal = config;
EventEmitter.call(this);
};

util.inherits(Query, EventEmitter);

Query.prototype.then = function(onSuccess, onFailure) {
Query.prototype.then = function (onSuccess, onFailure) {
return this.promise().then(onSuccess, onFailure);
};

Query.prototype.catch = function(callback) {
Query.prototype.catch = function (callback) {
return this.promise().catch(callback);
};

Query.prototype.promise = function() {
Query.prototype.promise = function () {
if (this._promise) return this._promise;
this._promise = new Promise(function(resolve, reject) {
this._promise = new Promise(function (resolve, reject) {
this.once('end', resolve);
this.once('error', reject);
}.bind(this));
return this._promise;
};

Query.prototype.requiresPreparation = function() {
Query.prototype.requiresPreparation = function () {
//named queries must always be prepared
if(this.name) { return true; }
if (this.name) { return true; }
//always prepare if there are max number of rows expected per
//portal execution
if(this.rows) { return true; }
if (this.rows) { return true; }
//don't prepare empty text queries
if(!this.text) { return false; }
if (!this.text) { return false; }
//binary should be prepared to specify results should be in binary
//unless there are no parameters
if(this.binary && !this.values) { return false; }
if (this.binary && !this.values) { return false; }
//prepare if there are values
return (this.values || 0).length > 0;
};
Expand All @@ -76,102 +83,130 @@ Query.prototype.requiresPreparation = function() {
//associates row metadata from the supplied
//message with this query object
//metadata used when parsing row results
Query.prototype.handleRowDescription = function(msg) {
this._result.addFields(msg.fields);
Query.prototype.handleRowDescription = function (msg) {
this._accumulateRows = this.callback || !this.listeners('row').length;
if (this.multipleStatementResult == true) {
this.currentResult = new Result(configGlobal.rowMode, configGlobal.types);
this.currentResult.completed = false;
this.currentResult.addFields(msg.fields);
}
else {
this._result.addFields(msg.fields);
}
};

Query.prototype.handleDataRow = function(msg) {
var row = this._result.parseRow(msg.fields);
this.emit('row', row, this._result);
if (this._accumulateRows) {
this._result.addRow(row);
Query.prototype.handleDataRow = function (msg) {
if (this.multipleStatementResult == true) {
var row = this.currentResult.parseRow(msg.fields);
this.emit('row', row, this.currentResult);
if (this._accumulateRows) {
this.currentResult.addRow(row);
}
}
else {
var row = this._result.parseRow(msg.fields);
this.emit('row', row, this._result);
if (this._accumulateRows) {
this._result.addRow(row);
}
}

};

Query.prototype.handleCommandComplete = function(msg, con) {
this._result.addCommandComplete(msg);
Query.prototype.handleCommandComplete = function (msg, con) {
if (this.multipleStatementResult == true) {
if(this.currentResult == null || this.currentResult.completed == true){
this.currentResult = new Result(configGlobal.rowMode, configGlobal.types);
}
this.currentResult.addCommandComplete(msg);
this._result.push(this.currentResult);
this.currentResult.completed = true;
}
else {
this._result.addCommandComplete(msg);
}

//need to sync after each command complete of a prepared statement
if(this.isPreparedStatement) {
if (this.isPreparedStatement) {
con.sync();
}
};

//if a named prepared statement is created with empty query text
//the backend will send an emptyQuery message but *not* a command complete message
//execution on the connection will hang until the backend receives a sync message
Query.prototype.handleEmptyQuery = function(con) {
Query.prototype.handleEmptyQuery = function (con) {
if (this.isPreparedStatement) {
con.sync();
}
};

Query.prototype.handleReadyForQuery = function() {
if(this._canceledDueToError) {
Query.prototype.handleReadyForQuery = function () {
if (this._canceledDueToError) {
return this.handleError(this._canceledDueToError);
}
if(this.callback) {
if (this.callback) {
this.callback(null, this._result);
}
this.emit('end', this._result);
};

Query.prototype.handleError = function(err, connection) {
Query.prototype.handleError = function (err, connection) {
//need to sync after error during a prepared statement
if(this.isPreparedStatement) {
if (this.isPreparedStatement) {
connection.sync();
}
if(this._canceledDueToError) {
if (this._canceledDueToError) {
err = this._canceledDueToError;
this._canceledDueToError = false;
}
//if callback supplied do not emit error event as uncaught error
//events will bubble up to node process
if(this.callback) {
if (this.callback) {
return this.callback(err);
}
this.emit('error', err);
};

Query.prototype.submit = function(connection) {
if(this.requiresPreparation()) {
Query.prototype.submit = function (connection) {
if (this.requiresPreparation()) {
this.prepare(connection);
} else {
connection.query(this.text);
}
};

Query.prototype.hasBeenParsed = function(connection) {
Query.prototype.hasBeenParsed = function (connection) {
return this.name && connection.parsedStatements[this.name];
};

Query.prototype.handlePortalSuspended = function(connection) {
Query.prototype.handlePortalSuspended = function (connection) {
this._getRows(connection, this.rows);
};

Query.prototype._getRows = function(connection, rows) {
Query.prototype._getRows = function (connection, rows) {
connection.execute({
portal: this.portalName,
rows: rows
}, true);
connection.flush();
};

Query.prototype.prepare = function(connection) {
Query.prototype.prepare = function (connection) {
var self = this;
//prepared statements need sync to be called after each command
//complete or when an error is encountered
this.isPreparedStatement = true;
//TODO refactor this poor encapsulation
if(!this.hasBeenParsed(connection)) {
if (!this.hasBeenParsed(connection)) {
connection.parse({
text: self.text,
name: self.name,
types: self.types
}, true);
}

if(self.values) {
if (self.values) {
self.values = self.values.map(utils.prepareValue);
}

Expand All @@ -192,13 +227,13 @@ Query.prototype.prepare = function(connection) {
};

Query.prototype.handleCopyInResponse = function (connection) {
if(this.stream) this.stream.startStreamingToConnection(connection);
if (this.stream) this.stream.startStreamingToConnection(connection);
else connection.sendCopyFail('No source stream defined');
};

Query.prototype.handleCopyData = function (msg, connection) {
var chunk = msg.chunk;
if(this.stream) {
if (this.stream) {
this.stream.handleChunk(chunk);
}
//if there are no stream (for example when copy to query was sent by
Expand Down
2 changes: 1 addition & 1 deletion lib/result.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Result.prototype.parseRow = function(rowData) {
};

Result.prototype.addRow = function(row) {
this.rows.push(row);
this.rows.push(row);
};

var inlineParser = function(fieldName, i) {
Expand Down
33 changes: 33 additions & 0 deletions sample/example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
var pg = require('../lib/index.js');


var config = {
host: 'localhost',
user: 'postgres', //env var: PGUSER
database: 'postgres', //env var: PGDATABASE
password: null, //env var: PGPASSWORD
port: 5432, //env var: PGPORT
max: 10, // max number of clients in the pool
idleTimeoutMillis: 30000, // how long a client is allowed to remain idle before being closed
multipleStatementResult : true
};

var client = new pg.Client(config);

// connect to our database
client.connect(function (err) {
if (err) throw err;

// execute a query on our database
client.query('DELETE FROM users WHERE id = 1;SELECT * FROM users;select * from events;update users set id = 1 where id = 1; INSERT INTO users(id,email,name) values(332,\'[email protected]\',\'test user\');', function (err, result) {
if (err) throw err;

// just print the result to the console
console.log(result.rows[0]);

// disconnect the client
client.end(function (err) {
if (err) throw err;
});
});
});