Skip to content

Commit 6150977

Browse files
committed
Add HTTPConnectionPool.StateMachine
1 parent 6492685 commit 6150977

9 files changed

+2959
-1
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+HTTP1State.swift

+698
Large diffs are not rendered by default.

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+HTTP2State.swift

+536
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
@_implementationOnly import NIOHTTP2
18+
19+
extension HTTPConnectionPool {
20+
struct StateMachine {
21+
struct Action {
22+
let task: TaskAction
23+
let connection: ConnectionAction
24+
25+
init(_ task: TaskAction, _ connection: ConnectionAction) {
26+
self.task = task
27+
self.connection = connection
28+
}
29+
}
30+
31+
enum ConnectionAction {
32+
enum IsShutdown: Equatable {
33+
case yes(unclean: Bool)
34+
case no
35+
}
36+
37+
case createConnection(Connection.ID, on: EventLoop)
38+
case replaceConnection(Connection, with: Connection.ID, on: EventLoop)
39+
40+
case scheduleTimeoutTimer(Connection.ID)
41+
case cancelTimeoutTimer(Connection.ID)
42+
43+
case closeConnection(Connection, isShutdown: IsShutdown)
44+
case cleanupConnection(close: [Connection], cancel: [Connection], isShutdown: IsShutdown)
45+
46+
case none
47+
}
48+
49+
enum TaskAction {
50+
case executeTask(HTTPRequestTask, Connection, cancelWaiter: Waiter.ID?)
51+
case executeTasks([(HTTPRequestTask, cancelWaiter: Waiter.ID?)], Connection)
52+
case failTask(HTTPRequestTask, Error, cancelWaiter: Waiter.ID?)
53+
case failTasks([(HTTPRequestTask, cancelWaiter: Waiter.ID?)], Error)
54+
55+
case scheduleWaiterTimeout(Waiter.ID, HTTPRequestTask, on: EventLoop)
56+
case cancelWaiterTimeout(Waiter.ID)
57+
58+
case none
59+
}
60+
61+
enum HTTPTypeStateMachine {
62+
case http1(HTTP1StateMachine)
63+
case http2(HTTP2StateMachine)
64+
65+
case modify
66+
}
67+
68+
var state: HTTPTypeStateMachine
69+
var isShuttingDown: Bool = false
70+
71+
let eventLoopGroup: EventLoopGroup
72+
let maximumConcurrentHTTP1Connections: Int
73+
74+
init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) {
75+
self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections
76+
let http1State = HTTP1StateMachine(
77+
idGenerator: idGenerator,
78+
maximumConcurrentConnections: maximumConcurrentHTTP1Connections
79+
)
80+
self.state = .http1(http1State)
81+
self.eventLoopGroup = eventLoopGroup
82+
}
83+
84+
mutating func executeTask(_ task: HTTPRequestTask, onPreffered prefferedEL: EventLoop, required: Bool) -> Action {
85+
switch self.state {
86+
case .http1(var http1StateMachine):
87+
return self.state.modify { state -> Action in
88+
let action = http1StateMachine.executeTask(task, onPreffered: prefferedEL, required: required)
89+
state = .http1(http1StateMachine)
90+
return state.modify(with: action)
91+
}
92+
case .http2(var http2StateMachine):
93+
return self.state.modify { state -> Action in
94+
let action = http2StateMachine.executeTask(task, onPreffered: prefferedEL, required: required)
95+
state = .http2(http2StateMachine)
96+
return state.modify(with: action)
97+
}
98+
case .modify:
99+
preconditionFailure("Invalid state")
100+
}
101+
}
102+
103+
mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action {
104+
switch self.state {
105+
case .http1(var httpStateMachine):
106+
return self.state.modify { state -> Action in
107+
let action = httpStateMachine.newHTTP1ConnectionCreated(connection)
108+
state = .http1(httpStateMachine)
109+
return state.modify(with: action)
110+
}
111+
112+
case .http2:
113+
preconditionFailure("Unimplemented. Switching back to HTTP/1.1 not supported for now")
114+
115+
case .modify:
116+
preconditionFailure("Invalid state")
117+
}
118+
}
119+
120+
mutating func newHTTP2ConnectionCreated(_ connection: Connection, settings: HTTP2Settings) -> Action {
121+
switch self.state {
122+
case .http1(let http1StateMachine):
123+
return self.state.modify { state -> Action in
124+
var http2StateMachine = HTTP2StateMachine(
125+
http1StateMachine: http1StateMachine,
126+
eventLoopGroup: self.eventLoopGroup
127+
)
128+
129+
let action = http2StateMachine.newHTTP2ConnectionCreated(connection, settings: settings)
130+
state = .http2(http2StateMachine)
131+
return state.modify(with: action)
132+
}
133+
134+
case .http2(var http2StateMachine):
135+
return self.state.modify { state -> Action in
136+
let action = http2StateMachine.newHTTP2ConnectionCreated(connection, settings: settings)
137+
state = .http2(http2StateMachine)
138+
return state.modify(with: action)
139+
}
140+
141+
case .modify:
142+
preconditionFailure("Invalid state")
143+
}
144+
}
145+
146+
mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
147+
switch self.state {
148+
case .http1(var http1StateMachine):
149+
return self.state.modify { state -> Action in
150+
let action = http1StateMachine.failedToCreateNewConnection(error, connectionID: connectionID)
151+
state = .http1(http1StateMachine)
152+
return state.modify(with: action)
153+
}
154+
155+
case .http2(var http2StateMachine):
156+
return self.state.modify { state -> Action in
157+
let action = http2StateMachine.failedToCreateNewConnection(error, connectionID: connectionID)
158+
state = .http2(http2StateMachine)
159+
return state.modify(with: action)
160+
}
161+
162+
case .modify:
163+
preconditionFailure("Invalid state")
164+
}
165+
}
166+
167+
mutating func waiterTimeout(_ waitID: Waiter.ID) -> Action {
168+
switch self.state {
169+
case .http1(var http1StateMachine):
170+
return self.state.modify { state -> Action in
171+
let action = http1StateMachine.timeoutWaiter(waitID)
172+
state = .http1(http1StateMachine)
173+
return state.modify(with: action)
174+
}
175+
case .http2(var http2StateMachine):
176+
return self.state.modify { state -> Action in
177+
let action = http2StateMachine.timeoutWaiter(waitID)
178+
state = .http2(http2StateMachine)
179+
return state.modify(with: action)
180+
}
181+
case .modify:
182+
preconditionFailure("Invalid state")
183+
}
184+
}
185+
186+
mutating func cancelWaiter(_ waitID: Waiter.ID) -> Action {
187+
switch self.state {
188+
case .http1(var http1StateMachine):
189+
return self.state.modify { state -> Action in
190+
let action = http1StateMachine.cancelWaiter(waitID)
191+
state = .http1(http1StateMachine)
192+
return state.modify(with: action)
193+
}
194+
case .http2(var http2StateMachine):
195+
return self.state.modify { state -> Action in
196+
let action = http2StateMachine.cancelWaiter(waitID)
197+
state = .http2(http2StateMachine)
198+
return state.modify(with: action)
199+
}
200+
case .modify:
201+
preconditionFailure("Invalid state")
202+
}
203+
}
204+
205+
mutating func connectionTimeout(_ connectionID: Connection.ID) -> Action {
206+
switch self.state {
207+
case .http1(var http1StateMachine):
208+
return self.state.modify { state -> Action in
209+
let action = http1StateMachine.connectionTimeout(connectionID)
210+
state = .http1(http1StateMachine)
211+
return state.modify(with: action)
212+
}
213+
case .http2(var http2StateMachine):
214+
return self.state.modify { state -> Action in
215+
let action = http2StateMachine.connectionTimeout(connectionID)
216+
state = .http2(http2StateMachine)
217+
return state.modify(with: action)
218+
}
219+
case .modify:
220+
preconditionFailure("Invalid state")
221+
}
222+
}
223+
224+
/// A connection has been closed
225+
mutating func connectionClosed(_ connectionID: Connection.ID) -> Action {
226+
switch self.state {
227+
case .http1(var http1StateMachine):
228+
return self.state.modify { state -> Action in
229+
let action = http1StateMachine.connectionClosed(connectionID)
230+
state = .http1(http1StateMachine)
231+
return state.modify(with: action)
232+
}
233+
case .http2(var http2StateMachine):
234+
return self.state.modify { state -> Action in
235+
let action = http2StateMachine.connectionClosed(connectionID)
236+
state = .http2(http2StateMachine)
237+
return state.modify(with: action)
238+
}
239+
case .modify:
240+
preconditionFailure("Invalid state")
241+
}
242+
}
243+
244+
mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
245+
guard case .http1(var http1StateMachine) = self.state else {
246+
preconditionFailure("Invalid state")
247+
}
248+
249+
return self.state.modify { state -> Action in
250+
let action = http1StateMachine.http1ConnectionReleased(connectionID)
251+
state = .http1(http1StateMachine)
252+
return state.modify(with: action)
253+
}
254+
}
255+
256+
/// A connection is done processing a task
257+
mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID, availableStreams: Int) -> Action {
258+
switch self.state {
259+
case .http1:
260+
preconditionFailure("Unimplemented for now")
261+
case .http2(var http2StateMachine):
262+
return self.state.modify { state -> Action in
263+
let action = http2StateMachine.http2ConnectionStreamClosed(connectionID, availableStreams: availableStreams)
264+
state = .http2(http2StateMachine)
265+
return state.modify(with: action)
266+
}
267+
case .modify:
268+
preconditionFailure("Invalid state")
269+
}
270+
}
271+
272+
mutating func shutdown() -> Action {
273+
guard !self.isShuttingDown else {
274+
preconditionFailure("Shutdown must only be called once")
275+
}
276+
277+
self.isShuttingDown = true
278+
279+
switch self.state {
280+
case .http1(var http1StateMachine):
281+
return self.state.modify { state -> Action in
282+
let action = http1StateMachine.shutdown()
283+
state = .http1(http1StateMachine)
284+
return state.modify(with: action)
285+
}
286+
case .http2(var http2StateMachine):
287+
return self.state.modify { state -> Action in
288+
let action = http2StateMachine.shutdown()
289+
state = .http2(http2StateMachine)
290+
return state.modify(with: action)
291+
}
292+
case .modify:
293+
preconditionFailure("Invalid state")
294+
}
295+
}
296+
}
297+
}
298+
299+
extension HTTPConnectionPool.StateMachine.HTTPTypeStateMachine {
300+
mutating func modify<T>(_ closure: (inout Self) throws -> (T)) rethrows -> T {
301+
self = .modify
302+
defer {
303+
if case .modify = self {
304+
preconditionFailure("Invalid state. Use closure to modify state")
305+
}
306+
}
307+
return try closure(&self)
308+
}
309+
310+
mutating func modify(with action: HTTPConnectionPool.StateMachine.Action)
311+
-> HTTPConnectionPool.StateMachine.Action {
312+
return action
313+
}
314+
}
315+
316+
extension HTTPConnectionPool.StateMachine: CustomStringConvertible {
317+
var description: String {
318+
switch self.state {
319+
case .http1(let http1):
320+
return ".http1(\(http1))"
321+
case .http2(let http2):
322+
return ".http2(\(http2))"
323+
case .modify:
324+
preconditionFailure("Invalid state")
325+
}
326+
}
327+
}

0 commit comments

Comments
 (0)