Skip to content
This repository was archived by the owner on Mar 5, 2025. It is now read-only.

Full support for websocket reconnection/resubscription #3135

Merged
32 changes: 19 additions & 13 deletions packages/web3-core-requestmanager/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ RequestManager.prototype.setProvider = function (p, net) {
_this.subscriptions[result.params.subscription].callback(null, result.params.result);
}
});
// TODO add error, end, timeout, connect??
// this.provider.on('error', function requestManagerNotification(result){
// Object.keys(_this.subscriptions).forEach(function(id){
// if(_this.subscriptions[id].callback)
// _this.subscriptions[id].callback(err);
// });
// }

// notify all subscriptions about the error condition
this.provider.on('error', function (event) {
Object.keys(_this.subscriptions).forEach(function(id){
if(_this.subscriptions[id] && _this.subscriptions[id].callback)
_this.subscriptions[id].callback(event.code || new Error('Provider error'));
});
});

// TODO add end, timeout, connect??
}
};

Expand Down Expand Up @@ -205,17 +208,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) {
* @param {Function} callback fired once the subscription is removed
*/
RequestManager.prototype.removeSubscription = function (id, callback) {
var _this = this;

if(this.subscriptions[id]) {
var type = this.subscriptions[id].type;

// remove subscription first to avoid reentry
delete this.subscriptions[id];

// then, try to actually unsubscribe
this.send({
method: this.subscriptions[id].type + '_unsubscribe',
method: type + '_unsubscribe',
params: [id]
}, callback);

// remove subscription
delete _this.subscriptions[id];
} else if (typeof callback === 'function') {
// call the callback if the subscription was already removed
callback(null);
}
};

Expand Down
66 changes: 43 additions & 23 deletions packages/web3-core-subscriptions/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,37 +272,57 @@ Subscription.prototype.subscribe = function() {
_this.callback(null, output, _this);
});
} else {
// unsubscribe, but keep listeners
_this.options.requestManager.removeSubscription(_this.id);

// re-subscribe, if connection fails
if(_this.options.requestManager.provider.once) {
_this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
if (_this.options.requestManager.provider.reconnect) {
_this.options.requestManager.provider.reconnect();
}
}, 500);

_this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this.subscribe(_this.callback);
});
}
_this.emit('error', err);

// call the callback, last so that unsubscribe there won't affect the emit above
_this.callback(err, null, _this);
_this._resubscribe(err);
}
});

// just in case the provider reconnects silently, resubscribe over the new connection
if (_this.options.requestManager.provider.once) {
_this.options.requestManager.provider.once('connect', function () {
_this._resubscribe();
});
}
} else {
_this.callback(err, null, _this);
_this.emit('error', err);
_this._resubscribe(err);
}
});

// return an object to cancel the subscription
return this;
};

Subscription.prototype._resubscribe = function (err) {
var _this = this;

// unsubscribe
this.options.requestManager.removeSubscription(this.id);

// re-subscribe, if connection fails
if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) {
this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
if (_this.options.requestManager.provider.reconnect) {
_this.options.requestManager.provider.reconnect();
}
}, 500);

this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this._reconnectIntervalId = null;

// delete id to keep the listeners on subscribe
_this.id = null;

_this.subscribe(_this.callback);
});
}

if (err) {
this.emit('error', err);
}

// call the callback, last so that unsubscribe there won't affect the emit above
this.callback(err, null, this);
};

module.exports = Subscription;
5 changes: 3 additions & 2 deletions packages/web3-providers-ws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ var Web3WsProvider = require('web3-providers-ws');

var options = {
timeout: 30000,
headers: { authorization: 'Basic username:password' }
}; // set a custom timeout at 30 seconds, and credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546)
headers: { authorization: 'Basic username:password' },
autoReconnect: true
}; // set a custom timeout at 30 seconds, credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546), and enable WebSocket auto-reconnection
var ws = new Web3WsProvider('ws://localhost:8546', options);
```

Expand Down
5 changes: 3 additions & 2 deletions packages/web3-providers-ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
"dependencies": {
"underscore": "1.9.1",
"web3-core-helpers": "1.2.2",
"websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis"
"websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis",
"websocket-reconnector": "1.1.1"
},
"devDependencies": {
"definitelytyped-header-parser": "^1.0.1",
"dtslint": "0.4.2"
}
}
}
51 changes: 46 additions & 5 deletions packages/web3-providers-ws/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
var _ = require('underscore');
var errors = require('web3-core-helpers').errors;
var Ws = require('websocket').w3cwebsocket;
var WsReconnector = require('websocket-reconnector');

var isNode = Object.prototype.toString.call(typeof process !== 'undefined' ? process : 0) === '[object process]';

Expand Down Expand Up @@ -86,6 +87,11 @@ var WebsocketProvider = function WebsocketProvider(url, options) {
// https://github.com/theturtle32/WebSocket-Node/blob/master/docs/WebSocketClient.md#connectrequesturl-requestedprotocols-origin-headers-requestoptions
var requestOptions = options.requestOptions || undefined;

// Enable automatic reconnection wrapping `Ws` with reconnector
if (options.autoReconnect) {
Ws = WsReconnector(Ws);
}

// When all node core implementations that do not have the
// WHATWG compatible URL parser go out of service this line can be removed.
if (parsedURL.auth) {
Expand Down Expand Up @@ -238,7 +244,13 @@ WebsocketProvider.prototype._addResponseCallback = function(payload, callback) {
setTimeout(function () {
if (_this.responseCallbacks[id]) {
_this.responseCallbacks[id](errors.ConnectionTimeout(_this._customTimeout));

delete _this.responseCallbacks[id];

// try to reconnect
if (_this.connection.reconnect) {
_this.connection.reconnect();
}
}
}, this._customTimeout);
}
Expand Down Expand Up @@ -305,15 +317,15 @@ WebsocketProvider.prototype.on = function (type, callback) {
break;

case 'connect':
this.connection.onopen = callback;
this.connection.addEventListener('open', callback);
break;

case 'end':
this.connection.onclose = callback;
this.connection.addEventListener('close', callback);
break;

case 'error':
this.connection.onerror = callback;
this.connection.addEventListener('error', callback);
break;

// default:
Expand All @@ -322,7 +334,26 @@ WebsocketProvider.prototype.on = function (type, callback) {
}
};

// TODO add once
/**
Subscribes to provider only once

@method once
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
@param {Function} callback the callback to call
*/
WebsocketProvider.prototype.once = function (type, callback) {
var _this = this;

function onceCallback(event) {
setTimeout(function () {
_this.removeListener(type, onceCallback);
}, 0);

callback(event);
}

this.on(type, onceCallback);
};

/**
Removes event listener
Expand All @@ -342,7 +373,17 @@ WebsocketProvider.prototype.removeListener = function (type, callback) {
});
break;

// TODO remvoving connect missing
case 'connect':
this.connection.removeEventListener('open', callback);
break;

case 'end':
this.connection.removeEventListener('close', callback);
break;

case 'error':
this.connection.removeEventListener('error', callback);
break;

// default:
// this.connection.removeListener(type, callback);
Expand Down