Skip to content

Commit 9f26aff

Browse files
authored
fix: request abort signal (nodejs#3209)
* fix: request abort signal * fixup * fixup * fixup
1 parent d51dabc commit 9f26aff

File tree

3 files changed

+108
-8
lines changed

3 files changed

+108
-8
lines changed

lib/api/api-request.js

+30-6
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
const assert = require('node:assert')
44
const { Readable } = require('./readable')
5-
const { InvalidArgumentError } = require('../core/errors')
5+
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
66
const util = require('../core/util')
77
const { getResolveErrorBodyCallback } = require('./util')
88
const { AsyncResource } = require('node:async_hooks')
9-
const { addSignal, removeSignal } = require('./abort-signal')
109

1110
class RequestHandler extends AsyncResource {
1211
constructor (opts, callback) {
@@ -56,14 +55,33 @@ class RequestHandler extends AsyncResource {
5655
this.onInfo = onInfo || null
5756
this.throwOnError = throwOnError
5857
this.highWaterMark = highWaterMark
58+
this.signal = signal
59+
this.reason = null
60+
this.removeAbortListener = null
5961

6062
if (util.isStream(body)) {
6163
body.on('error', (err) => {
6264
this.onError(err)
6365
})
6466
}
6567

66-
addSignal(this, signal)
68+
if (this.signal) {
69+
if (this.signal.aborted) {
70+
this.reason = this.signal.reason ?? new RequestAbortedError()
71+
} else {
72+
this.removeAbortListener = util.addAbortListener(this.signal, () => {
73+
this.removeAbortListener?.()
74+
this.removeAbortListener = null
75+
76+
this.reason = this.signal.reason ?? new RequestAbortedError()
77+
if (this.res) {
78+
util.destroy(this.res, this.reason)
79+
} else if (this.abort) {
80+
this.abort(this.reason)
81+
}
82+
})
83+
}
84+
}
6785
}
6886

6987
onConnect (abort, context) {
@@ -95,6 +113,13 @@ class RequestHandler extends AsyncResource {
95113
const contentLength = parsedHeaders['content-length']
96114
const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark })
97115

116+
if (this.removeAbortListener) {
117+
// TODO (fix): 'close' is sufficient but breaks tests.
118+
body
119+
.on('end', this.removeAbortListener)
120+
.on('error', this.removeAbortListener)
121+
}
122+
98123
this.callback = null
99124
this.res = body
100125
if (callback !== null) {
@@ -123,8 +148,6 @@ class RequestHandler extends AsyncResource {
123148
onComplete (trailers) {
124149
const { res } = this
125150

126-
removeSignal(this)
127-
128151
util.parseHeaders(trailers, this.trailers)
129152

130153
res.push(null)
@@ -133,7 +156,8 @@ class RequestHandler extends AsyncResource {
133156
onError (err) {
134157
const { res, callback, body, opaque } = this
135158

136-
removeSignal(this)
159+
this.removeAbortListener?.()
160+
this.removeAbortListener = null
137161

138162
if (callback) {
139163
// TODO: Does this need queueMicrotask?

test/issue-2590.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ test('aborting request with custom reason', async (t) => {
2727

2828
await t.rejects(
2929
request(`http://localhost:${server.address().port}`, { signal: ac.signal }),
30-
/Request aborted/
30+
/Error: aborted/
3131
)
3232

3333
await t.rejects(
3434
request(`http://localhost:${server.address().port}`, { signal: ac2.signal }),
35-
{ code: 'UND_ERR_ABORTED' }
35+
{ name: 'AbortError' }
3636
)
3737

3838
await t.completed

test/request-signal.js

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
'use strict'
2+
3+
const { createServer } = require('node:http')
4+
const { test, after } = require('node:test')
5+
const { tspl } = require('@matteo.collina/tspl')
6+
const { request } = require('..')
7+
8+
test('pre abort signal w/ reason', async (t) => {
9+
t = tspl(t, { plan: 1 })
10+
11+
const server = createServer((req, res) => {
12+
res.end('asd')
13+
})
14+
after(() => server.close())
15+
16+
server.listen(0, async () => {
17+
const ac = new AbortController()
18+
const _err = new Error()
19+
ac.abort(_err)
20+
try {
21+
await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
22+
} catch (err) {
23+
t.equal(err, _err)
24+
}
25+
})
26+
await t.completed
27+
})
28+
29+
test('post abort signal', async (t) => {
30+
t = tspl(t, { plan: 1 })
31+
32+
const server = createServer((req, res) => {
33+
res.end('asd')
34+
})
35+
after(() => server.close())
36+
37+
server.listen(0, async () => {
38+
const ac = new AbortController()
39+
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
40+
ac.abort()
41+
try {
42+
/* eslint-disable-next-line no-unused-vars */
43+
for await (const chunk of ures.body) {
44+
// Do nothing...
45+
}
46+
} catch (err) {
47+
t.equal(err.name, 'AbortError')
48+
}
49+
})
50+
await t.completed
51+
})
52+
53+
test('post abort signal w/ reason', async (t) => {
54+
t = tspl(t, { plan: 1 })
55+
56+
const server = createServer((req, res) => {
57+
res.end('asd')
58+
})
59+
after(() => server.close())
60+
61+
server.listen(0, async () => {
62+
const ac = new AbortController()
63+
const _err = new Error()
64+
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
65+
ac.abort(_err)
66+
try {
67+
/* eslint-disable-next-line no-unused-vars */
68+
for await (const chunk of ures.body) {
69+
// Do nothing...
70+
}
71+
} catch (err) {
72+
t.equal(err, _err)
73+
}
74+
})
75+
await t.completed
76+
})

0 commit comments

Comments
 (0)