Skip to content

Commit 068235c

Browse files
committed
Add HTTP1Connection
1 parent ed44283 commit 068235c

8 files changed

+752
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP1
18+
19+
final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
20+
typealias OutboundIn = HTTPExecutingRequest
21+
typealias OutboundOut = HTTPClientRequestPart
22+
typealias InboundIn = HTTPClientResponsePart
23+
24+
var channelContext: ChannelHandlerContext!
25+
26+
var state: HTTP1ConnectionStateMachine = .init() {
27+
didSet {
28+
self.channelContext.eventLoop.assertInEventLoop()
29+
30+
self.logger.trace("Connection state did change", metadata: [
31+
"state": "\(String(describing: self.state))",
32+
])
33+
}
34+
}
35+
36+
/// the currently executing request
37+
private var request: HTTPExecutingRequest?
38+
private var idleReadTimeoutTimer: Scheduled<Void>?
39+
40+
let connection: HTTP1Connection
41+
let logger: Logger
42+
43+
init(connection: HTTP1Connection, logger: Logger) {
44+
self.connection = connection
45+
self.logger = logger
46+
}
47+
48+
func handlerAdded(context: ChannelHandlerContext) {
49+
self.channelContext = context
50+
51+
if context.channel.isActive {
52+
let action = self.state.channelActive(isWritable: context.channel.isWritable)
53+
self.run(action, context: context)
54+
}
55+
}
56+
57+
// MARK: Channel Inbound Handler
58+
59+
func channelActive(context: ChannelHandlerContext) {
60+
let action = self.state.channelActive(isWritable: context.channel.isWritable)
61+
self.run(action, context: context)
62+
}
63+
64+
func channelInactive(context: ChannelHandlerContext) {
65+
let action = self.state.channelInactive()
66+
self.run(action, context: context)
67+
}
68+
69+
func channelWritabilityChanged(context: ChannelHandlerContext) {
70+
self.logger.trace("Channel writability changed", metadata: [
71+
"writable": "\(context.channel.isWritable)",
72+
])
73+
74+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
75+
self.run(action, context: context)
76+
}
77+
78+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
79+
let httpPart = unwrapInboundIn(data)
80+
81+
self.logger.trace("Message received", metadata: [
82+
"message": "\(httpPart)",
83+
])
84+
85+
let action = self.state.channelRead(httpPart)
86+
self.run(action, context: context)
87+
}
88+
89+
func channelReadComplete(context: ChannelHandlerContext) {
90+
let action = self.state.channelReadComplete()
91+
self.run(action, context: context)
92+
}
93+
94+
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
95+
context.close(mode: mode, promise: promise)
96+
}
97+
98+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
99+
self.logger.trace("New request to execute")
100+
101+
let req = self.unwrapOutboundIn(data)
102+
self.request = req
103+
104+
req.willExecuteRequest(self)
105+
106+
let action = self.state.runNewRequest(head: req.requestHead, metadata: req.requestFramingMetadata)
107+
self.run(action, context: context)
108+
}
109+
110+
func read(context: ChannelHandlerContext) {
111+
self.logger.trace("Read")
112+
113+
let action = self.state.read()
114+
self.run(action, context: context)
115+
}
116+
117+
func errorCaught(context: ChannelHandlerContext, error: Error) {
118+
self.logger.trace("Error caught", metadata: [
119+
"error": "\(error)",
120+
])
121+
122+
let action = self.state.errorHappened(error)
123+
self.run(action, context: context)
124+
}
125+
126+
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
127+
switch event {
128+
case HTTPConnectionEvent.cancelRequest:
129+
let action = self.state.requestCancelled(closeConnection: true)
130+
self.run(action, context: context)
131+
default:
132+
context.fireUserInboundEventTriggered(event)
133+
}
134+
}
135+
136+
// MARK: - Run Actions
137+
138+
func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
139+
switch action {
140+
case .sendRequestHead(let head, startBody: let startBody):
141+
if startBody {
142+
context.write(wrapOutboundOut(.head(head)), promise: nil)
143+
context.flush()
144+
145+
self.request!.requestHeadSent()
146+
self.request!.resumeRequestBodyStream()
147+
} else {
148+
context.write(wrapOutboundOut(.head(head)), promise: nil)
149+
context.write(wrapOutboundOut(.end(nil)), promise: nil)
150+
context.flush()
151+
152+
self.request!.requestHeadSent()
153+
}
154+
155+
case .sendBodyPart(let part):
156+
context.writeAndFlush(wrapOutboundOut(.body(part)), promise: nil)
157+
158+
case .sendRequestEnd:
159+
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: nil)
160+
161+
case .pauseRequestBodyStream:
162+
self.request!.pauseRequestBodyStream()
163+
164+
case .resumeRequestBodyStream:
165+
self.request!.resumeRequestBodyStream()
166+
167+
case .fireChannelActive:
168+
context.fireChannelActive()
169+
170+
case .fireChannelInactive:
171+
context.fireChannelInactive()
172+
173+
case .fireChannelError(let error, let close):
174+
context.fireErrorCaught(error)
175+
if close {
176+
context.close(promise: nil)
177+
}
178+
179+
case .read:
180+
context.read()
181+
182+
case .close:
183+
context.close(promise: nil)
184+
185+
case .wait:
186+
break
187+
188+
case .forwardResponseHead(let head, let pauseRequestBodyStream):
189+
self.request!.receiveResponseHead(head)
190+
if pauseRequestBodyStream {
191+
self.request!.pauseRequestBodyStream()
192+
}
193+
194+
case .forwardResponseBodyParts(let buffer):
195+
self.request!.receiveResponseBodyParts(buffer)
196+
197+
case .succeedRequest(let finalAction, let buffer):
198+
// The order here is very important...
199+
// We first nil our own task property! `taskCompleted` will potentially lead to
200+
// situations in which we get a new request right away. We should finish the task
201+
// after the connection was notified, that we finished. A
202+
// `HTTPClient.shutdown(requiresCleanShutdown: true)` will fail if we do it the
203+
// other way around.
204+
205+
let oldRequest = self.request!
206+
self.request = nil
207+
208+
switch finalAction {
209+
case .close:
210+
context.close(promise: nil)
211+
case .sendRequestEnd:
212+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
213+
case .informConnectionIsIdle:
214+
self.connection.taskCompleted()
215+
case .none:
216+
break
217+
}
218+
219+
oldRequest.succeedRequest(buffer)
220+
221+
case .failRequest(let error, let finalAction):
222+
let oldRequest = self.request!
223+
self.request = nil
224+
225+
switch finalAction {
226+
case .close:
227+
context.close(promise: nil)
228+
case .sendRequestEnd:
229+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
230+
case .informConnectionIsIdle:
231+
self.connection.taskCompleted()
232+
case .none:
233+
break
234+
}
235+
236+
oldRequest.fail(error)
237+
}
238+
}
239+
240+
// MARK: - Private Methods -
241+
242+
private func resetIdleReadTimeoutTimer(_ idleReadTimeout: TimeAmount, context: ChannelHandlerContext) {
243+
if let oldTimer = self.idleReadTimeoutTimer {
244+
oldTimer.cancel()
245+
}
246+
247+
self.idleReadTimeoutTimer = context.channel.eventLoop.scheduleTask(in: idleReadTimeout) {
248+
let action = self.state.idleReadTimeoutTriggered()
249+
self.run(action, context: context)
250+
}
251+
}
252+
253+
private func clearIdleReadTimeoutTimer() {
254+
guard let oldTimer = self.idleReadTimeoutTimer else {
255+
preconditionFailure("Expected an idleReadTimeoutTimer to exist.")
256+
}
257+
258+
self.idleReadTimeoutTimer = nil
259+
oldTimer.cancel()
260+
}
261+
262+
// MARK: Private HTTPRequestExecutor
263+
264+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutingRequest) {
265+
guard self.request === request else {
266+
// very likely we got threading issues here...
267+
return
268+
}
269+
270+
let action = self.state.requestStreamPartReceived(data)
271+
self.run(action, context: self.channelContext)
272+
}
273+
274+
private func finishRequestBodyStream0(_ request: HTTPExecutingRequest) {
275+
guard self.request === request else {
276+
// very likely we got threading issues here...
277+
return
278+
}
279+
280+
let action = self.state.requestStreamFinished()
281+
self.run(action, context: self.channelContext)
282+
}
283+
284+
private func demandResponseBodyStream0(_ request: HTTPExecutingRequest) {
285+
guard self.request === request else {
286+
// very likely we got threading issues here...
287+
return
288+
}
289+
290+
self.logger.trace("Downstream requests more response body data")
291+
292+
let action = self.state.demandMoreResponseBodyParts()
293+
self.run(action, context: self.channelContext)
294+
}
295+
296+
func cancelRequest0(_ request: HTTPExecutingRequest) {
297+
guard self.request === request else {
298+
// very likely we got threading issues here...
299+
return
300+
}
301+
302+
let action = self.state.requestCancelled(closeConnection: true)
303+
self.run(action, context: self.channelContext)
304+
}
305+
}
306+
307+
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
308+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutingRequest) {
309+
if self.channelContext.eventLoop.inEventLoop {
310+
self.writeRequestBodyPart0(data, request: request)
311+
} else {
312+
self.channelContext.eventLoop.execute {
313+
self.writeRequestBodyPart0(data, request: request)
314+
}
315+
}
316+
}
317+
318+
func finishRequestBodyStream(_ request: HTTPExecutingRequest) {
319+
if self.channelContext.eventLoop.inEventLoop {
320+
self.finishRequestBodyStream0(request)
321+
} else {
322+
self.channelContext.eventLoop.execute {
323+
self.finishRequestBodyStream0(request)
324+
}
325+
}
326+
}
327+
328+
func demandResponseBodyStream(_ request: HTTPExecutingRequest) {
329+
if self.channelContext.eventLoop.inEventLoop {
330+
self.demandResponseBodyStream0(request)
331+
} else {
332+
self.channelContext.eventLoop.execute {
333+
self.demandResponseBodyStream0(request)
334+
}
335+
}
336+
}
337+
338+
func cancelRequest(_ request: HTTPExecutingRequest) {
339+
if self.channelContext.eventLoop.inEventLoop {
340+
self.cancelRequest0(request)
341+
} else {
342+
self.channelContext.eventLoop.execute {
343+
self.cancelRequest0(request)
344+
}
345+
}
346+
}
347+
}

0 commit comments

Comments
 (0)