Skip to content
This repository was archived by the owner on Mar 5, 2025. It is now read-only.

Commit b6c49d4

Browse files
committed
improved async polling
1 parent 16252f3 commit b6c49d4

File tree

5 files changed

+87
-55
lines changed

5 files changed

+87
-55
lines changed

lib/utils/config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ module.exports = {
7171
ETH_SIGNATURE_LENGTH: 4,
7272
ETH_UNITS: ETH_UNITS,
7373
ETH_BIGNUMBER_ROUNDING_MODE: { ROUNDING_MODE: BigNumber.ROUND_DOWN },
74-
ETH_POLLING_TIMEOUT: 1000,
74+
ETH_POLLING_TIMEOUT: 1000/2,
7575
defaultBlock: 'latest',
7676
defaultAccount: undefined
7777
};

lib/web3/filter.js

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,33 @@ var getOptions = function (options) {
7777
/**
7878
Adds the callback and sets up the methods, to iterate over the results.
7979
80-
@method addCallback
80+
@method getLogsAtStart
8181
@param {Object} self
82-
@param {funciton} callback
82+
@param {funciton}
8383
*/
84-
var addCallback = function(self, callback) {
84+
var getLogsAtStart = function(self, callback){
85+
// call getFilterLogs for the first watch callback start
86+
if (!utils.isString(self.options)) {
87+
self.get(function (err, messages) {
88+
// don't send all the responses to all the watches again... just to self one
89+
if (err) {
90+
callback(err);
91+
}
92+
93+
messages.forEach(function (message) {
94+
callback(null, message);
95+
});
96+
});
97+
}
98+
};
99+
100+
/**
101+
Adds the callback and sets up the methods, to iterate over the results.
102+
103+
@method pollFilter
104+
@param {Object} self
105+
*/
106+
var pollFilter = function(self) {
85107

86108
var onMessage = function (error, messages) {
87109
if (error) {
@@ -98,24 +120,11 @@ var addCallback = function(self, callback) {
98120
});
99121
};
100122

101-
// call getFilterLogs on start
102-
if (!utils.isString(self.options)) {
103-
self.get(function (err, messages) {
104-
// don't send all the responses to all the watches again... just to self one
105-
if (err) {
106-
callback(err);
107-
}
108-
109-
messages.forEach(function (message) {
110-
callback(null, message);
111-
});
112-
});
113-
}
114-
115123
RequestManager.getInstance().startPolling({
116124
method: self.implementation.poll.call,
117125
params: [self.filterId],
118126
}, self.filterId, onMessage, self.stopWatching.bind(self));
127+
119128
};
120129

121130
var Filter = function (options, methods, formatter) {
@@ -127,31 +136,33 @@ var Filter = function (options, methods, formatter) {
127136
this.options = getOptions(options);
128137
this.implementation = implementation;
129138
this.callbacks = [];
130-
this.addCallbacks = [];
139+
this.pollFilters = [];
131140
this.formatter = formatter;
132141
this.implementation.newFilter(this.options, function(error, id){
133142
if(error) {
134-
self.filterError = error;
135-
self.addCallbacks.forEach(function(callback){
143+
self.callbacks.forEach(function(callback){
136144
callback(error);
137145
});
138-
} else if(self.addCallbacks) {
146+
} else {
139147
self.filterId = id;
140-
self.addCallbacks.forEach(function(callback){
141-
addCallback(self, callback);
148+
// get filter logs at start
149+
self.callbacks.forEach(function(callback){
150+
getLogsAtStart(self, callback);
142151
});
143-
self.addCallbacks = [];
152+
pollFilter(self);
144153
}
145154
});
146155
};
147156

148157
Filter.prototype.watch = function (callback) {
149158
this.callbacks.push(callback);
150159

151-
if(this.filterId)
152-
addCallback(this, callback);
153-
else
154-
this.addCallbacks.push(callback);
160+
if(this.filterId) {
161+
getLogsAtStart(this, callback);
162+
pollFilter(this);
163+
}
164+
165+
return this;
155166
};
156167

157168
Filter.prototype.stopWatching = function () {
@@ -179,6 +190,8 @@ Filter.prototype.get = function (callback) {
179190
return self.formatter ? self.formatter(log) : log;
180191
});
181192
}
193+
194+
return this;
182195
};
183196

184197
module.exports = Filter;

lib/web3/requestmanager.js

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var RequestManager = function (provider) {
4343
arguments.callee._singletonInstance = this;
4444

4545
this.provider = provider;
46-
this.polls = [];
46+
this.polls = {};
4747
this.timeout = null;
4848
this.poll();
4949
};
@@ -156,7 +156,7 @@ RequestManager.prototype.setProvider = function (p) {
156156
* @todo cleanup number of params
157157
*/
158158
RequestManager.prototype.startPolling = function (data, pollId, callback, uninstall) {
159-
this.polls.push({data: data, id: pollId, callback: callback, uninstall: uninstall});
159+
this.polls['poll_'+ pollId] = {data: data, id: pollId, callback: callback, uninstall: uninstall};
160160
};
161161
/*jshint maxparams:3 */
162162

@@ -167,24 +167,21 @@ RequestManager.prototype.startPolling = function (data, pollId, callback, uninst
167167
* @param {Number} pollId
168168
*/
169169
RequestManager.prototype.stopPolling = function (pollId) {
170-
for (var i = this.polls.length; i--;) {
171-
var poll = this.polls[i];
172-
if (poll.id === pollId) {
173-
this.polls.splice(i, 1);
174-
}
175-
}
170+
delete this.polls['poll_'+ pollId];
176171
};
177172

178173
/**
179-
* Should be called to reset polling mechanism of request manager
174+
* Should be called to reset the polling mechanism of the request manager
180175
*
181176
* @method reset
182177
*/
183178
RequestManager.prototype.reset = function () {
184-
this.polls.forEach(function (poll) {
185-
poll.uninstall(poll.id);
186-
});
187-
this.polls = [];
179+
for (var key in this.polls) {
180+
if (this.polls.hasOwnProperty(key)) {
181+
this.polls[key].uninstall();
182+
}
183+
}
184+
this.polls = {};
188185

189186
if (this.timeout) {
190187
clearTimeout(this.timeout);
@@ -201,7 +198,7 @@ RequestManager.prototype.reset = function () {
201198
RequestManager.prototype.poll = function () {
202199
this.timeout = setTimeout(this.poll.bind(this), c.ETH_POLLING_TIMEOUT);
203200

204-
if (!this.polls.length) {
201+
if (this.polls === {}) {
205202
return;
206203
}
207204

@@ -210,32 +207,43 @@ RequestManager.prototype.poll = function () {
210207
return;
211208
}
212209

213-
var payload = Jsonrpc.getInstance().toBatchPayload(this.polls.map(function (data) {
214-
return data.data;
215-
}));
210+
var pollsData = [];
211+
var pollsKeys = [];
212+
for (var key in this.polls) {
213+
if (this.polls.hasOwnProperty(key)) {
214+
pollsData.push(this.polls[key].data);
215+
pollsKeys.push(key);
216+
}
217+
}
218+
219+
if (pollsData.length === 0) {
220+
return;
221+
}
222+
223+
var payload = Jsonrpc.getInstance().toBatchPayload(pollsData);
216224

217225
var self = this;
218226
this.provider.sendAsync(payload, function (error, results) {
219227
// TODO: console log?
220228
if (error) {
221229
return;
222230
}
223-
231+
224232
if (!utils.isArray(results)) {
225233
throw errors.InvalidResponse(results);
226234
}
227235

228236
results.map(function (result, index) {
237+
var key = pollsKeys[index];
229238
// make sure the filter is still installed after arrival of the request
230-
if(self.polls[index]) {
231-
result.callback = self.polls[index].callback;
239+
if(self.polls[key]) {
240+
result.callback = self.polls[key].callback;
232241
return result;
233242
} else
234243
return false;
235244
}).filter(function (result) {
236-
if(!result)
237-
return false;
238-
245+
return (!result) ? false : true;
246+
}).filter(function (result) {
239247
var valid = Jsonrpc.getInstance().isValidResponse(result);
240248
if (!valid) {
241249
result.callback(errors.InvalidResponse(result));

test/contract.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ describe('web3.eth.contract', function () {
6666
provider.injectValidation(function (payload) {
6767
if (step === 0) {
6868
step = 1;
69-
provider.injectResult(3);
69+
provider.injectResult('0x3');
7070
assert.equal(payload.jsonrpc, '2.0');
7171
assert.equal(payload.method, 'eth_newFilter');
7272
assert.deepEqual(payload.params[0], {
@@ -105,7 +105,7 @@ describe('web3.eth.contract', function () {
105105
'0000000000000000000000000000000000000000000000000000000000000008'
106106
}]]);
107107
var r = payload.filter(function (p) {
108-
return p.jsonrpc === '2.0' && p.method === 'eth_getFilterChanges' && p.params[0] === 3;
108+
return p.jsonrpc === '2.0' && p.method === 'eth_getFilterChanges' && p.params[0] === '0x3';
109109
});
110110
assert.equal(r.length > 0, true);
111111
}
@@ -114,7 +114,8 @@ describe('web3.eth.contract', function () {
114114
var contract = web3.eth.contract(desc).at(address);
115115

116116
var res = 0;
117-
contract.Changed({from: address}).watch(function(err, result) {
117+
var event = contract.Changed({from: address});
118+
event.watch(function(err, result) {
118119
assert.equal(result.args.from, address);
119120
assert.equal(result.args.amount, 1);
120121
assert.equal(result.args.t1, 1);

test/helpers/FakeHttpProvider2.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@ FakeHttpProvider2.prototype.injectResultList = function (list) {
1515
FakeHttpProvider2.prototype.getResponse = function () {
1616
var result = this.resultList[this.counter];
1717
this.counter++;
18+
19+
// add fallback result value
20+
if(!result)
21+
result = {
22+
result: undefined
23+
};
24+
1825
if (result.type === 'batch') {
1926
this.injectBatchResults(result.result);
2027
} else {
2128
this.injectResult(result.result);
2229
}
30+
31+
this.counter = 0;
32+
2333
return this.response;
2434
};
2535

0 commit comments

Comments
 (0)