@@ -25,52 +25,66 @@ internal final class HTTPClientProxyHandler: ChannelDuplexHandler, RemovableChan
25
25
typealias OutboundIn = HTTPClientRequestPart
26
26
typealias OutboundOut = HTTPClientRequestPart
27
27
28
- enum BufferItem {
28
+ enum WriteItem {
29
29
case write( NIOAny , EventLoopPromise < Void > ? )
30
30
case flush
31
31
}
32
32
33
+ enum ReadState {
34
+ case awaitingResponse
35
+ case connecting
36
+ }
37
+
33
38
private let host : String
34
39
private let port : Int
35
40
private var onConnect : ( Channel ) -> EventLoopFuture < Void >
36
- private var buffer : [ BufferItem ]
41
+ private var writeBuffer : CircularBuffer < WriteItem >
42
+ private var readBuffer : CircularBuffer < NIOAny >
43
+ private var readState : ReadState
37
44
38
45
init ( host: String , port: Int , onConnect: @escaping ( Channel ) -> EventLoopFuture < Void > ) {
39
46
self . host = host
40
47
self . port = port
41
48
self . onConnect = onConnect
42
- self . buffer = [ ]
49
+ self . writeBuffer = . init( )
50
+ self . readBuffer = . init( )
51
+ self . readState = . awaitingResponse
43
52
}
44
53
45
54
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
46
- let res = self . unwrapInboundIn ( data)
47
- switch res {
48
- case . head( let head) :
49
- switch head. status. code {
50
- case 200 ..< 300 :
51
- // Any 2xx (Successful) response indicates that the sender (and all
52
- // inbound proxies) will switch to tunnel mode immediately after the
53
- // blank line that concludes the successful response's header section
55
+ switch self . readState {
56
+ case . awaitingResponse:
57
+ let res = self . unwrapInboundIn ( data)
58
+ switch res {
59
+ case . head( let head) :
60
+ switch head. status. code {
61
+ case 200 ..< 300 :
62
+ // Any 2xx (Successful) response indicates that the sender (and all
63
+ // inbound proxies) will switch to tunnel mode immediately after the
64
+ // blank line that concludes the successful response's header section
65
+ break
66
+ default :
67
+ // Any response other than a successful response
68
+ // indicates that the tunnel has not yet been formed and that the
69
+ // connection remains governed by HTTP.
70
+ context. fireErrorCaught ( HTTPClientErrors . InvalidProxyResponseError ( ) )
71
+ }
72
+ case . end:
73
+ _ = self . handleConnect ( context: context)
74
+ case . body:
54
75
break
55
- default :
56
- // Any response other than a successful response
57
- // indicates that the tunnel has not yet been formed and that the
58
- // connection remains governed by HTTP.
59
- context. fireErrorCaught ( HTTPClientErrors . InvalidProxyResponseError ( ) )
60
76
}
61
- case . end:
62
- _ = self . handleConnect ( context: context)
63
- case . body:
64
- break
77
+ case . connecting:
78
+ self . readBuffer. append ( data)
65
79
}
66
80
}
67
81
68
82
func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
69
- self . buffer . append ( . write( data, promise) )
83
+ self . writeBuffer . append ( . write( data, promise) )
70
84
}
71
85
72
86
func flush( context: ChannelHandlerContext ) {
73
- self . buffer . append ( . flush)
87
+ self . writeBuffer . append ( . flush)
74
88
}
75
89
76
90
func channelActive( context: ChannelHandlerContext ) {
@@ -82,18 +96,18 @@ internal final class HTTPClientProxyHandler: ChannelDuplexHandler, RemovableChan
82
96
83
97
private func handleConnect( context: ChannelHandlerContext ) -> EventLoopFuture < Void > {
84
98
return self . onConnect ( context. channel) . flatMap {
85
- while self . buffer . count > 0 {
86
- // make a copy of the current buffer and clear it in case any
87
- // calls to context.write cause more requests to be buffered
88
- let buffer = self . buffer
89
- self . buffer = [ ]
90
- buffer . forEach { item in
91
- switch item {
92
- case . flush :
93
- context . flush ( )
94
- case . write ( let data , let promise ) :
95
- context . write ( data, promise : promise)
96
- }
99
+ // forward any buffered reads
100
+ while ! self . readBuffer . isEmpty {
101
+ context. fireChannelRead ( self . readBuffer . removeFirst ( ) )
102
+ }
103
+
104
+ // calls to context.write may be re-entrant
105
+ while ! self . writeBuffer . isEmpty {
106
+ switch self . writeBuffer . removeFirst ( ) {
107
+ case . flush:
108
+ context . flush ( )
109
+ case . write( let data, let promise) :
110
+ context . write ( data , promise : promise )
97
111
}
98
112
}
99
113
return context. pipeline. removeHandler ( self )
0 commit comments