From 8376d19ea684e377e22ba35a914a02ebf82ba1d6 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 3 Sep 2019 14:03:47 +0100 Subject: [PATCH 1/2] feat: support pull streams This PR updates the `normaliseInput` function to accept pull streams. I've also made the following changes: 1. Update the docs for supported inputs * `Buffer|ArrayBuffer|TypedArray` is aliased as `Bytes` * `Blob|File` is aliased as `Bloby` * Added info for what a input "means" i.e. causes single/multiple files to be added 1. Peek the first item of an (async) iterator properly 1. Move file object check below `input[Symbol.asyncIterator]` check because Node.js streams have a path property that will false positive the `isFileObject` check 1. Fix `toFileObject` to allow objects with no `content` property 1. Simplify `toBuffer` to remove checks that `Buffer.from` already does License: MIT Signed-off-by: Alan Shaw --- package.json | 1 + src/files/normalise-input.js | 224 +++++++++++++++++++++++------------ 2 files changed, 149 insertions(+), 76 deletions(-) diff --git a/package.json b/package.json index ba23173..d9c0ac3 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "is-pull-stream": "0.0.0", "is-stream": "^2.0.0", "kind-of": "^6.0.2", + "pull-stream-to-async-iterator": "^1.0.2", "readable-stream": "^3.4.0" }, "devDependencies": { diff --git a/src/files/normalise-input.js b/src/files/normalise-input.js index d1d9b9f..decd2cc 100644 --- a/src/files/normalise-input.js +++ b/src/files/normalise-input.js @@ -2,32 +2,53 @@ const errCode = require('err-code') const { Buffer } = require('buffer') +const pullStreamToIterable = require('pull-stream-to-async-iterator') /* * Transform one of: * * ``` - * Buffer|ArrayBuffer|TypedArray - * Blob|File - * { path, content: Blob } - * { path, content: String } - * { path, content: Iterable } - * { path, content: Iterable } - * { path, content: Iterable> } - * { path, content: AsyncIterable> } - * String - * Iterable - * Iterable - * Iterable - * Iterable<{ path, content: Buffer }> - * Iterable<{ path, content: Blob }> - * Iterable<{ path, content: Iterable }> - * Iterable<{ path, content: AsyncIterable }> - * AsyncIterable - * AsyncIterable<{ path, content: Buffer }> - * AsyncIterable<{ path, content: Blob }> - * AsyncIterable<{ path, content: Iterable }> - * AsyncIterable<{ path, content: AsyncIterable }> + * Bytes (Buffer|ArrayBuffer|TypedArray) [single file] + * Bloby (Blob|File) [single file] + * String [single file] + * { path, content: Bytes } [single file] + * { path, content: Bloby } [single file] + * { path, content: String } [single file] + * { path, content: Iterable } [single file] + * { path, content: Iterable } [single file] + * { path, content: AsyncIterable } [single file] + * { path, content: PullStream } [single file] + * Iterable [single file] + * Iterable [single file] + * Iterable [multiple files] + * Iterable [multiple files] + * Iterable<{ path, content: Bytes }> [multiple files] + * Iterable<{ path, content: Bloby }> [multiple files] + * Iterable<{ path, content: String }> [multiple files] + * Iterable<{ path, content: Iterable }> [multiple files] + * Iterable<{ path, content: Iterable }> [multiple files] + * Iterable<{ path, content: AsyncIterable }> [multiple files] + * Iterable<{ path, content: PullStream }> [multiple files] + * AsyncIterable [single file] + * AsyncIterable [multiple files] + * AsyncIterable [multiple files] + * AsyncIterable<{ path, content: Bytes }> [multiple files] + * AsyncIterable<{ path, content: Bloby }> [multiple files] + * AsyncIterable<{ path, content: String }> [multiple files] + * AsyncIterable<{ path, content: Iterable }> [multiple files] + * AsyncIterable<{ path, content: Iterable }> [multiple files] + * AsyncIterable<{ path, content: AsyncIterable }> [multiple files] + * AsyncIterable<{ path, content: PullStream }> [multiple files] + * PullStream [single file] + * PullStream [multiple files] + * PullStream [multiple files] + * PullStream<{ path, content: Bytes }> [multiple files] + * PullStream<{ path, content: Bloby }> [multiple files] + * PullStream<{ path, content: String }> [multiple files] + * PullStream<{ path, content: Iterable }> [multiple files] + * PullStream<{ path, content: Iterable }> [multiple files] + * PullStream<{ path, content: AsyncIterable }> [multiple files] + * PullStream<{ path, content: PullStream }> [multiple files] * ``` * Into: * @@ -44,13 +65,6 @@ module.exports = function normaliseInput (input) { throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT')) } - // { path, content: ? } - if (isFileObject(input)) { - return (async function * () { // eslint-disable-line require-await - yield toFileObject(input) - })() - } - // String if (typeof input === 'string' || input instanceof String) { return (async function * () { // eslint-disable-line require-await @@ -68,28 +82,80 @@ module.exports = function normaliseInput (input) { // Iterable if (input[Symbol.iterator]) { - // Iterable - if (!isNaN(input[0])) { - return (async function * () { // eslint-disable-line require-await - yield toFileObject([input]) - })() - } - - // Iterable - // Iterable return (async function * () { // eslint-disable-line require-await - for (const chunk of input) { - yield toFileObject(chunk) + const iterator = input[Symbol.iterator]() + const first = iterator.next() + if (first.done) return iterator + + // Iterable + // Iterable + if (Number.isInteger(first.value) || isBytes(first.value)) { + yield toFileObject((function * () { + yield first.value + yield * iterator + })()) + return } + + // Iterable + // Iterable + // Iterable<{ path, content }> + if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') { + yield toFileObject(first.value) + for (const obj of iterator) { + yield toFileObject(obj) + } + return + } + + throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT') })() } // AsyncIterable if (input[Symbol.asyncIterator]) { - return (async function * () { // eslint-disable-line require-await - for await (const chunk of input) { - yield toFileObject(chunk) + return (async function * () { + const iterator = input[Symbol.asyncIterator]() + const first = await iterator.next() + if (first.done) return iterator + + // AsyncIterable + if (isBytes(first.value)) { + yield toFileObject((async function * () { // eslint-disable-line require-await + yield first.value + yield * iterator + })()) + return + } + + // AsyncIterable + // AsyncIterable + // AsyncIterable<{ path, content }> + if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') { + yield toFileObject(first.value) + for await (const obj of iterator) { + yield toFileObject(obj) + } + return } + + throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT') + })() + } + + // { path, content: ? } + // Note: Detected _after_ AsyncIterable because Node.js streams have a + // `path` property that passes this check. + if (isFileObject(input)) { + return (async function * () { // eslint-disable-line require-await + yield toFileObject(input) + })() + } + + // PullStream + if (typeof input === 'function') { + return (async function * () { // eslint-disable-line require-await + yield toFileObject(input) })() } @@ -97,47 +163,60 @@ module.exports = function normaliseInput (input) { } function toFileObject (input) { - return { - path: input.path || '', - content: toAsyncIterable(input.content || input) + const obj = { path: input.path || '' } + + if (input.content) { + obj.content = toAsyncIterable(input.content) + } else if (!input.path) { // Not already a file object with path or content prop + obj.content = toAsyncIterable(input) } + + return obj } function toAsyncIterable (input) { - // Buffer|ArrayBuffer|TypedArray|array of bytes - if (isBytes(input)) { - return (async function * () { // eslint-disable-line require-await - yield toBuffer(input) - })() - } - - if (typeof input === 'string' || input instanceof String) { + // Bytes | String + if (isBytes(input) || typeof input === 'string') { return (async function * () { // eslint-disable-line require-await yield toBuffer(input) })() } - // Blob|File + // Bloby if (isBloby(input)) { return blobToAsyncGenerator(input) } // Iterator if (input[Symbol.iterator]) { - if (!isNaN(input[0])) { - return (async function * () { // eslint-disable-line require-await - yield toBuffer(input) - })() - } - return (async function * () { // eslint-disable-line require-await - for (const chunk of input) { - yield toBuffer(chunk) + const iterator = input[Symbol.iterator]() + const first = iterator.next() + if (first.done) return iterator + + // Iterable + if (Number.isInteger(first.value)) { + yield toBuffer(Array.from((function * () { + yield first.value + yield * iterator + })())) + return + } + + // Iterable + if (isBytes(first.value)) { + yield toBuffer(first.value) + for (const chunk of iterator) { + yield toBuffer(chunk) + } + return } + + throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT') })() } - // AsyncIterable + // AsyncIterable if (input[Symbol.asyncIterator]) { return (async function * () { for await (const chunk of input) { @@ -146,23 +225,16 @@ function toAsyncIterable (input) { })() } + // PullStream + if (typeof input === 'function') { + return pullStreamToIterable(input) + } + throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT')) } function toBuffer (chunk) { - if (isBytes(chunk)) { - return chunk - } - - if (typeof chunk === 'string' || chunk instanceof String) { - return Buffer.from(chunk) - } - - if (Array.isArray(chunk)) { - return Buffer.from(chunk) - } - - throw new Error('Unexpected input: ' + typeof chunk) + return isBytes(chunk) ? chunk : Buffer.from(chunk) } function isBytes (obj) { From 08bd7bd3054620cbb5ca96b3b0d4ed3164aa6349 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 3 Sep 2019 15:03:27 +0100 Subject: [PATCH 2/2] fix: tests License: MIT Signed-off-by: Alan Shaw --- src/files/normalise-input.js | 28 ++++++++++++- test/files/normalise-input.spec.js | 66 +++++++++++++++++++----------- 2 files changed, 67 insertions(+), 27 deletions(-) diff --git a/src/files/normalise-input.js b/src/files/normalise-input.js index decd2cc..89482a3 100644 --- a/src/files/normalise-input.js +++ b/src/files/normalise-input.js @@ -154,8 +154,32 @@ module.exports = function normaliseInput (input) { // PullStream if (typeof input === 'function') { - return (async function * () { // eslint-disable-line require-await - yield toFileObject(input) + return (async function * () { + const iterator = pullStreamToIterable(input)[Symbol.asyncIterator]() + const first = await iterator.next() + if (first.done) return iterator + + // PullStream + if (isBytes(first.value)) { + yield toFileObject((async function * () { // eslint-disable-line require-await + yield first.value + yield * iterator + })()) + return + } + + // PullStream + // PullStream + // PullStream<{ path, content }> + if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') { + yield toFileObject(first.value) + for await (const obj of iterator) { + yield toFileObject(obj) + } + return + } + + throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT') })() } diff --git a/test/files/normalise-input.spec.js b/test/files/normalise-input.spec.js index 8b81dee..f536e36 100644 --- a/test/files/normalise-input.spec.js +++ b/test/files/normalise-input.spec.js @@ -7,6 +7,7 @@ const normalise = require('../../src/files/normalise-input') const { supportsFileReader } = require('../../src/supports') const { Buffer } = require('buffer') const all = require('async-iterator-all') +const pull = require('pull-stream') chai.use(dirtyChai) const expect = chai.expect @@ -14,6 +15,7 @@ const expect = chai.expect const STRING = 'hello world' const BUFFER = Buffer.from(STRING) const ARRAY = Array.from(BUFFER) +const TYPEDARRAY = Uint8Array.from(ARRAY) let BLOB if (supportsFileReader) { @@ -49,27 +51,27 @@ function asyncIterableOf (thing) { }()) } +function pullStreamOf (thing) { + return pull.values([thing]) +} + describe('normalise-input', function () { - function testInputType (content, name) { + function testInputType (content, name, isBytes) { it(name, async function () { await testContent(content) }) - it(`Iterable<${name}>`, async function () { - await testContent(iterableOf(content)) - }) - - it(`AsyncIterable<${name}>`, async function () { - await testContent(asyncIterableOf(content)) - }) + if (isBytes) { + it(`Iterable<${name}>`, async function () { + await testContent(iterableOf(content)) + }) - if (name !== 'Blob') { - it(`AsyncIterable>`, async function () { - await testContent(asyncIterableOf(iterableOf(content))) + it(`AsyncIterable<${name}>`, async function () { + await testContent(asyncIterableOf(content)) }) - it(`AsyncIterable>`, async function () { - await testContent(asyncIterableOf(asyncIterableOf(content))) + it(`PullStream<${name}>`, async function () { + await testContent(pullStreamOf(content)) }) } @@ -77,7 +79,7 @@ describe('normalise-input', function () { await testContent({ path: '', content }) }) - if (name !== 'Blob') { + if (isBytes) { it(`{ path: '', content: Iterable<${name}> }`, async function () { await testContent({ path: '', content: iterableOf(content) }) }) @@ -85,13 +87,25 @@ describe('normalise-input', function () { it(`{ path: '', content: AsyncIterable<${name}> }`, async function () { await testContent({ path: '', content: asyncIterableOf(content) }) }) + + it(`{ path: '', content: PullStream<${name}> }`, async function () { + await testContent({ path: '', content: pullStreamOf(content) }) + }) } it(`Iterable<{ path: '', content: ${name} }`, async function () { await testContent(iterableOf({ path: '', content })) }) - if (name !== 'Blob') { + it(`AsyncIterable<{ path: '', content: ${name} }`, async function () { + await testContent(asyncIterableOf({ path: '', content })) + }) + + it(`PullStream<{ path: '', content: ${name} }`, async function () { + await testContent(pullStreamOf({ path: '', content })) + }) + + if (isBytes) { it(`Iterable<{ path: '', content: Iterable<${name}> }>`, async function () { await testContent(iterableOf({ path: '', content: iterableOf(content) })) }) @@ -99,13 +113,7 @@ describe('normalise-input', function () { it(`Iterable<{ path: '', content: AsyncIterable<${name}> }>`, async function () { await testContent(iterableOf({ path: '', content: asyncIterableOf(content) })) }) - } - it(`AsyncIterable<{ path: '', content: ${name} }`, async function () { - await testContent(asyncIterableOf({ path: '', content })) - }) - - if (name !== 'Blob') { it(`AsyncIterable<{ path: '', content: Iterable<${name}> }>`, async function () { await testContent(asyncIterableOf({ path: '', content: iterableOf(content) })) }) @@ -113,15 +121,19 @@ describe('normalise-input', function () { it(`AsyncIterable<{ path: '', content: AsyncIterable<${name}> }>`, async function () { await testContent(asyncIterableOf({ path: '', content: asyncIterableOf(content) })) }) + + it(`PullStream<{ path: '', content: PullStream<${name}> }>`, async function () { + await testContent(pullStreamOf({ path: '', content: pullStreamOf(content) })) + }) } } describe('String', () => { - testInputType(STRING, 'String') + testInputType(STRING, 'String', false) }) describe('Buffer', () => { - testInputType(BUFFER, 'Buffer') + testInputType(BUFFER, 'Buffer', true) }) describe('Blob', () => { @@ -129,10 +141,14 @@ describe('normalise-input', function () { return } - testInputType(BLOB, 'Blob') + testInputType(BLOB, 'Blob', false) }) describe('Iterable', () => { - testInputType(ARRAY, 'Iterable') + testInputType(ARRAY, 'Iterable', false) + }) + + describe('TypedArray', () => { + testInputType(TYPEDARRAY, 'TypedArray', true) }) })