-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathAsyncRequestOperation.cs
143 lines (127 loc) · 3.57 KB
/
AsyncRequestOperation.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Nest.Domain.Connection;
namespace Nest
{
/// <summary>
/// Runs a single <see cref="HttpWebRequest"/> through the Begin/End asynchronous pattern and
/// publishes the outcome through the inherited <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
/// <remarks>
/// The operation starts from the constructor. The optional request body is written as UTF-8,
/// the response is read in fixed-size chunks into memory, and the resulting text is wrapped in a
/// <see cref="ConnectionStatus"/>. A <see cref="WebException"/> raised inside a callback is
/// converted into a <see cref="ConnectionStatus"/> result; any other exception faults the task.
/// </remarks>
public class AsyncRequestOperation : TaskCompletionSource<ConnectionStatus>, IDisposable
{
    /// <summary>Size in bytes of the buffer used for each response read.</summary>
    private const int ResponseBufferSize = 8192;

    private readonly HttpWebRequest m_request;
    private readonly string m_requestData;
    private readonly IConnectionSettings m_connectionSettings;
    private ConnectionStatusTracer m_tracer;
    private WebResponse m_response;
    private Stream m_responseStream;

    /// <summary>
    /// Creates the operation and immediately starts it.
    /// </summary>
    /// <param name="request">The prepared web request to execute.</param>
    /// <param name="requestData">Body to send as UTF-8, or <c>null</c> to send no body.</param>
    /// <param name="connectionSettings">Settings whose <c>ConnectionStatusHandler</c> is invoked when a <see cref="WebException"/> occurs.</param>
    /// <param name="tracer">Tracer that receives the final status and is disposed with this operation; may be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    public AsyncRequestOperation( HttpWebRequest request, string requestData, IConnectionSettings connectionSettings, ConnectionStatusTracer tracer )
    {
        if ( request == null )
        {
            throw new ArgumentNullException( "request" );
        }

        m_request = request;
        m_requestData = requestData;
        m_connectionSettings = connectionSettings;
        m_tracer = tracer;

        // Exceptions thrown synchronously by the first Begin* call are not wrapped by Monitor
        // and therefore propagate to the caller of this constructor.
        Start();
    }

    /// <summary>
    /// Chooses the first stage: write the body when there is one, otherwise go straight to the response.
    /// </summary>
    private void Start()
    {
        if ( m_requestData != null )
        {
            WriteRequestDataAsync();
        }
        else
        {
            GetResponseAsync();
        }
    }

    /// <summary>
    /// Obtains the request stream, writes the UTF-8 encoded body, closes the stream and then
    /// requests the response.
    /// </summary>
    private void WriteRequestDataAsync()
    {
        m_request.BeginGetRequestStream( Monitor( streamResult =>
        {
            var requestStream = m_request.EndGetRequestStream( streamResult );
            var payload = Encoding.UTF8.GetBytes( m_requestData );

            requestStream.BeginWrite( payload, 0, payload.Length, Monitor( writeResult =>
            {
                requestStream.EndWrite( writeResult );

                // The request stream has to be closed before asking for the response; leaving it
                // open leaks the stream and the underlying connection. It is closed only on the
                // success path so a failed write keeps reporting its own exception.
                requestStream.Dispose();

                GetResponseAsync();
            } ), null );
        } ), null );
    }

    /// <summary>
    /// Requests the response, keeps the response and its stream for later disposal, and starts
    /// reading the body into memory.
    /// </summary>
    private void GetResponseAsync()
    {
        m_request.BeginGetResponse( Monitor( responseResult =>
        {
            m_response = m_request.EndGetResponse( responseResult );
            m_responseStream = m_response.GetResponseStream();

            var buffer = new byte[ResponseBufferSize];
            var result = new MemoryStream( buffer.Length );
            ReadResponseStreamAsync( m_responseStream, buffer, result );
        } ), null );
    }

    /// <summary>
    /// Reads one chunk from <paramref name="stream"/> and appends it to <paramref name="result"/>,
    /// repeating until a read returns zero bytes, at which point the operation completes.
    /// </summary>
    /// <param name="stream">Response stream to read from.</param>
    /// <param name="buffer">Scratch buffer reused for every read.</param>
    /// <param name="result">Accumulates the bytes read so far.</param>
    private void ReadResponseStreamAsync( Stream stream, byte[] buffer, MemoryStream result )
    {
        // NOTE(review): each non-empty read re-enters this method from the callback. If the stream
        // completes reads synchronously this could nest deeply for very large bodies - confirm.
        stream.BeginRead( buffer, 0, buffer.Length, Monitor( readResult =>
        {
            var bytesRead = stream.EndRead( readResult );
            if ( bytesRead == 0 )
            {
                Done( result );
                return;
            }

            result.Write( buffer, 0, bytesRead );
            ReadResponseStreamAsync( stream, buffer, result );
        } ), null );
    }

    /// <summary>
    /// Hands the final status to the tracer (when one is still attached), completes the task and
    /// releases the held resources.
    /// </summary>
    /// <param name="connectionStatus">The status to publish.</param>
    private void Done( ConnectionStatus connectionStatus )
    {
        // Dispose() is public and clears m_tracer, and callers may pass no tracer at all;
        // read the field once and skip tracing rather than fail the whole operation.
        var tracer = m_tracer;
        if ( tracer != null )
        {
            tracer.SetResult( connectionStatus );
        }

        TrySetResult( connectionStatus );
        Dispose();
    }

    /// <summary>
    /// Reads the buffered response from the start as text and completes the operation with a
    /// <see cref="ConnectionStatus"/> describing the request.
    /// </summary>
    /// <param name="result">Stream holding the complete response body; it is disposed by this call.</param>
    private void Done( Stream result )
    {
        result.Position = 0;

        string responseText;
        using ( var reader = new StreamReader( result ) )
        {
            responseText = reader.ReadToEnd();
        }

        Done( new ConnectionStatus( responseText )
        {
            Request = m_requestData,
            RequestUrl = m_request.RequestUri.ToString(),
            RequestMethod = m_request.Method
        } );
    }

    /// <summary>
    /// Wraps <paramref name="callback"/> so that no exception leaves the asynchronous callback:
    /// a <see cref="WebException"/> becomes a <see cref="ConnectionStatus"/> result, anything else
    /// faults the task.
    /// </summary>
    /// <param name="callback">The stage callback to protect.</param>
    /// <returns>A callback that runs <paramref name="callback"/> inside the error handling.</returns>
    private AsyncCallback Monitor( AsyncCallback callback )
    {
        return asyncResult =>
        {
            try
            {
                callback( asyncResult );
            }
            catch ( WebException webException )
            {
                try
                {
                    var connectionStatus = new ConnectionStatus( webException )
                    {
                        Request = m_requestData,
                        RequestUrl = m_request.RequestUri.ToString(),
                        RequestMethod = m_request.Method
                    };
                    m_connectionSettings.ConnectionStatusHandler( connectionStatus );
                    Done( connectionStatus );
                }
                catch ( Exception e )
                {
                    // A failure while reporting the WebException must still complete the task;
                    // otherwise it would escape the callback and leave awaiters waiting forever.
                    Fail( e );
                }
            }
            catch ( Exception e )
            {
                Fail( e );
            }
        };
    }

    /// <summary>
    /// Faults the task with <paramref name="exception"/> (unless it already completed) and
    /// releases the held resources.
    /// </summary>
    /// <param name="exception">The exception to publish.</param>
    private void Fail( Exception exception )
    {
        TrySetException( exception );
        Dispose();
    }

    /// <summary>
    /// Disposes the response, the response stream and the tracer. Each field is cleared
    /// atomically, so repeated calls dispose each resource at most once.
    /// </summary>
    public void Dispose()
    {
        Dispose( ref m_response );
        Dispose( ref m_responseStream );
        Dispose( ref m_tracer );
    }

    /// <summary>
    /// Atomically replaces <paramref name="disposable"/> with <c>null</c> and disposes the
    /// previous value when there was one.
    /// </summary>
    /// <typeparam name="T">A disposable reference type.</typeparam>
    /// <param name="disposable">The field to clear and dispose.</param>
    private static void Dispose<T>( ref T disposable ) where T : class, IDisposable
    {
        var previous = Interlocked.Exchange( ref disposable, null );
        if ( previous != null )
        {
            previous.Dispose();
        }
    }
}
}