Skip to content

Commit d939e6e

Browse files
committed
fix: added tests for double sink error
1 parent 887b24d commit d939e6e

File tree

3 files changed

+28
-4
lines changed

3 files changed

+28
-4
lines changed

src/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export const ERR_STREAM_RESET = 'ERR_STREAM_RESET'
2727
export const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT'
2828
export const ERR_MAX_OUTBOUND_STREAMS_EXCEEDED = 'ERROR_MAX_OUTBOUND_STREAMS_EXCEEDED'
2929
export const ERR_DECODE_IN_PROGRESS = 'ERR_DECODE_IN_PROGRESS'
30+
export const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK'
31+
export const ERR_SINK_ENDED = 'ERR_SINK_ENDED'
3032

3133
/**
3234
* INITIAL_STREAM_WINDOW is the initial stream window size.

src/stream.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { CodeError } from '@libp2p/interfaces/errors'
55
import { abortableSource } from 'abortable-iterator'
66
import type { Uint8ArrayList } from 'uint8arraylist'
77
import { Flag, FrameHeader, FrameType, HEADER_LENGTH } from './frame.js'
8-
import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, ERR_STREAM_RESET, INITIAL_STREAM_WINDOW } from './constants.js'
8+
import { ERR_DOUBLE_SINK, ERR_RECV_WINDOW_EXCEEDED, ERR_SINK_ENDED, ERR_STREAM_ABORT, ERR_STREAM_RESET, INITIAL_STREAM_WINDOW } from './constants.js'
99
import type { Logger } from '@libp2p/logger'
1010
import type { Config } from './config.js'
1111
import { anySignal } from 'any-signal'
@@ -85,6 +85,7 @@ export class YamuxStream implements Stream {
8585

8686
private sourceEnd: boolean
8787
private sinkEnd: boolean
88+
private sinkSunk: boolean
8889

8990
private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void
9091
private readonly onStreamEnd: () => void
@@ -122,6 +123,7 @@ export class YamuxStream implements Stream {
122123

123124
this.sourceEnd = false
124125
this.sinkEnd = false
126+
this.sinkSunk = false
125127

126128
this.sourceInput = pushable<Uint8ArrayList>({
127129
onEnd: (err?: Error) => {
@@ -140,8 +142,14 @@ export class YamuxStream implements Stream {
140142
this.source = this.createSource()
141143

142144
this.sink = async (source: Source<Uint8Array | Uint8ArrayList>): Promise<void> => {
145+
if (this.sinkSunk) {
146+
throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK)
147+
}
148+
149+
this.sinkSunk = true
150+
143151
if (this.writeState !== HalfStreamState.Open) {
144-
throw new Error('stream closed for writing')
152+
throw new CodeError('stream closed for writing', ERR_SINK_ENDED)
145153
}
146154

147155
const signal = anySignal([this.abortController.signal, this.resetController.signal, this.closeController.signal])
@@ -292,7 +300,7 @@ export class YamuxStream implements Stream {
292300
}
293301
}
294302

295-
abort (err?: Error): void {
303+
abort (err: Error): void {
296304
switch (this.state) {
297305
case StreamState.Finished:
298306
return

test/stream.spec.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import { pipe } from 'it-pipe'
44
import { expect } from 'aegir/chai'
5+
import randomInt from 'random-int'
56
import { sleep, testClientServer } from './util.js'
67
import { HalfStreamState, StreamState } from '../src/stream.js'
78
import { Pushable, pushable } from 'it-pushable'
@@ -219,7 +220,7 @@ describe('stream', () => {
219220
expect(server['localGoAway']).to.equal(GoAwayCode.ProtocolError)
220221
})
221222

222-
it.skip('test stream sink error', async () => {
223+
it('test stream sink error', async () => {
223224
const { client, server } = testClientServer()
224225

225226
// don't let the server respond
@@ -242,4 +243,17 @@ describe('stream', () => {
242243

243244
await sendPipe
244245
})
246+
247+
it('test stream double sink error', async () => {
248+
const { client } = testClientServer()
249+
250+
const id = randomInt(1000)
251+
const c1 = client.newStream(id.toString())
252+
253+
await c1.sink([])
254+
255+
// cannot sink twice
256+
await expect(c1.sink([]))
257+
.to.eventually.be.rejected.with.property('code', 'ERR_DOUBLE_SINK')
258+
})
245259
})

0 commit comments

Comments
 (0)