Skip to content
This repository was archived by the owner on Mar 5, 2025. It is now read-only.

Commit 5c5e95d

Browse files
authored
Merge pull request #3135 from bloq/reconnect-resubscribe
Full support for websocket reconnection/resubscription
2 parents 9dd09db + 66f7f74 commit 5c5e95d

File tree

5 files changed

+114
-45
lines changed

5 files changed

+114
-45
lines changed

packages/web3-core-requestmanager/src/index.js

+19-13
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,16 @@ RequestManager.prototype.setProvider = function (p, net) {
103103
_this.subscriptions[result.params.subscription].callback(null, result.params.result);
104104
}
105105
});
106-
// TODO add error, end, timeout, connect??
107-
// this.provider.on('error', function requestManagerNotification(result){
108-
// Object.keys(_this.subscriptions).forEach(function(id){
109-
// if(_this.subscriptions[id].callback)
110-
// _this.subscriptions[id].callback(err);
111-
// });
112-
// }
106+
107+
// notify all subscriptions about the error condition
108+
this.provider.on('error', function (event) {
109+
Object.keys(_this.subscriptions).forEach(function(id){
110+
if(_this.subscriptions[id] && _this.subscriptions[id].callback)
111+
_this.subscriptions[id].callback(event.code || new Error('Provider error'));
112+
});
113+
});
114+
115+
// TODO add end, timeout, connect??
113116
}
114117
};
115118

@@ -205,17 +208,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) {
205208
* @param {Function} callback fired once the subscription is removed
206209
*/
207210
RequestManager.prototype.removeSubscription = function (id, callback) {
208-
var _this = this;
209-
210211
if(this.subscriptions[id]) {
212+
var type = this.subscriptions[id].type;
213+
214+
// remove subscription first to avoid reentry
215+
delete this.subscriptions[id];
211216

217+
// then, try to actually unsubscribe
212218
this.send({
213-
method: this.subscriptions[id].type + '_unsubscribe',
219+
method: type + '_unsubscribe',
214220
params: [id]
215221
}, callback);
216-
217-
// remove subscription
218-
delete _this.subscriptions[id];
222+
} else if (typeof callback === 'function') {
223+
// call the callback if the subscription was already removed
224+
callback(null);
219225
}
220226
};
221227

packages/web3-core-subscriptions/src/subscription.js

+43-23
Original file line numberDiff line numberDiff line change
@@ -272,37 +272,57 @@ Subscription.prototype.subscribe = function() {
272272
_this.callback(null, output, _this);
273273
});
274274
} else {
275-
// unsubscribe, but keep listeners
276-
_this.options.requestManager.removeSubscription(_this.id);
277-
278-
// re-subscribe, if connection fails
279-
if(_this.options.requestManager.provider.once) {
280-
_this._reconnectIntervalId = setInterval(function () {
281-
// TODO check if that makes sense!
282-
if (_this.options.requestManager.provider.reconnect) {
283-
_this.options.requestManager.provider.reconnect();
284-
}
285-
}, 500);
286-
287-
_this.options.requestManager.provider.once('connect', function () {
288-
clearInterval(_this._reconnectIntervalId);
289-
_this.subscribe(_this.callback);
290-
});
291-
}
292-
_this.emit('error', err);
293-
294-
// call the callback, last so that unsubscribe there won't affect the emit above
295-
_this.callback(err, null, _this);
275+
_this._resubscribe(err);
296276
}
297277
});
278+
279+
// just in case the provider reconnects silently, resubscribe over the new connection
280+
if (_this.options.requestManager.provider.once) {
281+
_this.options.requestManager.provider.once('connect', function () {
282+
_this._resubscribe();
283+
});
284+
}
298285
} else {
299-
_this.callback(err, null, _this);
300-
_this.emit('error', err);
286+
_this._resubscribe(err);
301287
}
302288
});
303289

304290
// return an object to cancel the subscription
305291
return this;
306292
};
307293

294+
Subscription.prototype._resubscribe = function (err) {
295+
var _this = this;
296+
297+
// unsubscribe
298+
this.options.requestManager.removeSubscription(this.id);
299+
300+
// re-subscribe, if connection fails
301+
if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) {
302+
this._reconnectIntervalId = setInterval(function () {
303+
// TODO check if that makes sense!
304+
if (_this.options.requestManager.provider.reconnect) {
305+
_this.options.requestManager.provider.reconnect();
306+
}
307+
}, 500);
308+
309+
this.options.requestManager.provider.once('connect', function () {
310+
clearInterval(_this._reconnectIntervalId);
311+
_this._reconnectIntervalId = null;
312+
313+
// delete id to keep the listeners on subscribe
314+
_this.id = null;
315+
316+
_this.subscribe(_this.callback);
317+
});
318+
}
319+
320+
if (err) {
321+
this.emit('error', err);
322+
}
323+
324+
// call the callback, last so that unsubscribe there won't affect the emit above
325+
this.callback(err, null, this);
326+
};
327+
308328
module.exports = Subscription;

packages/web3-providers-ws/README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ var Web3WsProvider = require('web3-providers-ws');
3232

3333
var options = {
3434
timeout: 30000,
35-
headers: { authorization: 'Basic username:password' }
36-
}; // set a custom timeout at 30 seconds, and credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546)
35+
headers: { authorization: 'Basic username:password' },
36+
autoReconnect: true
37+
}; // set a custom timeout at 30 seconds, credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546), and enable WebSocket auto-reconnection
3738
var ws = new Web3WsProvider('ws://localhost:8546', options);
3839
```
3940

packages/web3-providers-ws/package.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
"dependencies": {
1616
"underscore": "1.9.1",
1717
"web3-core-helpers": "1.2.2",
18-
"websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis"
18+
"websocket": "github:web3-js/WebSocket-Node#polyfill/globalThis",
19+
"websocket-reconnector": "1.1.1"
1920
},
2021
"devDependencies": {
2122
"definitelytyped-header-parser": "^1.0.1",
2223
"dtslint": "0.4.2"
2324
}
24-
}
25+
}

packages/web3-providers-ws/src/index.js

+46-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
var _ = require('underscore');
2626
var errors = require('web3-core-helpers').errors;
2727
var Ws = require('websocket').w3cwebsocket;
28+
var WsReconnector = require('websocket-reconnector');
2829

2930
var isNode = Object.prototype.toString.call(typeof process !== 'undefined' ? process : 0) === '[object process]';
3031

@@ -86,6 +87,11 @@ var WebsocketProvider = function WebsocketProvider(url, options) {
8687
// https://github.com/theturtle32/WebSocket-Node/blob/master/docs/WebSocketClient.md#connectrequesturl-requestedprotocols-origin-headers-requestoptions
8788
var requestOptions = options.requestOptions || undefined;
8889

90+
// Enable automatic reconnection wrapping `Ws` with reconnector
91+
if (options.autoReconnect) {
92+
Ws = WsReconnector(Ws);
93+
}
94+
8995
// When all node core implementations that do not have the
9096
// WHATWG compatible URL parser go out of service this line can be removed.
9197
if (parsedURL.auth) {
@@ -238,7 +244,13 @@ WebsocketProvider.prototype._addResponseCallback = function(payload, callback) {
238244
setTimeout(function () {
239245
if (_this.responseCallbacks[id]) {
240246
_this.responseCallbacks[id](errors.ConnectionTimeout(_this._customTimeout));
247+
241248
delete _this.responseCallbacks[id];
249+
250+
// try to reconnect
251+
if (_this.connection.reconnect) {
252+
_this.connection.reconnect();
253+
}
242254
}
243255
}, this._customTimeout);
244256
}
@@ -305,15 +317,15 @@ WebsocketProvider.prototype.on = function (type, callback) {
305317
break;
306318

307319
case 'connect':
308-
this.connection.onopen = callback;
320+
this.connection.addEventListener('open', callback);
309321
break;
310322

311323
case 'end':
312-
this.connection.onclose = callback;
324+
this.connection.addEventListener('close', callback);
313325
break;
314326

315327
case 'error':
316-
this.connection.onerror = callback;
328+
this.connection.addEventListener('error', callback);
317329
break;
318330

319331
// default:
@@ -322,7 +334,26 @@ WebsocketProvider.prototype.on = function (type, callback) {
322334
}
323335
};
324336

325-
// TODO add once
337+
/**
338+
Subscribes to provider only once
339+
340+
@method once
341+
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
342+
@param {Function} callback the callback to call
343+
*/
344+
WebsocketProvider.prototype.once = function (type, callback) {
345+
var _this = this;
346+
347+
function onceCallback(event) {
348+
setTimeout(function () {
349+
_this.removeListener(type, onceCallback);
350+
}, 0);
351+
352+
callback(event);
353+
}
354+
355+
this.on(type, onceCallback);
356+
};
326357

327358
/**
328359
Removes event listener
@@ -342,7 +373,17 @@ WebsocketProvider.prototype.removeListener = function (type, callback) {
342373
});
343374
break;
344375

345-
// TODO remvoving connect missing
376+
case 'connect':
377+
this.connection.removeEventListener('open', callback);
378+
break;
379+
380+
case 'end':
381+
this.connection.removeEventListener('close', callback);
382+
break;
383+
384+
case 'error':
385+
this.connection.removeEventListener('error', callback);
386+
break;
346387

347388
// default:
348389
// this.connection.removeListener(type, callback);

0 commit comments

Comments
 (0)