Skip to content

Commit 4709561

Browse files
committed
Replaced async model with true async
Rewrote DoAsyncRequest to use real async web methods instead of starting new tasks and allocating thread pool resources for waits and callbacks.
1 parent d21b231 commit 4709561

File tree

3 files changed

+152
-122
lines changed

3 files changed

+152
-122
lines changed

Diff for: src/Nest/Domain/Connection/AsyncRequestOperation.cs

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
using System;
2+
using System.IO;
3+
using System.Net;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Nest.Domain.Connection;
8+
9+
namespace Nest
10+
{
11+
public class AsyncRequestOperation : TaskCompletionSource<ConnectionStatus>, IDisposable
12+
{
13+
private readonly HttpWebRequest m_request;
14+
private readonly string m_requestData;
15+
private readonly IConnectionSettings m_connectionSettings;
16+
private ConnectionStatusTracer m_tracer;
17+
private WebResponse m_response;
18+
private Stream m_responseStream;
19+
20+
public AsyncRequestOperation( HttpWebRequest request, string requestData, IConnectionSettings connectionSettings, ConnectionStatusTracer tracer )
21+
{
22+
m_request = request;
23+
m_requestData = requestData;
24+
m_connectionSettings = connectionSettings;
25+
m_tracer = tracer;
26+
Start();
27+
}
28+
29+
private void Start()
30+
{
31+
if ( this.m_requestData != null )
32+
WriteRequestDataAsync();
33+
else
34+
GetResponseAsync();
35+
}
36+
37+
private void WriteRequestDataAsync()
38+
{
39+
this.m_request.BeginGetRequestStream( this.Monitor( ar =>
40+
{
41+
var r = this.m_request.EndGetRequestStream( ar );
42+
var buffer = Encoding.UTF8.GetBytes( this.m_requestData );
43+
r.BeginWrite( buffer, 0, buffer.Length, this.Monitor( writeIar =>
44+
{
45+
r.EndWrite( writeIar );
46+
GetResponseAsync();
47+
} ), null );
48+
} ), null );
49+
}
50+
51+
private void GetResponseAsync()
52+
{
53+
this.m_request.BeginGetResponse( this.Monitor( iarResponse =>
54+
{
55+
m_response = m_request.EndGetResponse( iarResponse );
56+
m_responseStream = m_response.GetResponseStream();
57+
58+
var buffer = new byte[8192];
59+
var result = new MemoryStream( buffer.Length );
60+
ReadResponseStreamAsync( this.m_responseStream, buffer, result );
61+
62+
} ), null );
63+
}
64+
65+
private void ReadResponseStreamAsync( Stream stream, byte[] buffer, MemoryStream result )
66+
{
67+
stream.BeginRead( buffer, 0, buffer.Length, this.Monitor( iar =>
68+
{
69+
var bytes = stream.EndRead( iar );
70+
if ( bytes == 0 )
71+
{
72+
Done( result );
73+
return;
74+
}
75+
76+
result.Write( buffer, 0, bytes );
77+
ReadResponseStreamAsync( stream, buffer, result );
78+
79+
} ), null );
80+
}
81+
82+
private void Done( ConnectionStatus connectionStatus )
83+
{
84+
m_tracer.SetResult( connectionStatus );
85+
TrySetResult( connectionStatus );
86+
Dispose();
87+
}
88+
89+
private void Done( Stream result )
90+
{
91+
result.Position = 0;
92+
var reader = new StreamReader( result );
93+
Done( new ConnectionStatus( reader.ReadToEnd() )
94+
{
95+
Request = this.m_requestData,
96+
RequestUrl = this.m_request.RequestUri.ToString(),
97+
RequestMethod = this.m_request.Method
98+
} );
99+
100+
}
101+
102+
private AsyncCallback Monitor( AsyncCallback callback )
103+
{
104+
return ar =>
105+
{
106+
try
107+
{
108+
callback( ar );
109+
}
110+
catch ( WebException webException )
111+
{
112+
var connectionStatus = new ConnectionStatus( webException )
113+
{
114+
Request = this.m_requestData,
115+
RequestUrl = this.m_request.RequestUri.ToString(),
116+
RequestMethod = this.m_request.Method
117+
};
118+
m_connectionSettings.ConnectionStatusHandler( connectionStatus );
119+
Done( connectionStatus );
120+
}
121+
catch ( Exception e )
122+
{
123+
TrySetException( e );
124+
Dispose();
125+
}
126+
};
127+
}
128+
129+
public void Dispose()
130+
{
131+
Dispose( ref m_response );
132+
Dispose( ref m_responseStream );
133+
Dispose( ref m_tracer );
134+
}
135+
136+
private static void Dispose<T>( ref T disposable ) where T : class, IDisposable
137+
{
138+
var d = Interlocked.Exchange( ref disposable, null );
139+
if ( d != null )
140+
d.Dispose();
141+
}
142+
}
143+
}

Diff for: src/Nest/Domain/Connection/Connection.cs

+8-122
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Collections.Specialized;
42
using System.IO;
53
using System.Net;
64
using System.Reflection;
75
using System.Text;
8-
using System.Threading;
96
using System.Threading.Tasks;
107
using Nest.Domain.Connection;
118

@@ -16,7 +13,6 @@ public class Connection : IConnection
1613
const int BUFFER_SIZE = 1024;
1714

1815
private IConnectionSettings _ConnectionSettings { get; set; }
19-
private Semaphore _ResourceLock;
2016
private readonly bool _enableTrace;
2117

2218
public Connection(IConnectionSettings settings)
@@ -25,7 +21,6 @@ public Connection(IConnectionSettings settings)
2521
throw new ArgumentNullException("settings");
2622

2723
this._ConnectionSettings = settings;
28-
this._ResourceLock = new Semaphore(settings.MaximumAsyncConnections, settings.MaximumAsyncConnections);
2924
this._enableTrace = settings.TraceEnabled;
3025
}
3126

@@ -205,126 +200,17 @@ protected virtual ConnectionStatus DoSynchronousRequest(HttpWebRequest request,
205200

206201
return cs;
207202
}
208-
}
209-
203+
}
210204
}
211205

212206
protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request, string data = null)
213207
{
214-
var tcs = new TaskCompletionSource<ConnectionStatus>();
215-
var timeout = this._ConnectionSettings.Timeout;
216-
if (!this._ResourceLock.WaitOne(timeout))
217-
{
218-
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
219-
{
220-
var m = "Could not start the operation before the timeout of " + timeout +
221-
"ms completed while waiting for the semaphore";
222-
var cs = new ConnectionStatus(new TimeoutException(m));
223-
tcs.SetResult(cs);
224-
tracer.SetResult(cs);
225-
return tcs.Task;
226-
}
227-
}
228-
try
229-
{
230-
return Task.Factory.StartNew(() =>
231-
{
232-
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
233-
{
234-
this.Iterate(this._AsyncSteps(request, tcs, data), tcs);
235-
var cs = tcs.Task.Result;
236-
tracer.SetResult(cs);
237-
_ConnectionSettings.ConnectionStatusHandler(cs);
238-
return cs;
239-
}
240-
}, TaskCreationOptions.LongRunning);
241-
}
242-
finally
243-
{
244-
this._ResourceLock.Release();
245-
}
246-
}
247-
248-
private IEnumerable<Task> _AsyncSteps(HttpWebRequest request, TaskCompletionSource<ConnectionStatus> tcs, string data = null)
249-
{
250-
var timeout = this._ConnectionSettings.Timeout;
251-
252-
var state = new ConnectionState { Connection = request };
253-
254-
if (data != null)
255-
{
256-
var getRequestStream = Task.Factory.FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null);
257-
ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
258-
yield return getRequestStream;
259-
260-
var requestStream = getRequestStream.Result;
261-
try
262-
{
263-
byte[] buffer = Encoding.UTF8.GetBytes(data);
264-
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, buffer, 0, buffer.Length, state);
265-
yield return writeToRequestStream;
266-
}
267-
finally
268-
{
269-
requestStream.Close();
270-
}
271-
}
272-
273-
// Get the response
274-
var getResponse = Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null);
275-
ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
276-
yield return getResponse;
277-
278-
// Get the response stream
279-
using (var response = (HttpWebResponse)getResponse.Result)
280-
using (var responseStream = response.GetResponseStream())
281-
{
282-
// Copy all data from the response stream
283-
var output = new MemoryStream();
284-
var buffer = new byte[BUFFER_SIZE];
285-
while (responseStream != null)
286-
{
287-
var read = Task<int>.Factory.FromAsync(responseStream.BeginRead, responseStream.EndRead, buffer, 0, BUFFER_SIZE, null);
288-
yield return read;
289-
if (read.Result == 0) break;
290-
output.Write(buffer, 0, read.Result);
291-
}
292-
293-
// Decode the data and store the result
294-
var result = Encoding.UTF8.GetString(output.ToArray());
295-
var cs = new ConnectionStatus(result) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method };
296-
tcs.TrySetResult(cs);
297-
}
298-
yield break;
299-
300-
}
301-
302-
public void Iterate(IEnumerable<Task> asyncIterator, TaskCompletionSource<ConnectionStatus> tcs)
303-
{
304-
var enumerator = asyncIterator.GetEnumerator();
305-
Action<Task> recursiveBody = null;
306-
recursiveBody = completedTask =>
307-
{
308-
if (completedTask != null && completedTask.IsFaulted)
309-
{
310-
//none of the individual steps in _AsyncSteps run in parallel for 1 request
311-
//as this would be impossible we can assume Aggregate Exception.InnerException
312-
var exception = completedTask.Exception.InnerException;
313-
314-
//cleanly exit from exceptions in stages if the exception is a webexception
315-
if (exception is WebException)
316-
tcs.SetResult(new ConnectionStatus(exception));
317-
else
318-
tcs.TrySetException(exception);
319-
enumerator.Dispose();
320-
}
321-
else if (enumerator.MoveNext())
322-
{
323-
enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
324-
}
325-
else enumerator.Dispose();
326-
};
327-
recursiveBody(null);
208+
var operation = new AsyncRequestOperation(
209+
request,
210+
data,
211+
_ConnectionSettings,
212+
new ConnectionStatusTracer( this._ConnectionSettings.TraceEnabled ) );
213+
return operation.Task;
328214
}
329215

330216
private Uri _CreateUriString(string path)
@@ -386,4 +272,4 @@ public static void LeaveDotsAndSlashesEscaped(Uri uri)
386272
fieldInfo.SetValue(uriParser, uriSyntaxFlags);
387273
}
388274
}
389-
}
275+
}

Diff for: src/Nest/Nest.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
<Reference Include="System.Windows.Forms" />
6969
</ItemGroup>
7070
<ItemGroup>
71+
<Compile Include="Domain\Connection\AsyncRequestOperation.cs" />
7172
<Compile Include="Domain\Responses\IReindexResponse.cs" />
7273
<Compile Include="DSL\ReindexDescriptor.cs" />
7374
<Compile Include="Exception\ReindexException.cs" />

0 commit comments

Comments
 (0)