Skip to content

Commit 91e5964

Browse files
committed
Update createReadStream() to support streams2 API
Previous support was improperly reverting internal stream to legacy API causing unnecessary buffering.
1 parent 91fc885 commit 91e5964

File tree

6 files changed

+91
-40
lines changed

6 files changed

+91
-40
lines changed

features/s3/step_definitions/objects.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ module.exports = function () {
9292
this.result = '';
9393
var stream = this.service.getObject(params).createReadStream();
9494
stream.on('end', function() { callback(); });
95-
stream.on('readable', function() { world.result += stream.read(); });
95+
stream.on('readable', function() {
96+
var v = stream.read(); if (v) world.result += v;
97+
});
9698
});
9799

98100
this.Then(/^the streamed data should contain "([^"]*)"$/, function(data, callback) {

lib/event_listeners.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,16 +158,26 @@ AWS.EventListeners = {
158158

159159
add('SEND', 'send', function SEND(resp) {
160160
function callback(httpResp) {
161+
resp.httpStream = httpResp;
162+
161163
var headers = [httpResp.statusCode, httpResp.headers, resp];
162164
resp.request.emitEvent('httpHeaders', headers);
163165

164-
httpResp.on('data', function onData(data) {
165-
resp.request.emitEvent('httpData', [data, resp]);
166-
});
167-
168-
httpResp.on('end', function onEnd() {
169-
resp.request.emitEvent('httpDone', [resp]);
170-
});
166+
if (resp.httpStream) {
167+
if (AWS.HttpClient.streamsApiVersion === 2) { // streams2 API check
168+
httpResp.on('readable', function onReadable() {
169+
resp.request.emitEvent('httpData', [httpResp.read(), resp]);
170+
});
171+
} else { // legacy streams API
172+
httpResp.on('data', function onData(data) {
173+
resp.request.emitEvent('httpData', [data, resp]);
174+
});
175+
}
176+
177+
httpResp.on('end', function onEnd() {
178+
resp.request.emitEvent('httpDone', [resp]);
179+
});
180+
}
171181
}
172182

173183
function error(err) {
@@ -194,7 +204,7 @@ AWS.EventListeners = {
194204

195205
add('HTTP_DONE', 'httpDone', function HTTP_DONE(resp) {
196206
// convert buffers array into single buffer
197-
if (resp.httpResponse.buffers.length > 0) {
207+
if (resp.httpResponse.buffers && resp.httpResponse.buffers.length > 0) {
198208
var body = AWS.util.buffer.concat(resp.httpResponse.buffers);
199209
resp.httpResponse.body = body;
200210
}

lib/http.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ AWS.NodeHttpClient = inherit({
244244
*/
245245
AWS.HttpClient = AWS.NodeHttpClient;
246246

247+
/**
248+
* @api private
249+
*/
250+
AWS.HttpClient.streamsApiVersion = require('stream').Readable ? 2 : 1;
251+
247252
/**
248253
* @api private
249254
*/

lib/request.js

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -270,52 +270,54 @@ AWS.Request = inherit({
270270
*/
271271
createReadStream: function createReadStream() {
272272
var req = this;
273-
var legacyStreams = !streams.Readable;
274-
var stream;
273+
var stream = null;
274+
var legacyStreams = false;
275275

276-
if (legacyStreams) {
276+
if (AWS.HttpClient.streamsApiVersion === 2) {
277+
stream = new streams.Readable();
278+
stream._read = function() { stream.push(''); };
279+
} else {
277280
stream = new streams.Stream();
278281
stream.readable = true;
279-
} else {
280-
stream = new streams.Readable();
281-
stream._read = function() { };
282282
}
283283

284+
stream.sent = false;
284285
stream.on('newListener', function(event) {
285-
if ((legacyStreams && event === 'data') ||
286-
(!legacyStreams && event == 'readable')) {
286+
if (!stream.sent && (event === 'data' || event === 'readable')) {
287+
if (event === 'data') legacyStreams = true;
288+
stream.sent = true;
287289
process.nextTick(function() { req.send(); });
288290
}
289291
});
290292

291-
this.on('httpHeaders', function(statusCode) {
293+
this.on('httpHeaders', function streamHeaders(statusCode, headers, resp) {
292294
if (statusCode < 300) {
293-
// Remove httpData, httpError listeners, add streaming listeners
294295
req.removeListener('httpData', AWS.EventListeners.Core.HTTP_DATA);
295296
req.removeListener('httpError', AWS.EventListeners.Core.HTTP_ERROR);
297+
req.on('httpError', function streamHttpError(error, resp) {
298+
resp.error = error;
299+
resp.error.retryable = false;
300+
this.completeRequest(resp);
301+
});
296302

297-
req.on('httpData', function(data) {
298-
if (legacyStreams) {
299-
stream.emit('data', data);
300-
} else {
303+
var httpStream = resp.httpStream;
304+
stream.response = resp;
305+
stream._read = function() {
306+
var data;
307+
/*jshint boss:true*/
308+
while (data = httpStream.read()) {
301309
stream.push(data);
302310
}
303-
});
311+
};
304312

305-
req.on('httpDone', function() {
306-
if (legacyStreams) {
307-
stream.emit('end');
308-
stream.readable = false;
309-
} else {
310-
stream.push(null);
311-
}
313+
var events = ['end', 'error', (legacyStreams ? 'data' : 'readable')];
314+
AWS.util.arrayEach(events, function(event) {
315+
httpStream.on(event, function(arg) {
316+
stream.emit(event, arg);
317+
});
312318
});
313319

314-
req.on('httpError', function(error, resp) {
315-
resp.error = error;
316-
resp.error.retryable = false;
317-
this.completeRequest(resp);
318-
});
320+
resp.httpStream = null; // take ownership of the stream object
319321
}
320322
});
321323

test/helpers.coffee

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,25 @@ MockService = AWS.Service.defineService 'mockService',
5959
signatureVersion: 'v4'
6060

6161
mockHttpSuccessfulResponse = (status, headers, data, cb) ->
62+
if !Array.isArray(data)
63+
data = [data]
64+
6265
httpResp = new EventEmitter()
6366
httpResp.statusCode = status
6467
httpResp.headers = headers
68+
httpResp.read = ->
69+
if data.length > 0
70+
new Buffer(data.shift())
71+
else
72+
null
6573

6674
cb(httpResp)
6775

68-
if !Array.isArray(data)
69-
data = [data]
70-
AWS.util.arrayEach data, (str) ->
71-
httpResp.emit('data', new Buffer(str))
76+
AWS.util.arrayEach data.slice(), (str) ->
77+
if httpResp._events.readable
78+
httpResp.emit('readable')
79+
else
80+
httpResp.emit('data', new Buffer(str))
7281

7382
httpResp.emit('end')
7483

test/request.spec.coffee

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,29 @@ describe 'AWS.Request', ->
3333
runs ->
3434
expect(data).toEqual('FOOBARBAZQUX')
3535

36+
it 'streams2 data (readable event)', ->
37+
if AWS.HttpClient.streamsApiVersion < 2
38+
return
39+
40+
data = ''; done = false
41+
helpers.mockHttpResponse 200, {}, ['FOO', 'BAR', 'BAZ', 'QUX']
42+
43+
runs ->
44+
request = service.makeRequest('mockMethod')
45+
s = request.createReadStream()
46+
s.on 'end', -> done = true
47+
s.on 'readable', ->
48+
try
49+
chunk = s.read()
50+
if chunk
51+
data += chunk
52+
catch e
53+
console.log(e.stack)
54+
55+
waitsFor -> done == true
56+
runs ->
57+
expect(data).toEqual('FOOBARBAZQUX')
58+
3659
it 'does not stream data on failures', ->
3760
data = ''; error = null; done = false
3861
helpers.mockHttpResponse 404, {}, ['No such file']

0 commit comments

Comments
 (0)