41
41
*/
42
42
43
43
import { pushable } from 'it-pushable'
44
+ import type { Pushable } from 'it-pushable'
44
45
45
46
function isAsyncIterable < T > ( thing : any ) : thing is AsyncIterable < T > {
46
47
return thing [ Symbol . asyncIterator ] != null
47
48
}
48
49
50
+ async function addAllToPushable < T > ( sources : Array < AsyncIterable < T > | Iterable < T > > , output : Pushable < T > ) : Promise < void > {
51
+ try {
52
+ await Promise . all (
53
+ sources . map ( async ( source ) => {
54
+ for await ( const item of source ) {
55
+ output . push ( item )
56
+ }
57
+ } )
58
+ )
59
+
60
+ output . end ( )
61
+ } catch ( err : any ) {
62
+ output . end ( err )
63
+ }
64
+ }
65
+
66
+ async function * mergeSources < T > ( sources : Array < AsyncIterable < T > | Iterable < T > > ) : AsyncGenerator < T , void , undefined > {
67
+ const output = pushable < T > ( {
68
+ objectMode : true
69
+ } )
70
+
71
+ addAllToPushable ( sources , output )
72
+ . catch ( ( ) => { } )
73
+
74
+ yield * output
75
+ }
76
+
77
+ function * mergeSyncSources < T > ( syncSources : Array < Iterable < T > > ) : Generator < T , void , undefined > {
78
+ for ( const source of syncSources ) {
79
+ yield * source
80
+ }
81
+ }
82
+
49
83
/**
50
84
* Treat one or more iterables as a single iterable.
51
85
*
@@ -65,36 +99,10 @@ function merge <T> (...sources: Array<AsyncIterable<T> | Iterable<T>>): AsyncGen
65
99
66
100
if ( syncSources . length === sources . length ) {
67
101
// all sources are synchronous
68
- return ( function * ( ) {
69
- for ( const source of syncSources ) {
70
- yield * source
71
- }
72
- } ) ( )
102
+ return mergeSyncSources ( syncSources )
73
103
}
74
104
75
- return ( async function * ( ) {
76
- const output = pushable < T > ( {
77
- objectMode : true
78
- } )
79
-
80
- void Promise . resolve ( ) . then ( async ( ) => {
81
- try {
82
- await Promise . all (
83
- sources . map ( async ( source ) => {
84
- for await ( const item of source ) {
85
- output . push ( item )
86
- }
87
- } )
88
- )
89
-
90
- output . end ( )
91
- } catch ( err : any ) {
92
- output . end ( err )
93
- }
94
- } )
95
-
96
- yield * output
97
- } ) ( )
105
+ return mergeSources ( sources )
98
106
}
99
107
100
108
export default merge
0 commit comments