Skip to content

Commit 265df2c

Browse files
committed
fix: updated close stream and close controller
1 parent 51cccc4 commit 265df2c

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

src/stream.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,15 @@ export class YamuxStream implements Stream {
136136
throw new Error('stream closed for writing')
137137
}
138138

139-
source = abortableSource(source, anySignal([
139+
const signal = anySignal([
140140
this.abortController.signal,
141141
this.resetController.signal,
142142
this.closeController.signal
143-
]), { returnOnAbort: true })
143+
])
144144

145145
try {
146+
source = abortableSource(source, signal)
147+
146148
for await (let data of source) {
147149
// send in chunks, waiting for window updates
148150
while (data.length !== 0) {
@@ -157,16 +159,22 @@ export class YamuxStream implements Stream {
157159
}
158160
}
159161
} catch (e: any) {
162+
if (this.closeController.signal.aborted) {
163+
return
164+
}
165+
160166
if (this.resetController.signal.aborted) {
161167
this.reset()
162168
}
163169

170+
if (this.abortController.signal.aborted) {
171+
this.abort()
172+
}
173+
164174
if (e.code === ERR_STREAM_RESET) {
165175
this.reset()
166176
this.log?.trace('%s stream %s reset')
167177
}
168-
169-
this.log?.error('stream sink error id=%s', this._id, e)
170178
} finally {
171179
this.log?.trace('stream sink ended id=%s', this._id)
172180
this.closeWrite()
@@ -218,6 +226,10 @@ export class YamuxStream implements Stream {
218226
}
219227

220228
closeWrite (): void {
229+
this.log?.trace('stream close write id=%s', this._id)
230+
231+
this.closeController.abort()
232+
221233
if (this.state === StreamState.Finished) {
222234
return
223235
}
@@ -226,8 +238,6 @@ export class YamuxStream implements Stream {
226238
return
227239
}
228240

229-
this.log?.trace('stream close write id=%s', this._id)
230-
231241
this.writeState = HalfStreamState.Closed
232242

233243
this.sendClose()

test/stream.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ describe('stream', () => {
153153
expect(s1.state).to.equal(StreamState.SYNReceived)
154154
})
155155

156-
it('should end a stream when it is reset', async () => {
156+
it('test stream reset', async () => {
157157
const { client } = testClientServer()
158158

159159
const c1 = client.newStream()
@@ -171,7 +171,7 @@ describe('stream', () => {
171171
expect(c1.state).to.equal(StreamState.Finished)
172172

173173
const expectedError = onResetSpy.onReset.getCalls()[0]?.args[0]
174-
expect(expectedError !== undefined && (expectedError as unknown as { code: string }).code, ERR_STREAM_RESET)
174+
expect((expectedError as unknown as { code: string }).code).to.equal(ERR_STREAM_RESET)
175175
})
176176

177177
it('test stream close read', async () => {

0 commit comments

Comments
 (0)