Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit d3bff7c

Browse files
Alan Shawachingbrain
Alan Shaw
andauthored
feat: add lazy select (#18)
Co-authored-by: Alex Potsides <[email protected]>
1 parent cfb887b commit d3bff7c

File tree

5 files changed

+117
-2
lines changed

5 files changed

+117
-2
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
"it-first": "^1.0.6",
150150
"it-handshake": "^4.0.1",
151151
"it-length-prefixed": "^8.0.2",
152+
"it-merge": "^1.0.4",
152153
"it-pipe": "^2.0.3",
153154
"it-pushable": "^3.0.0",
154155
"it-reader": "^6.0.1",

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ export interface MultistreamSelectInit extends AbortOptions {
2121
writeBytes?: boolean
2222
}
2323

24-
export { select } from './select.js'
24+
export { select, lazySelect } from './select.js'
2525
export { handle } from './handle.js'

src/select.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import { handshake } from 'it-handshake'
55
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
66
import { PROTOCOL_ID } from './index.js'
77
import type { Duplex } from 'it-stream-types'
8-
import type { Uint8ArrayList } from 'uint8arraylist'
8+
import { Uint8ArrayList } from 'uint8arraylist'
9+
import { pushable } from 'it-pushable'
10+
import merge from 'it-merge'
11+
import { reader } from 'it-reader'
912
import type { ByteArrayInit, ByteListInit, MultistreamSelectInit, ProtocolStream } from './index.js'
1013

1114
const log = logger('libp2p:mss:select')
@@ -58,3 +61,56 @@ export async function select (stream: Duplex<any>, protocols: string | string[],
5861
rest()
5962
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
6063
}
64+
65+
/**
66+
* Lazily negotiates a protocol.
67+
*
68+
* It *does not* block writes waiting for the other end to respond. Instead, it
69+
* simply assumes the negotiation went successfully and starts writing data.
70+
*
71+
* Use when it is known that the receiver supports the desired protocol.
72+
*/
73+
export function lazySelect (stream: Duplex<Uint8Array>, protocol: string): ProtocolStream<Uint8Array>
74+
export function lazySelect (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>, protocol: string): ProtocolStream<Uint8ArrayList, Uint8ArrayList | Uint8Array>
75+
export function lazySelect (stream: Duplex<any>, protocol: string): ProtocolStream<any> {
76+
// This is a signal to write the multistream headers if the consumer tries to
77+
// read from the source
78+
const negotiateTrigger = pushable()
79+
let negotiated = false
80+
return {
81+
stream: {
82+
sink: async source => await stream.sink((async function * () {
83+
let first = true
84+
for await (const chunk of merge(source, negotiateTrigger)) {
85+
if (first) {
86+
first = false
87+
negotiated = true
88+
negotiateTrigger.end()
89+
const p1 = uint8ArrayFromString(PROTOCOL_ID)
90+
const p2 = uint8ArrayFromString(protocol)
91+
const list = new Uint8ArrayList(multistream.encode(p1), multistream.encode(p2))
92+
if (chunk.length > 0) list.append(chunk)
93+
yield * list
94+
} else {
95+
yield chunk
96+
}
97+
}
98+
})()),
99+
source: (async function * () {
100+
if (!negotiated) negotiateTrigger.push(new Uint8Array())
101+
const byteReader = reader(stream.source)
102+
let response = await multistream.readString(byteReader)
103+
if (response === PROTOCOL_ID) {
104+
response = await multistream.readString(byteReader)
105+
}
106+
if (response !== protocol) {
107+
throw errCode(new Error('protocol selection failed'), 'ERR_UNSUPPORTED_PROTOCOL')
108+
}
109+
for await (const chunk of byteReader) {
110+
yield * chunk
111+
}
112+
})()
113+
},
114+
protocol
115+
}
116+
}

test/dialer.spec.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,19 @@ describe('Dialer', () => {
116116
await expect(mss.select(duplex, protocol)).to.eventually.be.rejected().with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
117117
})
118118
})
119+
120+
describe('dialer.lazySelect', () => {
121+
it('should lazily select a single protocol', async () => {
122+
const protocol = '/echo/1.0.0'
123+
const duplex = pair()
124+
125+
const selection = mss.lazySelect(duplex, protocol)
126+
expect(selection.protocol).to.equal(protocol)
127+
128+
// Ensure stream is usable after selection
129+
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
130+
const output = await pipe(input, selection.stream, async (source) => await all(source))
131+
expect(new Uint8ArrayList(...output).slice()).to.eql(new Uint8ArrayList(...input).slice())
132+
})
133+
})
119134
})

test/integration.spec.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,47 @@ describe('Dialer and Listener integration', () => {
7979
])
8080
expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice())
8181
})
82+
83+
it('should handle and lazySelect', async () => {
84+
const protocol = '/echo/1.0.0'
85+
const pair = duplexPair()
86+
87+
const dialerSelection = mss.lazySelect(pair[0], protocol)
88+
expect(dialerSelection.protocol).to.equal(protocol)
89+
90+
// Ensure stream is usable after selection
91+
const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))]
92+
// Since the stream is lazy, we need to write to it before handling
93+
const dialerOutPromise = pipe(input, dialerSelection.stream, async source => await all(source))
94+
95+
const listenerSelection = await mss.handle(pair[1], protocol)
96+
expect(listenerSelection.protocol).to.equal(protocol)
97+
98+
await pipe(listenerSelection.stream, listenerSelection.stream)
99+
100+
const dialerOut = await dialerOutPromise
101+
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
102+
})
103+
104+
it('should abort an unhandled lazySelect', async () => {
105+
const protocol = '/echo/1.0.0'
106+
const pair = duplexPair()
107+
108+
const dialerSelection = mss.lazySelect(pair[0], protocol)
109+
expect(dialerSelection.protocol).to.equal(protocol)
110+
111+
// Ensure stream is usable after selection
112+
const input = [new Uint8ArrayList(randomBytes(10), randomBytes(64), randomBytes(3))]
113+
// Since the stream is lazy, we need to write to it before handling
114+
const dialerResultPromise = pipe(input, dialerSelection.stream, async source => await all(source))
115+
116+
// The error message from this varies depending on how much data got
117+
// written when the dialer receives the `na` response and closes the
118+
// stream, so we just assert that this rejects.
119+
await expect(mss.handle(pair[1], '/unhandled/1.0.0')).to.eventually.be.rejected()
120+
121+
// Dialer should fail to negotiate the single protocol
122+
await expect(dialerResultPromise).to.eventually.be.rejected()
123+
.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
124+
})
82125
})

0 commit comments

Comments
 (0)