Skip to content

Commit 16ad758

Browse files
authored
[HTTP1Connection] Handle 101 Switching Protocols (#442)
1 parent 316cbf9 commit 16ad758

7 files changed

+170
-4
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ConnectionStateMachine.swift

+3-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,9 @@ struct HTTP1ConnectionStateMachine {
261261
let action = requestStateMachine.channelRead(part)
262262

263263
if case .head(let head) = part, close == false {
264-
close = !head.isKeepAlive
264+
// since the HTTPClient does not support protocol switching, we must close any
265+
// connection that has received a status `.switchingProtocols`
266+
close = !head.isKeepAlive || head.status == .switchingProtocols
265267
}
266268
state = .inRequest(requestStateMachine, close: close)
267269
return state.modify(with: action)

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

+11-3
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,10 @@ struct HTTPRequestStateMachine {
450450
}
451451

452452
private mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action {
453-
guard head.status.code >= 200 else {
454-
// we ignore any leading 1xx headers... No state change needed.
453+
guard head.status.code >= 200 || head.status == .switchingProtocols else {
454+
// We ignore any leading 1xx headers except for 101 (switching protocols). The
455+
// HTTP1ConnectionStateMachine ensures the connection close for 101 after the `.end` is
456+
// received.
455457
return .wait
456458
}
457459

@@ -527,7 +529,13 @@ struct HTTPRequestStateMachine {
527529
preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)")
528530

529531
case .running(_, .waitingForHead):
530-
preconditionFailure("How can we receive a response end, if we haven't a received a head. Invalid state: \(self.state)")
532+
// If we receive a http response header with a status code of 1xx, we ignore the header
533+
// except for 101, which we consume.
534+
// If the remote closes the connection after sending a 1xx (not 101) response head, we
535+
// will receive a response end from the parser. We need to protect against this case.
536+
let error = HTTPClientError.httpEndReceivedAfterHeadWith1xx
537+
self.state = .failed(error)
538+
return .failRequest(error, .close)
531539

532540
case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, var responseStreamState))
533541
where head.status.code < 300:

Diff for: Sources/AsyncHTTPClient/HTTPClient.swift

+3
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
901901
case requestStreamCancelled
902902
case getConnectionFromPoolTimeout
903903
case deadlineExceeded
904+
case httpEndReceivedAfterHeadWith1xx
904905
}
905906

906907
private var code: Code
@@ -983,4 +984,6 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
983984
/// - A connection could not be created within the timout period.
984985
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
985986
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
987+
988+
public static let httpEndReceivedAfterHeadWith1xx = HTTPClientError(code: .httpEndReceivedAfterHeadWith1xx)
986989
}

Diff for: Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests+XCTest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ extension HTTP1ConnectionStateMachineTests {
4040
("testChannelReadsAreIgnoredIfConnectionIsClosing", testChannelReadsAreIgnoredIfConnectionIsClosing),
4141
("testRequestIsCancelledWhileWaitingForWritable", testRequestIsCancelledWhileWaitingForWritable),
4242
("testConnectionIsClosedIfErrorHappensWhileInRequest", testConnectionIsClosedIfErrorHappensWhileInRequest),
43+
("testConnectionIsClosedAfterSwitchingProtocols", testConnectionIsClosedAfterSwitchingProtocols),
44+
("testWeDontCrashAfterEarlyHintsAndConnectionClose", testWeDontCrashAfterEarlyHintsAndConnectionClose),
4345
]
4446
}
4547
}

Diff for: Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift

+24
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,30 @@ class HTTP1ConnectionStateMachineTests: XCTestCase {
243243
let decompressionError = NIOHTTPDecompression.DecompressionError.limit
244244
XCTAssertEqual(state.errorHappened(decompressionError), .failRequest(decompressionError, .close))
245245
}
246+
247+
func testConnectionIsClosedAfterSwitchingProtocols() {
248+
var state = HTTP1ConnectionStateMachine()
249+
XCTAssertEqual(state.channelActive(isWritable: true), .fireChannelActive)
250+
let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
251+
let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
252+
let newRequestAction = state.runNewRequest(head: requestHead, metadata: metadata, ignoreUncleanSSLShutdown: false)
253+
XCTAssertEqual(newRequestAction, .sendRequestHead(requestHead, startBody: false))
254+
let responseHead = HTTPResponseHead(version: .http1_1, status: .switchingProtocols)
255+
XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
256+
XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, []))
257+
}
258+
259+
func testWeDontCrashAfterEarlyHintsAndConnectionClose() {
260+
var state = HTTP1ConnectionStateMachine()
261+
XCTAssertEqual(state.channelActive(isWritable: true), .fireChannelActive)
262+
let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
263+
let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
264+
let newRequestAction = state.runNewRequest(head: requestHead, metadata: metadata, ignoreUncleanSSLShutdown: false)
265+
XCTAssertEqual(newRequestAction, .sendRequestHead(requestHead, startBody: false))
266+
let responseHead = HTTPResponseHead(version: .http1_1, status: .init(statusCode: 103, reasonPhrase: "Early Hints"))
267+
XCTAssertEqual(state.channelRead(.head(responseHead)), .wait)
268+
XCTAssertEqual(state.channelRead(.end(nil)), .failRequest(HTTPClientError.httpEndReceivedAfterHeadWith1xx, .close))
269+
}
246270
}
247271

248272
extension HTTP1ConnectionStateMachine.Action: Equatable {

Diff for: Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ extension HTTP1ConnectionTests {
3232
("testConnectionClosesOnCloseHeader", testConnectionClosesOnCloseHeader),
3333
("testConnectionClosesOnRandomlyAppearingCloseHeader", testConnectionClosesOnRandomlyAppearingCloseHeader),
3434
("testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader", testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader),
35+
("testConnectionIsClosedAfterSwitchingProtocols", testConnectionIsClosedAfterSwitchingProtocols),
36+
("testConnectionDoesntCrashAfterConnectionCloseAndEarlyHints", testConnectionDoesntCrashAfterConnectionCloseAndEarlyHints),
3537
("testDownloadStreamingBackpressure", testDownloadStreamingBackpressure),
3638
]
3739
}

Diff for: Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift

+125
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,131 @@ class HTTP1ConnectionTests: XCTestCase {
365365
XCTAssertEqual(httpBin.activeConnections, 0)
366366
}
367367

368+
func testConnectionIsClosedAfterSwitchingProtocols() {
369+
let embedded = EmbeddedChannel()
370+
let logger = Logger(label: "test.http1.connection")
371+
372+
XCTAssertNoThrow(try embedded.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 3000)).wait())
373+
374+
var maybeConnection: HTTP1Connection?
375+
let connectionDelegate = MockConnectionDelegate()
376+
XCTAssertNoThrow(maybeConnection = try HTTP1Connection.start(
377+
channel: embedded,
378+
connectionID: 0,
379+
delegate: connectionDelegate,
380+
configuration: .init(decompression: .enabled(limit: .ratio(4))),
381+
logger: logger
382+
))
383+
guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point.") }
384+
385+
var maybeRequest: HTTPClient.Request?
386+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://swift.org/"))
387+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
388+
389+
let delegate = ResponseAccumulator(request: request)
390+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
391+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
392+
request: request,
393+
eventLoopPreference: .delegate(on: embedded.eventLoop),
394+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
395+
redirectHandler: nil,
396+
connectionDeadline: .now() + .seconds(30),
397+
requestOptions: .forTests(),
398+
delegate: delegate
399+
))
400+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
401+
402+
connection.executeRequest(requestBag)
403+
404+
XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // head
405+
XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // end
406+
407+
let responseString = """
408+
HTTP/1.1 101 Switching Protocols\r\n\
409+
Upgrade: websocket\r\n\
410+
Sec-WebSocket-Accept: xAMUK7/Il9bLRFJrikq6mm8CNZI=\r\n\
411+
Connection: upgrade\r\n\
412+
date: Mon, 27 Sep 2021 17:53:14 GMT\r\n\
413+
\r\n\
414+
\r\nfoo bar baz
415+
"""
416+
417+
XCTAssertTrue(embedded.isActive)
418+
XCTAssertEqual(connectionDelegate.hitConnectionClosed, 0)
419+
XCTAssertEqual(connectionDelegate.hitConnectionReleased, 0)
420+
XCTAssertNoThrow(try embedded.writeInbound(ByteBuffer(string: responseString)))
421+
XCTAssertFalse(embedded.isActive)
422+
(embedded.eventLoop as! EmbeddedEventLoop).run() // tick once to run futures.
423+
XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1)
424+
XCTAssertEqual(connectionDelegate.hitConnectionReleased, 0)
425+
426+
var response: HTTPClient.Response?
427+
XCTAssertNoThrow(response = try requestBag.task.futureResult.wait())
428+
XCTAssertEqual(response?.status, .switchingProtocols)
429+
XCTAssertEqual(response?.headers.count, 4)
430+
XCTAssertEqual(response?.body, nil)
431+
}
432+
433+
func testConnectionDoesntCrashAfterConnectionCloseAndEarlyHints() {
434+
let embedded = EmbeddedChannel()
435+
let logger = Logger(label: "test.http1.connection")
436+
437+
XCTAssertNoThrow(try embedded.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 3000)).wait())
438+
439+
var maybeConnection: HTTP1Connection?
440+
let connectionDelegate = MockConnectionDelegate()
441+
XCTAssertNoThrow(maybeConnection = try HTTP1Connection.start(
442+
channel: embedded,
443+
connectionID: 0,
444+
delegate: connectionDelegate,
445+
configuration: .init(decompression: .enabled(limit: .ratio(4))),
446+
logger: logger
447+
))
448+
guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point.") }
449+
450+
var maybeRequest: HTTPClient.Request?
451+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://swift.org/"))
452+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
453+
454+
let delegate = ResponseAccumulator(request: request)
455+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
456+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
457+
request: request,
458+
eventLoopPreference: .delegate(on: embedded.eventLoop),
459+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
460+
redirectHandler: nil,
461+
connectionDeadline: .now() + .seconds(30),
462+
requestOptions: .forTests(),
463+
delegate: delegate
464+
))
465+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
466+
467+
connection.executeRequest(requestBag)
468+
469+
XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // head
470+
XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // end
471+
472+
let responseString = """
473+
HTTP/1.1 103 Early Hints\r\n\
474+
date: Mon, 27 Sep 2021 17:53:14 GMT\r\n\
475+
\r\n\
476+
\r\n
477+
"""
478+
479+
XCTAssertTrue(embedded.isActive)
480+
XCTAssertEqual(connectionDelegate.hitConnectionClosed, 0)
481+
XCTAssertEqual(connectionDelegate.hitConnectionReleased, 0)
482+
XCTAssertNoThrow(try embedded.writeInbound(ByteBuffer(string: responseString)))
483+
XCTAssertFalse(embedded.isActive)
484+
(embedded.eventLoop as! EmbeddedEventLoop).run() // tick once to run futures.
485+
XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1)
486+
XCTAssertEqual(connectionDelegate.hitConnectionReleased, 0)
487+
488+
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
489+
XCTAssertEqual($0 as? HTTPClientError, .httpEndReceivedAfterHeadWith1xx)
490+
}
491+
}
492+
368493
// In order to test backpressure we need to make sure that reads will not happen
369494
// until the backpressure promise is succeeded. Since we cannot guarantee when
370495
// messages will be delivered to a client pipeline and we need this test to be

0 commit comments

Comments
 (0)