1
+ using System ;
2
+ using System . IO ;
3
+ using System . Net ;
4
+ using System . Text ;
5
+ using System . Threading ;
6
+ using System . Threading . Tasks ;
7
+ using Nest ;
8
+ using Nest . Domain . Connection ;
9
+
10
+ namespace ProtocolLoadTest
11
+ {
12
+ public class AsyncRequestOperation : TaskCompletionSource < ConnectionStatus > , IDisposable
13
+ {
14
+ private readonly HttpWebRequest m_request ;
15
+ private readonly string m_requestData ;
16
+ private readonly IConnectionSettings m_connectionSettings ;
17
+ private ConnectionStatusTracer m_tracer ;
18
+ private WebResponse m_response ;
19
+ private Stream m_responseStream ;
20
+
21
+ public AsyncRequestOperation ( HttpWebRequest request , string requestData , IConnectionSettings connectionSettings , ConnectionStatusTracer tracer )
22
+ {
23
+ m_request = request ;
24
+ m_requestData = requestData ;
25
+ m_connectionSettings = connectionSettings ;
26
+ m_tracer = tracer ;
27
+ Start ( ) ;
28
+ }
29
+
30
+ private void Start ( )
31
+ {
32
+ if ( this . m_requestData != null )
33
+ WriteRequestDataAsync ( ) ;
34
+ else
35
+ GetResponseAsync ( ) ;
36
+ }
37
+
38
+ private void WriteRequestDataAsync ( )
39
+ {
40
+ this . m_request . BeginGetRequestStream ( this . Monitor ( ar =>
41
+ {
42
+ var r = this . m_request . EndGetRequestStream ( ar ) ;
43
+ var buffer = Encoding . UTF8 . GetBytes ( this . m_requestData ) ;
44
+ r . BeginWrite ( buffer , 0 , buffer . Length , this . Monitor ( writeIar =>
45
+ {
46
+ r . EndWrite ( writeIar ) ;
47
+ GetResponseAsync ( ) ;
48
+ } ) , null ) ;
49
+ } ) , null ) ;
50
+ }
51
+
52
+ private void GetResponseAsync ( )
53
+ {
54
+ this . m_request . BeginGetResponse ( this . Monitor ( iarResponse =>
55
+ {
56
+ m_response = m_request . EndGetResponse ( iarResponse ) ;
57
+ m_responseStream = m_response . GetResponseStream ( ) ;
58
+
59
+ var buffer = new byte [ 8192 ] ;
60
+ var result = new MemoryStream ( buffer . Length ) ;
61
+ ReadResponseStreamAsync ( this . m_responseStream , buffer , result ) ;
62
+
63
+ } ) , null ) ;
64
+ }
65
+
66
+ private void ReadResponseStreamAsync ( Stream stream , byte [ ] buffer , MemoryStream result )
67
+ {
68
+ stream . BeginRead ( buffer , 0 , buffer . Length , this . Monitor ( iar =>
69
+ {
70
+ var bytes = stream . EndRead ( iar ) ;
71
+ if ( bytes == 0 )
72
+ {
73
+ Done ( result ) ;
74
+ return ;
75
+ }
76
+
77
+ result . Write ( buffer , 0 , bytes ) ;
78
+ ReadResponseStreamAsync ( stream , buffer , result ) ;
79
+
80
+ } ) , null ) ;
81
+ }
82
+
83
+ private void Done ( ConnectionStatus connectionStatus )
84
+ {
85
+ m_tracer . SetResult ( connectionStatus ) ;
86
+ TrySetResult ( connectionStatus ) ;
87
+ Dispose ( ) ;
88
+ }
89
+
90
+ private void Done ( Stream result )
91
+ {
92
+ result . Position = 0 ;
93
+ var reader = new StreamReader ( result ) ;
94
+ Done ( new ConnectionStatus ( reader . ReadToEnd ( ) )
95
+ {
96
+ Request = this . m_requestData ,
97
+ RequestUrl = this . m_request . RequestUri . ToString ( ) ,
98
+ RequestMethod = this . m_request . Method
99
+ } ) ;
100
+
101
+ }
102
+
103
+ private AsyncCallback Monitor ( AsyncCallback callback )
104
+ {
105
+ return ar =>
106
+ {
107
+ try
108
+ {
109
+ callback ( ar ) ;
110
+ }
111
+ catch ( WebException webException )
112
+ {
113
+ var connectionStatus = new ConnectionStatus ( webException )
114
+ {
115
+ Request = this . m_requestData ,
116
+ RequestUrl = this . m_request . RequestUri . ToString ( ) ,
117
+ RequestMethod = this . m_request . Method
118
+ } ;
119
+ m_connectionSettings . ConnectionStatusHandler ( connectionStatus ) ;
120
+ Done ( connectionStatus ) ;
121
+ }
122
+ catch ( Exception e )
123
+ {
124
+ TrySetException ( e ) ;
125
+ Dispose ( ) ;
126
+ }
127
+ } ;
128
+ }
129
+
130
+ public void Dispose ( )
131
+ {
132
+ Dispose ( ref m_response ) ;
133
+ Dispose ( ref m_responseStream ) ;
134
+ Dispose ( ref m_tracer ) ;
135
+ }
136
+
137
+ private static void Dispose < T > ( ref T disposable ) where T : class , IDisposable
138
+ {
139
+ var d = Interlocked . Exchange ( ref disposable , null ) ;
140
+ if ( d != null )
141
+ d . Dispose ( ) ;
142
+ }
143
+ }
144
+ }
0 commit comments