Skip to content

Commit 0d33412

Browse files
committed
Make HTTPBin ready for pooling tests and add some unit tests
1 parent 5237768 commit 0d33412

File tree

6 files changed

+130
-36
lines changed

6 files changed

+130
-36
lines changed

Sources/AsyncHTTPClient/ConnectionPool.swift

+19-14
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class ConnectionPool {
3030
/// The main data structure used by the `ConnectionPool` to retreive and create connections associated
3131
/// to a a given `Key` .
3232
/// - Warning: This property shouldn't be directly accessed, use the `ConnectionPool` subscript instead
33-
private var _connectionProviders: [Key: ConnectionProvider] = [:]
33+
var _connectionProviders: [Key: ConnectionProvider] = [:]
3434

3535
private let lock = Lock()
3636

@@ -113,7 +113,7 @@ class ConnectionPool {
113113
return provider.getConnection(preference: preference)
114114

115115
case .make(let providerPromise):
116-
// If no connection provider doesn't exist for the given key, create a new one
116+
// If no connection provider exists for the given key, create a new one
117117
let selectedEventLoop = providerPromise.futureResult.eventLoop
118118
// Completed when we have been able to determine the connection
119119
// should be using HTTP/1.1 or HTTP/2
@@ -247,7 +247,7 @@ class ConnectionPool {
247247
}
248248

249249
/// This enum abstracts the underlying kind of provider
250-
private enum ConnectionProvider {
250+
enum ConnectionProvider {
251251
case http1(HTTP1ConnectionProvider)
252252
case http2(HTTP2ConnectionProvider)
253253
indirect case future(EventLoopFuture<ConnectionProvider>)
@@ -292,7 +292,7 @@ class ConnectionPool {
292292
}
293293
}
294294

295-
private class HTTP1ConnectionProvider {
295+
class HTTP1ConnectionProvider {
296296
let loopGroup: EventLoopGroup
297297
let configuration: HTTPClient.Configuration
298298
let key: ConnectionPool.Key
@@ -329,6 +329,7 @@ class ConnectionPool {
329329
self.availableConnections.append(initialConnection)
330330
self.parentPool = parentPool
331331
self.isClosed = Atomic(value: false)
332+
self.configureClose(of: initialConnection)
332333
}
333334

334335
deinit {
@@ -355,6 +356,18 @@ class ConnectionPool {
355356
}
356357
}
357358

359+
private func configureClose(of connection: Connection) {
360+
connection.channel.closeFuture.whenComplete { _ in
361+
// FIXME: Maybe there should be an assertionFailure in the else
362+
if !connection.isLeased {
363+
if let indexToRemove = self.availableConnections.firstIndex(where: { $0 === connection }) {
364+
self.availableConnections.swapAt(self.availableConnections.startIndex, indexToRemove)
365+
self.availableConnections.removeFirst()
366+
}
367+
}
368+
}
369+
}
370+
358371
private func makeConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture<Connection> {
359372
let bootstrap = ClientBootstrap.makeHTTPClientBootstrapBase(group: self.loopGroup, host: self.key.host, port: self.key.port, configuration: self.configuration) { channel in
360373
channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration).flatMap {
@@ -364,15 +377,7 @@ class ConnectionPool {
364377
let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy)
365378
return bootstrap.connect(host: address.host, port: address.port).map { channel in
366379
let connection = Connection(key: self.key, channel: channel, parentPool: self.parentPool)
367-
connection.channel.closeFuture.whenComplete { _ in
368-
// FIXME: Maybe there should be an assertionFailure in the else
369-
if !connection.isLeased {
370-
if let indexToRemove = self.availableConnections.firstIndex(where: { $0 === connection }) {
371-
self.availableConnections.swapAt(self.availableConnections.startIndex, indexToRemove)
372-
self.availableConnections.removeFirst()
373-
}
374-
}
375-
}
380+
self.configureClose(of: connection)
376381
connection.isLeased = true
377382
return connection
378383
}
@@ -420,7 +425,7 @@ class ConnectionPool {
420425
}
421426
}
422427

423-
private class HTTP2ConnectionProvider {
428+
class HTTP2ConnectionProvider {
424429
init(group: EventLoopGroup, key: Key, configuration: HTTPClient.Configuration, channel: Channel, multiplexer: HTTP2StreamMultiplexer, parentPool: ConnectionPool) {
425430
self.loopGroup = group
426431
self.key = key

Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extension HTTPClientInternalTests {
3030
("testProxyStreaming", testProxyStreaming),
3131
("testProxyStreamingFailure", testProxyStreamingFailure),
3232
("testUploadStreamingBackpressure", testUploadStreamingBackpressure),
33+
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
3334
]
3435
}
3536
}

Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift

+17
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,21 @@ class HTTPClientInternalTests: XCTestCase {
192192

193193
XCTAssertEqual(delegate.reads, 3)
194194
}
195+
196+
func testResponseConnectionCloseGet() throws {
197+
let httpBin = HTTPBin(ssl: false)
198+
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew,
199+
configuration: HTTPClient.Configuration(certificateVerification: .none))
200+
defer {
201+
XCTAssertNoThrow(try httpClient.syncShutdown())
202+
XCTAssertNoThrow(try httpBin.shutdown())
203+
}
204+
205+
let req = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET, headers: ["Connection": "close"], body: nil)
206+
_ = try! httpClient.execute(request: req).wait()
207+
let el = httpClient.eventLoopGroup.next()
208+
try! el.scheduleTask(in: .milliseconds(500)) {
209+
XCTAssertEqual(httpClient.pool._connectionProviders.count, 0)
210+
}.futureResult.wait()
211+
}
195212
}

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+72-19
Original file line numberDiff line numberDiff line change
@@ -111,27 +111,29 @@ internal final class HTTPBin {
111111
return channel.pipeline.addHandler(try! NIOSSLServerHandler(context: context), position: .first)
112112
}
113113

114-
init(ssl: Bool = false, simulateProxy: HTTPProxySimulator.Option? = nil, channelPromise: EventLoopPromise<Channel>? = nil) {
114+
init(ssl: Bool = false, simulateProxy: HTTPProxySimulator.Option? = nil, channelPromise: EventLoopPromise<Channel>? = nil, connectionDelay: TimeAmount = .nanoseconds(0), maxChannelAge: TimeAmount? = nil) {
115115
self.serverChannel = try! ServerBootstrap(group: self.group)
116116
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
117117
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
118118
.childChannelInitializer { channel in
119-
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true, withErrorHandling: true).flatMap {
120-
if let simulateProxy = simulateProxy {
121-
let responseEncoder = HTTPResponseEncoder()
122-
let requestDecoder = ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .forwardBytes))
123-
124-
return channel.pipeline.addHandlers([responseEncoder, requestDecoder, HTTPProxySimulator(option: simulateProxy, encoder: responseEncoder, decoder: requestDecoder)], position: .first)
125-
} else {
126-
return channel.eventLoop.makeSucceededFuture(())
127-
}
128-
}.flatMap {
129-
if ssl {
130-
return HTTPBin.configureTLS(channel: channel).flatMap {
131-
channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise))
119+
channel.eventLoop.scheduleTask(in: connectionDelay) {}.futureResult.flatMap {
120+
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true, withErrorHandling: true).flatMap {
121+
if let simulateProxy = simulateProxy {
122+
let responseEncoder = HTTPResponseEncoder()
123+
let requestDecoder = ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .forwardBytes))
124+
125+
return channel.pipeline.addHandlers([responseEncoder, requestDecoder, HTTPProxySimulator(option: simulateProxy, encoder: responseEncoder, decoder: requestDecoder)], position: .first)
126+
} else {
127+
return channel.eventLoop.makeSucceededFuture(())
128+
}
129+
}.flatMap {
130+
if ssl {
131+
return HTTPBin.configureTLS(channel: channel).flatMap {
132+
channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise))
133+
}
134+
} else {
135+
return channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise))
132136
}
133-
} else {
134-
return channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise))
135137
}
136138
}
137139
}.bind(host: "127.0.0.1", port: 0).wait()
@@ -229,14 +231,53 @@ internal final class HttpBinHandler: ChannelInboundHandler {
229231

230232
let channelPromise: EventLoopPromise<Channel>?
231233
var resps = CircularBuffer<HTTPResponseBuilder>()
232-
233-
init(channelPromise: EventLoopPromise<Channel>? = nil) {
234+
var closeAfterResponse = false
235+
var delay: TimeAmount = .seconds(0)
236+
let creationDate = Date()
237+
let maxChannelAge: TimeAmount?
238+
var shouldClose = false
239+
var isServingRequest = false
240+
241+
init(channelPromise: EventLoopPromise<Channel>? = nil, maxChannelAge: TimeAmount? = nil) {
234242
self.channelPromise = channelPromise
243+
self.maxChannelAge = maxChannelAge
244+
}
245+
246+
func handlerAdded(context: ChannelHandlerContext) {
247+
if let maxChannelAge = self.maxChannelAge {
248+
context.eventLoop.scheduleTask(in: maxChannelAge) {
249+
if !self.isServingRequest {
250+
context.close(promise: nil)
251+
} else {
252+
self.shouldClose = true
253+
}
254+
}
255+
}
256+
}
257+
258+
func parseAndSetOptions(from head: HTTPRequestHead) {
259+
if let delay = head.headers["X-internal-delay"].first {
260+
if let milliseconds = TimeAmount.Value(delay) {
261+
self.delay = TimeAmount.milliseconds(milliseconds)
262+
} else {
263+
assertionFailure("Invalid interval format")
264+
}
265+
} else {
266+
self.delay = .nanoseconds(0)
267+
}
268+
269+
if let connection = head.headers["Connection"].first {
270+
self.closeAfterResponse = (connection == "close")
271+
} else {
272+
self.closeAfterResponse = false
273+
}
235274
}
236275

237276
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
277+
self.isServingRequest = true
238278
switch self.unwrapInboundIn(data) {
239279
case .head(let req):
280+
self.parseAndSetOptions(from: req)
240281
let url = URL(string: req.uri)!
241282
switch url.path {
242283
case "/ok":
@@ -333,7 +374,19 @@ internal final class HttpBinHandler: ChannelInboundHandler {
333374
responseBody.writeBytes(serialized)
334375
context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil)
335376
}
336-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
377+
context.eventLoop.scheduleTask(in: self.delay) {
378+
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { result in
379+
self.isServingRequest = false
380+
switch result {
381+
case .success:
382+
if self.closeAfterResponse || self.shouldClose {
383+
context.close(promise: nil)
384+
}
385+
case .failure(let error):
386+
assertionFailure("\(error)")
387+
}
388+
}
389+
}
337390
}
338391
}
339392

Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ extension HTTPClientTests {
5858
("testWrongContentLengthWithIgnoreErrorForSSLUncleanShutdown", testWrongContentLengthWithIgnoreErrorForSSLUncleanShutdown),
5959
("testEventLoopArgument", testEventLoopArgument),
6060
("testStressGetHttps", testStressGetHttps),
61+
("testResponseDelayGet", testResponseDelayGet),
6162
]
6263
}
6364
}

Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+20-3
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ class HTTPClientTests: XCTestCase {
563563
XCTAssertEqual(true, response)
564564
}
565565

566-
func testStressGetHttps() {
566+
func testStressGetHttps() throws {
567567
let httpBin = HTTPBin(ssl: true)
568568
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew,
569569
configuration: HTTPClient.Configuration(certificateVerification: .none))
@@ -574,9 +574,10 @@ class HTTPClientTests: XCTestCase {
574574

575575
let eventLoop = httpClient.eventLoopGroup
576576
let lastReqPromise = eventLoop.next().makePromise(of: Void.self)
577-
let requestCount = 5000
577+
let requestCount = 200
578578
for i in 1 ... requestCount {
579-
httpClient.get(url: "https://localhost:\(httpBin.port)/get").whenComplete { result in
579+
let req = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)/get", method: .GET, headers: ["X-internal-delay": "100"])
580+
httpClient.execute(request: req).whenComplete { result in
580581
switch result {
581582
case .success(let response):
582583
XCTAssertEqual(.ok, response.status)
@@ -590,4 +591,20 @@ class HTTPClientTests: XCTestCase {
590591
}
591592
try! lastReqPromise.futureResult.wait()
592593
}
594+
595+
func testResponseDelayGet() throws {
596+
let httpBin = HTTPBin(ssl: false)
597+
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew,
598+
configuration: HTTPClient.Configuration(certificateVerification: .none))
599+
defer {
600+
XCTAssertNoThrow(try httpClient.syncShutdown())
601+
XCTAssertNoThrow(try httpBin.shutdown())
602+
}
603+
604+
let req = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET, headers: ["X-internal-delay": "2000"], body: nil)
605+
let start = Date()
606+
let response = try! httpClient.execute(request: req).wait()
607+
XCTAssertEqual(Date().timeIntervalSince(start), 2, accuracy: 0.25)
608+
XCTAssertEqual(response.status, .ok)
609+
}
593610
}

0 commit comments

Comments
 (0)