|
1 |
| -import { pipe } from 'it-pipe' |
2 |
| -import { Pushable, pushableV } from 'it-pushable' |
3 |
| -import { abortableSource } from 'abortable-iterator' |
4 |
| -import { encode } from './encode.js' |
5 |
| -import { decode } from './decode.js' |
6 |
| -import { restrictSize } from './restrict-size.js' |
7 |
| -import { MessageTypes, MessageTypeNames, Message } from './message-types.js' |
8 |
| -import { createStream } from './stream.js' |
9 |
| -import { toString as uint8ArrayToString } from 'uint8arrays' |
10 |
| -import { trackedMap } from '@libp2p/tracked-map' |
11 |
| -import { logger } from '@libp2p/logger' |
12 |
| -import type { Sink } from 'it-stream-types' |
13 |
| -import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer' |
14 |
| -import type { Stream } from '@libp2p/interfaces/connection' |
15 |
| -import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics' |
16 |
| -import each from 'it-foreach' |
| 1 | +import type { Components } from '@libp2p/interfaces/components' |
| 2 | +import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' |
| 3 | +import { MplexStreamMuxer } from './mplex.js' |
17 | 4 |
|
18 |
| -const log = logger('libp2p:mplex') |
19 |
| - |
20 |
| -function printMessage (msg: Message) { |
21 |
| - const output: any = { |
22 |
| - ...msg, |
23 |
| - type: `${MessageTypeNames[msg.type]} (${msg.type})` |
24 |
| - } |
25 |
| - |
26 |
| - if (msg.type === MessageTypes.NEW_STREAM) { |
27 |
| - output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()) |
28 |
| - } |
29 |
| - |
30 |
| - if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { |
31 |
| - output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16') |
32 |
| - } |
33 |
| - |
34 |
| - return output |
35 |
| -} |
36 |
| - |
37 |
| -export interface MplexStream extends Stream { |
38 |
| - source: Pushable<Uint8Array> |
39 |
| -} |
40 |
| - |
41 |
| -export interface MplexOptions extends MuxerOptions { |
| 5 | +export interface MplexInit extends StreamMuxerInit { |
42 | 6 | maxMsgSize?: number
|
43 |
| - metrics?: ComponentMetricsTracker |
44 | 7 | }
|
45 | 8 |
|
46 |
| -export class Mplex implements Muxer { |
47 |
| - static multicodec = '/mplex/6.7.0' |
48 |
| - |
49 |
| - public sink: Sink<Uint8Array> |
50 |
| - public source: AsyncIterable<Uint8Array> |
51 |
| - |
52 |
| - private _streamId: number |
53 |
| - private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> } |
54 |
| - private readonly _options: MplexOptions |
55 |
| - private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } |
56 |
| - |
57 |
| - constructor (options?: MplexOptions) { |
58 |
| - options = options ?? {} |
59 |
| - |
60 |
| - this._streamId = 0 |
61 |
| - this._streams = { |
62 |
| - /** |
63 |
| - * Stream to ids map |
64 |
| - */ |
65 |
| - initiators: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }), |
66 |
| - /** |
67 |
| - * Stream to ids map |
68 |
| - */ |
69 |
| - receivers: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' }) |
70 |
| - } |
71 |
| - this._options = options |
72 |
| - |
73 |
| - /** |
74 |
| - * An iterable sink |
75 |
| - */ |
76 |
| - this.sink = this._createSink() |
77 |
| - |
78 |
| - /** |
79 |
| - * An iterable source |
80 |
| - */ |
81 |
| - const source = this._createSource() |
82 |
| - this._source = source |
83 |
| - this.source = source |
84 |
| - } |
85 |
| - |
86 |
| - /** |
87 |
| - * Returns a Map of streams and their ids |
88 |
| - */ |
89 |
| - get streams () { |
90 |
| - // Inbound and Outbound streams may have the same ids, so we need to make those unique |
91 |
| - const streams: Stream[] = [] |
92 |
| - this._streams.initiators.forEach(stream => { |
93 |
| - streams.push(stream) |
94 |
| - }) |
95 |
| - this._streams.receivers.forEach(stream => { |
96 |
| - streams.push(stream) |
97 |
| - }) |
98 |
| - return streams |
99 |
| - } |
100 |
| - |
101 |
| - /** |
102 |
| - * Initiate a new stream with the given name. If no name is |
103 |
| - * provided, the id of the stream will be used. |
104 |
| - */ |
105 |
| - newStream (name?: string): Stream { |
106 |
| - const id = this._streamId++ |
107 |
| - name = name == null ? id.toString() : name.toString() |
108 |
| - const registry = this._streams.initiators |
109 |
| - return this._newStream({ id, name, type: 'initiator', registry }) |
110 |
| - } |
111 |
| - |
112 |
| - /** |
113 |
| - * Called whenever an inbound stream is created |
114 |
| - */ |
115 |
| - _newReceiverStream (options: { id: number, name: string }) { |
116 |
| - const { id, name } = options |
117 |
| - const registry = this._streams.receivers |
118 |
| - return this._newStream({ id, name, type: 'receiver', registry }) |
119 |
| - } |
120 |
| - |
121 |
| - _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) { |
122 |
| - const { id, name, type, registry } = options |
123 |
| - |
124 |
| - log('new %s stream %s %s', type, id, name) |
125 |
| - |
126 |
| - if (registry.has(id)) { |
127 |
| - throw new Error(`${type} stream ${id} already exists!`) |
128 |
| - } |
129 |
| - |
130 |
| - const send = (msg: Message) => { |
131 |
| - if (log.enabled) { |
132 |
| - log('%s stream %s send', type, id, printMessage(msg)) |
133 |
| - } |
134 |
| - |
135 |
| - if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { |
136 |
| - msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice() |
137 |
| - } |
138 |
| - |
139 |
| - this._source.push(msg) |
140 |
| - } |
141 |
| - |
142 |
| - const onEnd = () => { |
143 |
| - log('%s stream %s %s ended', type, id, name) |
144 |
| - registry.delete(id) |
145 |
| - |
146 |
| - if (this._options.onStreamEnd != null) { |
147 |
| - this._options.onStreamEnd(stream) |
148 |
| - } |
149 |
| - } |
150 |
| - |
151 |
| - const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize }) |
152 |
| - registry.set(id, stream) |
153 |
| - return stream |
154 |
| - } |
155 |
| - |
156 |
| - /** |
157 |
| - * Creates a sink with an abortable source. Incoming messages will |
158 |
| - * also have their size restricted. All messages will be varint decoded. |
159 |
| - */ |
160 |
| - _createSink () { |
161 |
| - const sink: Sink<Uint8Array> = async source => { |
162 |
| - if (this._options.signal != null) { |
163 |
| - source = abortableSource(source, this._options.signal) |
164 |
| - } |
165 |
| - |
166 |
| - try { |
167 |
| - await pipe( |
168 |
| - source, |
169 |
| - source => each(source, (buf) => { |
170 |
| - // console.info('incoming', uint8ArrayToString(buf, 'base64')) |
171 |
| - }), |
172 |
| - decode, |
173 |
| - restrictSize(this._options.maxMsgSize), |
174 |
| - async source => { |
175 |
| - for await (const msg of source) { |
176 |
| - this._handleIncoming(msg) |
177 |
| - } |
178 |
| - } |
179 |
| - ) |
180 |
| - |
181 |
| - this._source.end() |
182 |
| - } catch (err: any) { |
183 |
| - log('error in sink', err) |
184 |
| - this._source.end(err) // End the source with an error |
185 |
| - } |
186 |
| - } |
187 |
| - |
188 |
| - return sink |
189 |
| - } |
190 |
| - |
191 |
| - /** |
192 |
| - * Creates a source that restricts outgoing message sizes |
193 |
| - * and varint encodes them |
194 |
| - */ |
195 |
| - _createSource () { |
196 |
| - const onEnd = (err?: Error) => { |
197 |
| - const { initiators, receivers } = this._streams |
198 |
| - // Abort all the things! |
199 |
| - for (const s of initiators.values()) { |
200 |
| - s.abort(err) |
201 |
| - } |
202 |
| - for (const s of receivers.values()) { |
203 |
| - s.abort(err) |
204 |
| - } |
205 |
| - } |
206 |
| - const source = pushableV<Message>({ onEnd }) |
207 |
| - |
208 |
| - return Object.assign(encode(source), { |
209 |
| - push: source.push, |
210 |
| - end: source.end, |
211 |
| - return: source.return |
212 |
| - }) |
213 |
| - } |
214 |
| - |
215 |
| - _handleIncoming (message: Message) { |
216 |
| - const { id, type } = message |
217 |
| - |
218 |
| - if (log.enabled) { |
219 |
| - log('incoming message', printMessage(message)) |
220 |
| - } |
221 |
| - |
222 |
| - // Create a new stream? |
223 |
| - if (message.type === MessageTypes.NEW_STREAM) { |
224 |
| - const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }) |
225 |
| - |
226 |
| - if (this._options.onIncomingStream != null) { |
227 |
| - this._options.onIncomingStream(stream) |
228 |
| - } |
229 |
| - |
230 |
| - return |
231 |
| - } |
232 |
| - |
233 |
| - const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers |
234 |
| - const stream = list.get(id) |
235 |
| - |
236 |
| - if (stream == null) { |
237 |
| - return log('missing stream %s', id) |
238 |
| - } |
| 9 | +export class Mplex implements StreamMuxerFactory { |
| 10 | + public protocol = '/mplex/6.7.0' |
239 | 11 |
|
240 |
| - switch (type) { |
241 |
| - case MessageTypes.MESSAGE_INITIATOR: |
242 |
| - case MessageTypes.MESSAGE_RECEIVER: |
243 |
| - stream.source.push(message.data.slice()) |
244 |
| - break |
245 |
| - case MessageTypes.CLOSE_INITIATOR: |
246 |
| - case MessageTypes.CLOSE_RECEIVER: |
247 |
| - stream.close() |
248 |
| - break |
249 |
| - case MessageTypes.RESET_INITIATOR: |
250 |
| - case MessageTypes.RESET_RECEIVER: |
251 |
| - stream.reset() |
252 |
| - break |
253 |
| - default: |
254 |
| - log('unknown message type %s', type) |
255 |
| - } |
| 12 | + createStreamMuxer (components: Components, init?: MplexInit) { |
| 13 | + return new MplexStreamMuxer(components, init) |
256 | 14 | }
|
257 | 15 | }
|
0 commit comments