From d3c01b1b3e239c1e3cc262bfbba7dc98fddcacba Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Fri, 31 Mar 2017 17:07:48 +0700 Subject: [PATCH 1/5] Support for logical/streaming replication mode --- lib/client.js | 4 ++++ lib/connection-parameters.js | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/lib/client.js b/lib/client.js index 5d365dab5..13a62576e 100644 --- a/lib/client.js +++ b/lib/client.js @@ -26,6 +26,7 @@ var Client = function(config) { this.port = this.connectionParameters.port; this.host = this.connectionParameters.host; this.password = this.connectionParameters.password; + this.replication = this.connectionParameters.replication; var c = config || {}; @@ -222,6 +223,9 @@ Client.prototype.getStartupConf = function() { if (appName) { data.application_name = appName; } + if (params.replication) { + data.replication = params.replication === true ? 'true' : params.replication; + } return data; }; diff --git a/lib/connection-parameters.js b/lib/connection-parameters.js index f5126a644..fa1329468 100644 --- a/lib/connection-parameters.js +++ b/lib/connection-parameters.js @@ -57,6 +57,7 @@ var ConnectionParameters = function(config) { this.binary = val('binary', config); this.ssl = typeof config.ssl === 'undefined' ? useSsl() : config.ssl; this.client_encoding = val("client_encoding", config); + this.replication = val("replication", config); //a domain socket begins with '/' this.isDomainSocket = (!(this.host||'').indexOf('/')); @@ -82,6 +83,9 @@ ConnectionParameters.prototype.getLibpqConnectionString = function(cb) { if(this.database) { params.push("dbname='" + this.database + "'"); } + if(this.replication) { + params.push("replication='" + (this.database === true ? "true" : this.replication) + "'"); + } if(this.host) { params.push("host=" + this.host); } From 1437fc3f1c2327c4dc49c7b484590d3240a1e098 Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Mon, 3 Apr 2017 02:23:19 +0700 Subject: [PATCH 2/5] - Support for Logical Replication Stream - Added WalStream class --- lib/connection.js | 3 ++ lib/index.js | 2 ++ lib/wallstream.js | 92 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100644 lib/wallstream.js diff --git a/lib/connection.js b/lib/connection.js index 59247a7c4..7318287c2 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -425,6 +425,9 @@ Connection.prototype.parseMessage = function(buffer) { case 0x48: //H return this.parseH(buffer, length); + case 0x57: //W + return new Message('replicationStart', length); + case 0x63: //c return new Message('copyDone', length); diff --git a/lib/index.js b/lib/index.js index b2372cf4f..5b4a9eff8 100644 --- a/lib/index.js +++ b/lib/index.js @@ -13,6 +13,7 @@ var defaults = require('./defaults'); var Connection = require('./connection'); var ConnectionParameters = require('./connection-parameters'); var poolFactory = require('./pool-factory'); +var WalStream = require('./wallstream'); var PG = function(clientConstructor) { EventEmitter.call(this); @@ -23,6 +24,7 @@ var PG = function(clientConstructor) { this._pools = []; this.Connection = Connection; this.types = require('pg-types'); + this.WalStream = WalStream; }; util.inherits(PG, EventEmitter); diff --git a/lib/wallstream.js b/lib/wallstream.js new file mode 100644 index 000000000..33417ef68 --- /dev/null +++ b/lib/wallstream.js @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2017 Kibae Shin (nonunnet@gmail.com) + * All rights reserved. + * + * This source code is licensed under the MIT license found in the + * README.md file in the root directory of this source tree. + */ +var EventEmitter = require('events').EventEmitter; +var Client = require('./client'); +var util = require('util'); + +var WalStream = function(config) { + EventEmitter.call(this); + + var c = config || {}; + c.replication = 'database'; + + var self = this; + + var client; + var stoped = false; + this.getChanges = function(slotName, uptoLsn, option, cb /*(err)*/) { + option = option || {}; + /* + * includeXids : include xid on BEGIN and COMMIT, default false + * includeTimestamp : include timestamp on COMMIT, default false + * skipEmptyXacts : skip empty transaction like DDL, default true + */ + + stoped = false; + client = new Client(config); + + client.on('error', function(err) { + self.emit('error', err); + }); + + client.connect(function(err) { + //error handling + if (err) { + self.emit('error', err); + return; + } + + var sql = 'START_REPLICATION SLOT ' + slotName + ' LOGICAL ' + (uptoLsn ? uptoLsn : '0/00000000'); + var opts = [ + '"include-xids" \'' + (option.includeXids === true ? 'on' : 'off') + '\'', + '"include-timestamp" \'' + (option.includeTimestamp === true ? 'on' : 'off') + '\'', + '"skip-empty-xacts" \'' + (option.skipEmptyXacts !== false ? 'on' : 'off') + '\'', + ]; + sql += ' (' + (opts.join(' , ')) + ')'; + + client.query(sql, function(err) { + if (err) { + if (!stoped && cb) { + cb(err); + cb = null; + } + } + cb = null; + }); + + client.connection.once('replicationStart', function() { + //start + self.emit('start', self); + client.connection.on('copyData', function(msg) { + if (msg.chunk[0] != 0x77) { + return; + } + + var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase()); + self.emit('data', { + lsn: lsn, + log: msg.chunk.slice(25), + }); + }); + }); + }); + return self; + }; + + this.stop = function() { + stoped = true; + if (client) { + client.end(); + client = null; + } + }; +}; + +util.inherits(WalStream, EventEmitter); + +module.exports = WalStream; From 35f35eb75ab695c86514ac57f578556f90c139ca Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Mon, 3 Apr 2017 02:23:38 +0700 Subject: [PATCH 3/5] WalStream test/sample code --- test/test-walstream.js | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 test/test-walstream.js diff --git a/test/test-walstream.js b/test/test-walstream.js new file mode 100644 index 000000000..ba15af531 --- /dev/null +++ b/test/test-walstream.js @@ -0,0 +1,32 @@ +'use strict'; + +const WalStream = require(__dirname + '/../lib').WalStream; + +var lastLsn = null; + +var walStream = new WalStream({}); +function proc() { + walStream.getChanges('test_slot', lastLsn, { + includeXids: false, //default: false + includeTimestamp: false, //default: false + skipEmptyXacts: true, //default: true + }, function(err) { + if (err) { + console.log('Logical replication initialize error', err); + setTimeout(proc, 1000); + } + }); +} + +walStream.on('data', function(msg) { + lastLsn = msg.lsn || lastLsn; + console.log('log recv', msg); +}).on('error', function(err) { + console.log('Error #2', err); + setTimeout(proc, 1000); +}); + +proc(); + +//If want to stop replication +//walStream.stop(); From 1ff021921ed004f97b7019a5efc3039006554540 Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Mon, 3 Apr 2017 02:23:38 +0700 Subject: [PATCH 4/5] WalStream test/sample code --- test/test-walstream.js | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 test/test-walstream.js diff --git a/test/test-walstream.js b/test/test-walstream.js new file mode 100644 index 000000000..f1123ec78 --- /dev/null +++ b/test/test-walstream.js @@ -0,0 +1,34 @@ +'use strict'; + +const WalStream = require(__dirname + '/../lib').WalStream; + +var lastLsn = null; + +var walStream = new WalStream({}); +function proc() { + walStream.getChanges('test_slot', lastLsn, { + includeXids: false, //default: false + includeTimestamp: false, //default: false + skipEmptyXacts: true, //default: true + }, function(err) { + if (err) { + console.log('Logical replication initialize error', err); + setTimeout(proc, 1000); + } + }); +} + +walStream.on('data', function(msg) { + lastLsn = msg.lsn || lastLsn; + + //DO SOMETHING. eg) replicate to other dbms(pgsql, mysql, ...) + console.log('log recv', msg); +}).on('error', function(err) { + console.log('Error #2', err); + setTimeout(proc, 1000); +}); + +proc(); + +//If want to stop replication +//walStream.stop(); From 0aed4d01c6e63daa9c9f3fff8a5e44338a76113e Mon Sep 17 00:00:00 2001 From: Kibae Shin Date: Mon, 3 Apr 2017 02:48:06 +0700 Subject: [PATCH 5/5] Update test-walstream.js --- test/test-walstream.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test-walstream.js b/test/test-walstream.js index ba15af531..544d0e332 100644 --- a/test/test-walstream.js +++ b/test/test-walstream.js @@ -20,6 +20,8 @@ function proc() { walStream.on('data', function(msg) { lastLsn = msg.lsn || lastLsn; + + //DO SOMETHING. eg) replicate to other dbms(pgsql, mysql, ...) console.log('log recv', msg); }).on('error', function(err) { console.log('Error #2', err);