Skip to content

Commit 4a474d5

Browse files
authored
feat!: allow stream muxers and connection encrypters to yield lists (#2256)
Updates the stream type for `MultiaddrConnection` to `Uint8Array | Uint8ArrayList` - this lets us yield `Uint8ArrayList`s from stream muxers and connection encrypters instead of having to copy the list contents into a new `Uint8Array` every time. This lowers the connection latency slightly and increases stream throughput according to the [perf test results](https://observablehq.com/@libp2p-workspace/performance-dashboard?branch=fa6fd4179febbd14ed92d4a7e83d52f729a3af07). BREAKING CHANGE: the `minSendBytes` option has been removed from Mplex since the transport can now decide how to optimise sending data
1 parent ac7bc38 commit 4a474d5

File tree

35 files changed

+220
-283
lines changed

35 files changed

+220
-283
lines changed

packages/connection-encrypter-plaintext/package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@
5050
"@libp2p/interface": "^0.1.2",
5151
"@libp2p/peer-id": "^3.0.6",
5252
"@multiformats/multiaddr": "^12.1.10",
53-
"it-handshake": "^4.1.3",
54-
"it-length-prefixed": "^9.0.3",
55-
"it-map": "^3.0.4",
53+
"it-protobuf-stream": "^1.1.1",
5654
"it-stream-types": "^2.0.1",
5755
"protons-runtime": "^5.0.0",
5856
"uint8arraylist": "^2.4.3"

packages/connection-encrypter-plaintext/src/index.ts

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,49 +22,53 @@
2222

2323
import { UnexpectedPeerError, InvalidCryptoExchangeError } from '@libp2p/interface/errors'
2424
import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id'
25-
import { handshake } from 'it-handshake'
26-
import * as lp from 'it-length-prefixed'
27-
import map from 'it-map'
25+
import { pbStream } from 'it-protobuf-stream'
2826
import { Exchange, KeyType } from './pb/proto.js'
2927
import type { ComponentLogger, Logger } from '@libp2p/interface'
28+
import type { MultiaddrConnection } from '@libp2p/interface/connection'
3029
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface/connection-encrypter'
3130
import type { PeerId } from '@libp2p/interface/peer-id'
32-
import type { Duplex, Source } from 'it-stream-types'
31+
import type { Duplex } from 'it-stream-types'
3332
import type { Uint8ArrayList } from 'uint8arraylist'
3433

3534
const PROTOCOL = '/plaintext/2.0.0'
3635

37-
function lpEncodeExchange (exchange: Exchange): Uint8ArrayList {
38-
const pb = Exchange.encode(exchange)
39-
40-
return lp.encode.single(pb)
41-
}
42-
4336
export interface PlaintextComponents {
4437
logger: ComponentLogger
4538
}
4639

40+
export interface PlaintextInit {
41+
/**
42+
* The peer id exchange must complete within this many milliseconds
43+
* (default: 1000)
44+
*/
45+
timeout?: number
46+
}
47+
4748
class Plaintext implements ConnectionEncrypter {
4849
public protocol: string = PROTOCOL
4950
private readonly log: Logger
51+
private readonly timeout: number
5052

51-
constructor (components: PlaintextComponents) {
53+
constructor (components: PlaintextComponents, init: PlaintextInit = {}) {
5254
this.log = components.logger.forComponent('libp2p:plaintext')
55+
this.timeout = init.timeout ?? 1000
5356
}
5457

55-
async secureInbound (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
58+
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
5659
return this._encrypt(localId, conn, remoteId)
5760
}
5861

59-
async secureOutbound (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
62+
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
6063
return this._encrypt(localId, conn, remoteId)
6164
}
6265

6366
/**
6467
* Encrypt connection
6568
*/
66-
async _encrypt (localId: PeerId, conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remoteId?: PeerId): Promise<SecuredConnection> {
67-
const shake = handshake(conn)
69+
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
70+
const signal = AbortSignal.timeout(this.timeout)
71+
const pb = pbStream(conn).pb(Exchange)
6872

6973
let type = KeyType.RSA
7074

@@ -75,45 +79,40 @@ class Plaintext implements ConnectionEncrypter {
7579
}
7680

7781
// Encode the public key and write it to the remote peer
78-
shake.write(
79-
lpEncodeExchange({
80-
id: localId.toBytes(),
81-
pubkey: {
82-
Type: type,
83-
Data: localId.publicKey ?? new Uint8Array(0)
84-
}
85-
}).subarray()
86-
)
82+
await pb.write({
83+
id: localId.toBytes(),
84+
pubkey: {
85+
Type: type,
86+
Data: localId.publicKey ?? new Uint8Array(0)
87+
}
88+
}, {
89+
signal
90+
})
8791

8892
this.log('write pubkey exchange to peer %p', remoteId)
8993

9094
// Get the Exchange message
91-
const response = (await lp.decode.fromReader(shake.reader).next()).value
92-
93-
if (response == null) {
94-
throw new Error('Did not read response')
95-
}
96-
97-
const id = Exchange.decode(response)
98-
this.log('read pubkey exchange from peer %p', remoteId)
95+
const response = await pb.read({
96+
signal
97+
})
9998

10099
let peerId
101100
try {
102-
if (id.pubkey == null) {
101+
if (response.pubkey == null) {
103102
throw new Error('Public key missing')
104103
}
105104

106-
if (id.pubkey.Data.length === 0) {
105+
if (response.pubkey.Data.length === 0) {
107106
throw new Error('Public key data too short')
108107
}
109108

110-
if (id.id == null) {
109+
if (response.id == null) {
111110
throw new Error('Remote id missing')
112111
}
113112

114-
peerId = await peerIdFromKeys(id.pubkey.Data)
113+
peerId = await peerIdFromKeys(response.pubkey.Data)
115114

116-
if (!peerId.equals(peerIdFromBytes(id.id))) {
115+
if (!peerId.equals(peerIdFromBytes(response.id))) {
117116
throw new Error('Public key did not match id')
118117
}
119118
} catch (err: any) {
@@ -127,18 +126,13 @@ class Plaintext implements ConnectionEncrypter {
127126

128127
this.log('plaintext key exchange completed successfully with peer %p', peerId)
129128

130-
shake.rest()
131-
132129
return {
133-
conn: {
134-
sink: shake.stream.sink,
135-
source: map(shake.stream.source, (buf) => buf.subarray())
136-
},
130+
conn: pb.unwrap().unwrap(),
137131
remotePeer: peerId
138132
}
139133
}
140134
}
141135

142-
export function plaintext (): (components: PlaintextComponents) => ConnectionEncrypter {
143-
return (components) => new Plaintext(components)
136+
export function plaintext (init?: PlaintextInit): (components: PlaintextComponents) => ConnectionEncrypter {
137+
return (components) => new Plaintext(components, init)
144138
}

packages/interface-compliance-tests/src/connection-encryption/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ export default (common: TestSetup<ConnectionEncrypter>): void => {
5757
// Send some data and collect the result
5858
const input = uint8ArrayFromString('data to encrypt')
5959
const result = await pipe(
60-
[input],
60+
async function * () {
61+
yield input
62+
},
6163
outboundResult.conn,
6264
async (source) => all(source)
6365
)

packages/interface-compliance-tests/src/connection-encryption/utils/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ import { multiaddr } from '@multiformats/multiaddr'
33
import { duplexPair } from 'it-pair/duplex'
44
import type { MultiaddrConnection } from '@libp2p/interface/connection'
55
import type { Duplex, Source } from 'it-stream-types'
6+
import type { Uint8ArrayList } from 'uint8arraylist'
67

78
export function createMaConnPair (): [MultiaddrConnection, MultiaddrConnection] {
8-
const [local, remote] = duplexPair<Uint8Array>()
9+
const [local, remote] = duplexPair<Uint8Array | Uint8ArrayList>()
910

10-
function duplexToMaConn (duplex: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>): MultiaddrConnection {
11+
function duplexToMaConn (duplex: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>, Source<Uint8Array | Uint8ArrayList>, Promise<void>>): MultiaddrConnection {
1112
const output: MultiaddrConnection = {
1213
...duplex,
1314
close: async () => {},

packages/interface-compliance-tests/src/mocks/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ export interface Peer {
287287
}
288288

289289
export function multiaddrConnectionPair (a: { peerId: PeerId, registrar: Registrar }, b: { peerId: PeerId, registrar: Registrar }): [ MultiaddrConnection, MultiaddrConnection ] {
290-
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array>()
290+
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array | Uint8ArrayList>()
291291

292292
return [
293293
mockMultiaddrConnection(peerAtoPeerB, b.peerId),

packages/interface-compliance-tests/src/mocks/duplex.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Duplex, Source } from 'it-stream-types'
2+
import type { Uint8ArrayList } from 'uint8arraylist'
23

3-
export function mockDuplex (): Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
4+
export function mockDuplex (): Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>, Source<Uint8Array | Uint8ArrayList>, Promise<void>> {
45
return {
56
source: (async function * () {
67
yield * []

packages/interface-compliance-tests/src/mocks/multiaddr-connection.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import type { MultiaddrConnection } from '@libp2p/interface/connection'
66
import type { PeerId } from '@libp2p/interface/peer-id'
77
import type { Multiaddr } from '@multiformats/multiaddr'
88
import type { Duplex } from 'it-stream-types'
9+
import type { Uint8ArrayList } from 'uint8arraylist'
910

10-
export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Array>> & Partial<MultiaddrConnection>, peerId: PeerId): MultiaddrConnection {
11+
export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> & Partial<MultiaddrConnection>, peerId: PeerId): MultiaddrConnection {
1112
const maConn: MultiaddrConnection = {
1213
async close () {
1314

@@ -36,7 +37,7 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
3637
const { addrs, remotePeer } = opts
3738
const controller = new AbortController()
3839
const [localAddr, remoteAddr] = addrs
39-
const [inboundStream, outboundStream] = duplexPair<Uint8Array>()
40+
const [inboundStream, outboundStream] = duplexPair<Uint8Array | Uint8ArrayList>()
4041

4142
const outbound: MultiaddrConnection = {
4243
...outboundStream,

packages/interface-compliance-tests/src/stream-muxer/base-test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async function drainAndClose (stream: Duplex<any>): Promise<void> {
2222
export default (common: TestSetup<StreamMuxerFactory>): void => {
2323
describe('base', () => {
2424
it('Open a stream from the dialer', async () => {
25-
const p = duplexPair<Uint8Array>()
25+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
2626
const dialerFactory = await common.setup()
2727
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
2828
const onStreamPromise: DeferredPromise<Stream> = defer()
@@ -75,7 +75,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
7575
})
7676

7777
it('Open a stream from the listener', async () => {
78-
const p = duplexPair<Uint8Array>()
78+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
7979
const onStreamPromise: DeferredPromise<Stream> = defer()
8080
const dialerFactory = await common.setup()
8181
const dialer = dialerFactory.createStreamMuxer({
@@ -106,7 +106,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
106106
})
107107

108108
it('Open a stream on both sides', async () => {
109-
const p = duplexPair<Uint8Array>()
109+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
110110
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
111111
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
112112
const dialerFactory = await common.setup()
@@ -146,7 +146,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
146146

147147
it('Open a stream on one side, write, open a stream on the other side', async () => {
148148
const toString = (source: Source<Uint8ArrayList>): AsyncGenerator<string> => map(source, (u) => uint8ArrayToString(u.subarray()))
149-
const p = duplexPair<Uint8Array>()
149+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
150150
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
151151
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
152152
const dialerFactory = await common.setup()

packages/interface-compliance-tests/src/stream-muxer/close-test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
8484
}
8585
})
8686

87-
const p = duplexPair<Uint8Array>()
87+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
8888
void pipe(p[0], dialer, p[0])
8989
void pipe(p[1], listener, p[1])
9090

@@ -104,7 +104,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
104104

105105
// Pause, and then close the dialer
106106
await delay(50)
107-
await pipe([], dialer, drain)
107+
await pipe(async function * () {}, dialer, drain)
108108

109109
expect(openedStreams).to.have.equal(expectedStreams)
110110
expect(dialer.streams).to.have.lengthOf(0)
@@ -126,7 +126,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
126126
}
127127
})
128128

129-
const p = duplexPair<Uint8Array>()
129+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
130130
void pipe(p[0], dialer, p[0])
131131
void pipe(p[1], listener, p[1])
132132

@@ -169,7 +169,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
169169
}
170170
})
171171

172-
const p = duplexPair<Uint8Array>()
172+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
173173
void pipe(p[0], dialer, p[0])
174174
void pipe(p[1], listener, p[1])
175175

@@ -225,7 +225,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
225225
})
226226

227227
it('closing one of the muxed streams doesn\'t close others', async () => {
228-
const p = duplexPair<Uint8Array>()
228+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
229229
const dialerFactory = await common.setup()
230230
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
231231

@@ -276,7 +276,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
276276
it('can close a stream for writing', async () => {
277277
const deferred = pDefer<Error>()
278278

279-
const p = duplexPair<Uint8Array>()
279+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
280280
const dialerFactory = await common.setup()
281281
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
282282
const data = [randomBuffer(), randomBuffer()]
@@ -321,7 +321,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
321321

322322
it('can close a stream for reading', async () => {
323323
const deferred = pDefer<Uint8ArrayList[]>()
324-
const p = duplexPair<Uint8Array>()
324+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
325325
const dialerFactory = await common.setup()
326326
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
327327
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))
@@ -387,7 +387,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
387387
it('should wait for all data to be sent when closing streams', async () => {
388388
const deferred = pDefer<Message>()
389389

390-
const p = duplexPair<Uint8Array>()
390+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
391391
const dialerFactory = await common.setup()
392392
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
393393

@@ -429,7 +429,7 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
429429
it('should abort closing a stream with outstanding data to read', async () => {
430430
const deferred = pDefer<Message>()
431431
432-
const p = duplexPair<Uint8Array>()
432+
const p = duplexPair<Uint8Array | Uint8ArrayList>()
433433
const dialerFactory = await common.setup()
434434
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
435435

packages/interface-compliance-tests/src/stream-muxer/spawner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
99
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
1010

1111
export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMuxer>, nStreams: number, nMsg: number, limit?: number): Promise<void> => {
12-
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array>()
12+
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array | Uint8ArrayList>()
1313

1414
const msg = new Uint8ArrayList(uint8ArrayFromString('simple msg'))
1515

packages/interface/src/connection-encrypter/index.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import type { MultiaddrConnection } from '../connection/index.js'
12
import type { PeerId } from '../peer-id/index.js'
2-
import type { Duplex, Source } from 'it-stream-types'
3+
import type { Duplex } from 'it-stream-types'
4+
import type { Uint8ArrayList } from 'uint8arraylist'
35

46
/**
57
* A libp2p connection encrypter module must be compliant to this interface
@@ -13,18 +15,18 @@ export interface ConnectionEncrypter<Extension = unknown> {
1315
* pass it for extra verification, otherwise it will be determined during
1416
* the handshake.
1517
*/
16-
secureOutbound(localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<Extension>>
18+
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
1719

1820
/**
1921
* Decrypt incoming data. If the remote PeerId is known,
2022
* pass it for extra verification, otherwise it will be determined during
2123
* the handshake
2224
*/
23-
secureInbound(localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<Extension>>
25+
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
2426
}
2527

26-
export interface SecuredConnection<Extension = unknown> {
27-
conn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>
28+
export interface SecuredConnection<Stream = any, Extension = unknown> {
29+
conn: Stream
2830
remoteExtensions?: Extension
2931
remotePeer: PeerId
3032
}

packages/interface/src/connection/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ export interface MultiaddrConnectionTimeline {
318318
* a peer. It is a low-level primitive and is the raw connection
319319
* without encryption or stream multiplexing.
320320
*/
321-
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
321+
export interface MultiaddrConnection extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
322322
/**
323323
* Gracefully close the connection. All queued data will be written to the
324324
* underlying transport.

0 commit comments

Comments
 (0)