Skip to content

feat: support pull streams #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"is-pull-stream": "0.0.0",
"is-stream": "^2.0.0",
"kind-of": "^6.0.2",
"pull-stream-to-async-iterator": "^1.0.2",
"readable-stream": "^3.4.0"
},
"devDependencies": {
Expand Down
246 changes: 171 additions & 75 deletions src/files/normalise-input.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,53 @@

const errCode = require('err-code')
const { Buffer } = require('buffer')
const pullStreamToIterable = require('pull-stream-to-async-iterator')

/*
* Transform one of:
*
* ```
* Buffer|ArrayBuffer|TypedArray
* Blob|File
* { path, content: Blob }
* { path, content: String }
* { path, content: Iterable<Number> }
* { path, content: Iterable<Buffer> }
* { path, content: Iterable<Iterable<Number>> }
* { path, content: AsyncIterable<Iterable<Number>> }
* String
* Iterable<Number>
* Iterable<Buffer>
* Iterable<Blob>
* Iterable<{ path, content: Buffer }>
* Iterable<{ path, content: Blob }>
* Iterable<{ path, content: Iterable<Number> }>
* Iterable<{ path, content: AsyncIterable<Buffer> }>
* AsyncIterable<Buffer>
* AsyncIterable<{ path, content: Buffer }>
* AsyncIterable<{ path, content: Blob }>
* AsyncIterable<{ path, content: Iterable<Buffer> }>
* AsyncIterable<{ path, content: AsyncIterable<Buffer> }>
* Bytes (Buffer|ArrayBuffer|TypedArray) [single file]
* Bloby (Blob|File) [single file]
* String [single file]
* { path, content: Bytes } [single file]
* { path, content: Bloby } [single file]
* { path, content: String } [single file]
* { path, content: Iterable<Number> } [single file]
* { path, content: Iterable<Bytes> } [single file]
* { path, content: AsyncIterable<Bytes> } [single file]
* { path, content: PullStream<Bytes> } [single file]
* Iterable<Number> [single file]
* Iterable<Bytes> [single file]
* Iterable<Bloby> [multiple files]
* Iterable<String> [multiple files]
* Iterable<{ path, content: Bytes }> [multiple files]
* Iterable<{ path, content: Bloby }> [multiple files]
* Iterable<{ path, content: String }> [multiple files]
* Iterable<{ path, content: Iterable<Number> }> [multiple files]
* Iterable<{ path, content: Iterable<Bytes> }> [multiple files]
* Iterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* Iterable<{ path, content: PullStream<Bytes> }> [multiple files]
* AsyncIterable<Bytes> [single file]
* AsyncIterable<Bloby> [multiple files]
* AsyncIterable<String> [multiple files]
* AsyncIterable<{ path, content: Bytes }> [multiple files]
* AsyncIterable<{ path, content: Bloby }> [multiple files]
* AsyncIterable<{ path, content: String }> [multiple files]
* AsyncIterable<{ path, content: Iterable<Number> }> [multiple files]
* AsyncIterable<{ path, content: Iterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: PullStream<Bytes> }> [multiple files]
* PullStream<Bytes> [single file]
* PullStream<Bloby> [multiple files]
* PullStream<String> [multiple files]
* PullStream<{ path, content: Bytes }> [multiple files]
* PullStream<{ path, content: Bloby }> [multiple files]
* PullStream<{ path, content: String }> [multiple files]
* PullStream<{ path, content: Iterable<Number> }> [multiple files]
* PullStream<{ path, content: Iterable<Bytes> }> [multiple files]
* PullStream<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* PullStream<{ path, content: PullStream<Bytes> }> [multiple files]
* ```
* Into:
*
Expand All @@ -44,13 +65,6 @@ module.exports = function normaliseInput (input) {
throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
}

// { path, content: ? }
if (isFileObject(input)) {
return (async function * () { // eslint-disable-line require-await
yield toFileObject(input)
})()
}

// String
if (typeof input === 'string' || input instanceof String) {
return (async function * () { // eslint-disable-line require-await
Expand All @@ -68,76 +82,165 @@ module.exports = function normaliseInput (input) {

// Iterable<?>
if (input[Symbol.iterator]) {
// Iterable<Number>
if (!isNaN(input[0])) {
return (async function * () { // eslint-disable-line require-await
yield toFileObject([input])
})()
}

// Iterable<Buffer>
// Iterable<Blob>
return (async function * () { // eslint-disable-line require-await
for (const chunk of input) {
yield toFileObject(chunk)
const iterator = input[Symbol.iterator]()
const first = iterator.next()
if (first.done) return iterator

// Iterable<Number>
// Iterable<Bytes>
if (Number.isInteger(first.value) || isBytes(first.value)) {
yield toFileObject((function * () {
yield first.value
yield * iterator
})())
return
}

// Iterable<Bloby>
// Iterable<String>
// Iterable<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// AsyncIterable<?>
if (input[Symbol.asyncIterator]) {
return (async function * () {
const iterator = input[Symbol.asyncIterator]()
const first = await iterator.next()
if (first.done) return iterator

// AsyncIterable<Bytes>
if (isBytes(first.value)) {
yield toFileObject((async function * () { // eslint-disable-line require-await
yield first.value
yield * iterator
})())
return
}

// AsyncIterable<Bloby>
// AsyncIterable<String>
// AsyncIterable<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for await (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// { path, content: ? }
// Note: Detected _after_ AsyncIterable<?> because Node.js streams have a
// `path` property that passes this check.
if (isFileObject(input)) {
return (async function * () { // eslint-disable-line require-await
for await (const chunk of input) {
yield toFileObject(chunk)
yield toFileObject(input)
})()
}

// PullStream<?>
if (typeof input === 'function') {
return (async function * () {
const iterator = pullStreamToIterable(input)[Symbol.asyncIterator]()
const first = await iterator.next()
if (first.done) return iterator

// PullStream<Bytes>
if (isBytes(first.value)) {
yield toFileObject((async function * () { // eslint-disable-line require-await
yield first.value
yield * iterator
})())
return
}

// PullStream<Bloby>
// PullStream<String>
// PullStream<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for await (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
}

function toFileObject (input) {
return {
path: input.path || '',
content: toAsyncIterable(input.content || input)
const obj = { path: input.path || '' }

if (input.content) {
obj.content = toAsyncIterable(input.content)
} else if (!input.path) { // Not already a file object with path or content prop
obj.content = toAsyncIterable(input)
}

return obj
}

function toAsyncIterable (input) {
// Buffer|ArrayBuffer|TypedArray|array of bytes
if (isBytes(input)) {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

if (typeof input === 'string' || input instanceof String) {
// Bytes | String
if (isBytes(input) || typeof input === 'string') {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

// Blob|File
// Bloby
if (isBloby(input)) {
return blobToAsyncGenerator(input)
}

// Iterator<?>
if (input[Symbol.iterator]) {
if (!isNaN(input[0])) {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

return (async function * () { // eslint-disable-line require-await
for (const chunk of input) {
yield toBuffer(chunk)
const iterator = input[Symbol.iterator]()
const first = iterator.next()
if (first.done) return iterator

// Iterable<Number>
if (Number.isInteger(first.value)) {
yield toBuffer(Array.from((function * () {
yield first.value
yield * iterator
})()))
return
}

// Iterable<Bytes>
if (isBytes(first.value)) {
yield toBuffer(first.value)
for (const chunk of iterator) {
yield toBuffer(chunk)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// AsyncIterable<?>
// AsyncIterable<Bytes>
if (input[Symbol.asyncIterator]) {
return (async function * () {
for await (const chunk of input) {
Expand All @@ -146,23 +249,16 @@ function toAsyncIterable (input) {
})()
}

// PullStream<Bytes>
if (typeof input === 'function') {
return pullStreamToIterable(input)
}

throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
}

function toBuffer (chunk) {
if (isBytes(chunk)) {
return chunk
}

if (typeof chunk === 'string' || chunk instanceof String) {
return Buffer.from(chunk)
}

if (Array.isArray(chunk)) {
return Buffer.from(chunk)
}

throw new Error('Unexpected input: ' + typeof chunk)
return isBytes(chunk) ? chunk : Buffer.from(chunk)
}

function isBytes (obj) {
Expand Down
Loading