Skip to content

Commit f85a310

Browse files
committed
Merge branch 'async-connection-improvements'
2 parents e0e7a54 + f0dfb1d commit f85a310

15 files changed

+486
-55
lines changed

Diff for: src/Nest.sln

+16
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nest.Dsl.Factory", "Nest.Ds
4040
EndProject
4141
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nest.ProfilerHelper", "Nest.ProfilerHelper\Nest.ProfilerHelper.csproj", "{2727A374-9866-4A9D-9A40-6175334B5992}"
4242
EndProject
43+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{59540309-1B42-4064-AFB4-D31F346D15E6}"
44+
EndProject
4345
Global
4446
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4547
Debug|Any CPU = Debug|Any CPU
@@ -142,6 +144,16 @@ Global
142144
{2727A374-9866-4A9D-9A40-6175334B5992}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
143145
{2727A374-9866-4A9D-9A40-6175334B5992}.Release|Mixed Platforms.Build.0 = Release|Any CPU
144146
{2727A374-9866-4A9D-9A40-6175334B5992}.Release|x86.ActiveCfg = Release|Any CPU
147+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
148+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
149+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
150+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
151+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Debug|x86.ActiveCfg = Debug|Any CPU
152+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
153+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Release|Any CPU.Build.0 = Release|Any CPU
154+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
155+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Release|Mixed Platforms.Build.0 = Release|Any CPU
156+
{59540309-1B42-4064-AFB4-D31F346D15E6}.Release|x86.ActiveCfg = Release|Any CPU
145157
EndGlobalSection
146158
GlobalSection(SolutionProperties) = preSolution
147159
HideSolutionNode = FALSE
@@ -150,6 +162,10 @@ Global
150162
{E39CC264-A7B3-490D-84B2-D3016D86CD87} = {DDC38E1C-13BF-4C96-A3BF-60F14DFC5069}
151163
{B9FE4875-0171-40F7-A357-064A93BE09C6} = {DDC38E1C-13BF-4C96-A3BF-60F14DFC5069}
152164
{2727A374-9866-4A9D-9A40-6175334B5992} = {DDC38E1C-13BF-4C96-A3BF-60F14DFC5069}
165+
{59540309-1B42-4064-AFB4-D31F346D15E6} = {DDC38E1C-13BF-4C96-A3BF-60F14DFC5069}
166+
EndGlobalSection
167+
GlobalSection(Performance) = preSolution
168+
HasPerformanceSessions = true
153169
EndGlobalSection
154170
GlobalSection(MonoDevelopProperties) = preSolution
155171
StartupItem = ElasticSearch.ConsolePlayground\ElasticSearch.ConsolePlayground.csproj

Diff for: src/Nest/Domain/Connection/AsyncRequestOperation.cs

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
using System;
2+
using System.IO;
3+
using System.Net;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Nest.Domain.Connection;
8+
9+
namespace Nest
10+
{
11+
public class AsyncRequestOperation : TaskCompletionSource<ConnectionStatus>, IDisposable
12+
{
13+
private readonly HttpWebRequest m_request;
14+
private readonly string m_requestData;
15+
private readonly IConnectionSettings m_connectionSettings;
16+
private ConnectionStatusTracer m_tracer;
17+
private WebResponse m_response;
18+
private Stream m_responseStream;
19+
20+
public AsyncRequestOperation(HttpWebRequest request, string requestData, IConnectionSettings connectionSettings, ConnectionStatusTracer tracer)
21+
{
22+
m_request = request;
23+
m_requestData = requestData;
24+
m_connectionSettings = connectionSettings;
25+
m_tracer = tracer;
26+
Start();
27+
}
28+
29+
private void Start()
30+
{
31+
if (this.m_requestData != null)
32+
WriteRequestDataAsync();
33+
else
34+
GetResponseAsync();
35+
}
36+
37+
private void WriteRequestDataAsync()
38+
{
39+
this.m_request.BeginGetRequestStream(this.Monitor(ar =>
40+
{
41+
var r = this.m_request.EndGetRequestStream(ar);
42+
var buffer = Encoding.UTF8.GetBytes(this.m_requestData);
43+
r.BeginWrite(buffer, 0, buffer.Length, this.Monitor(writeIar =>
44+
{
45+
r.EndWrite(writeIar);
46+
GetResponseAsync();
47+
}), null);
48+
}), null);
49+
}
50+
51+
private void GetResponseAsync()
52+
{
53+
this.m_request.BeginGetResponse(this.Monitor(iarResponse =>
54+
{
55+
m_response = m_request.EndGetResponse(iarResponse);
56+
m_responseStream = m_response.GetResponseStream();
57+
58+
var buffer = new byte[8192];
59+
var result = new MemoryStream(buffer.Length);
60+
ReadResponseStreamAsync(this.m_responseStream, buffer, result);
61+
62+
}), null);
63+
}
64+
65+
private void ReadResponseStreamAsync(Stream stream, byte[] buffer, MemoryStream result)
66+
{
67+
stream.BeginRead(buffer, 0, buffer.Length, this.Monitor(iar =>
68+
{
69+
var bytes = stream.EndRead(iar);
70+
if (bytes == 0)
71+
{
72+
Done(result);
73+
return;
74+
}
75+
76+
result.Write(buffer, 0, bytes);
77+
ReadResponseStreamAsync(stream, buffer, result);
78+
79+
}), null);
80+
}
81+
82+
private void Done(ConnectionStatus connectionStatus)
83+
{
84+
m_tracer.SetResult(connectionStatus);
85+
TrySetResult(connectionStatus);
86+
Dispose();
87+
}
88+
89+
private void Done(Stream result)
90+
{
91+
result.Position = 0;
92+
var reader = new StreamReader(result);
93+
Done(new ConnectionStatus(reader.ReadToEnd())
94+
{
95+
Request = this.m_requestData,
96+
RequestUrl = this.m_request.RequestUri.ToString(),
97+
RequestMethod = this.m_request.Method
98+
});
99+
100+
}
101+
102+
private AsyncCallback Monitor(AsyncCallback callback)
103+
{
104+
return ar =>
105+
{
106+
try
107+
{
108+
callback(ar);
109+
}
110+
catch (WebException webException)
111+
{
112+
var connectionStatus = new ConnectionStatus(webException)
113+
{
114+
Request = this.m_requestData,
115+
RequestUrl = this.m_request.RequestUri.ToString(),
116+
RequestMethod = this.m_request.Method
117+
};
118+
m_connectionSettings.ConnectionStatusHandler(connectionStatus);
119+
Done(connectionStatus);
120+
}
121+
catch (Exception e)
122+
{
123+
TrySetException(e);
124+
Dispose();
125+
}
126+
};
127+
}
128+
129+
public void Dispose()
130+
{
131+
Dispose(ref m_response);
132+
Dispose(ref m_responseStream);
133+
Dispose(ref m_tracer);
134+
}
135+
136+
private static void Dispose<T>(ref T disposable) where T : class, IDisposable
137+
{
138+
var d = Interlocked.Exchange(ref disposable, null);
139+
if (d != null)
140+
d.Dispose();
141+
}
142+
}
143+
}

Diff for: src/Nest/Domain/Connection/Connection.cs

+52-51
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class Connection : IConnection
1515
{
1616
const int BUFFER_SIZE = 1024;
1717

18-
private IConnectionSettings _ConnectionSettings { get; set; }
18+
protected IConnectionSettings _ConnectionSettings { get; set; }
1919
private Semaphore _ResourceLock;
2020
private readonly bool _enableTrace;
2121

@@ -227,17 +227,13 @@ protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request,
227227
}
228228
try
229229
{
230-
return Task.Factory.StartNew(() =>
231-
{
232-
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
233-
{
230+
//return Task.Factory.StartNew(() =>
231+
//{
232+
234233
this.Iterate(this._AsyncSteps(request, tcs, data), tcs);
235-
var cs = tcs.Task.Result;
236-
tracer.SetResult(cs);
237-
_ConnectionSettings.ConnectionStatusHandler(cs);
238-
return cs;
239-
}
240-
}, TaskCreationOptions.LongRunning);
234+
return tcs.Task;
235+
236+
//}, TaskCreationOptions.LongRunning);
241237
}
242238
finally
243239
{
@@ -247,55 +243,60 @@ protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request,
247243

248244
private IEnumerable<Task> _AsyncSteps(HttpWebRequest request, TaskCompletionSource<ConnectionStatus> tcs, string data = null)
249245
{
250-
var timeout = this._ConnectionSettings.Timeout;
251-
252-
var state = new ConnectionState { Connection = request };
253-
254-
if (data != null)
246+
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
255247
{
256-
var getRequestStream = Task.Factory.FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null);
257-
ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
258-
yield return getRequestStream;
248+
var timeout = this._ConnectionSettings.Timeout;
259249

260-
var requestStream = getRequestStream.Result;
261-
try
262-
{
263-
byte[] buffer = Encoding.UTF8.GetBytes(data);
264-
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, buffer, 0, buffer.Length, state);
265-
yield return writeToRequestStream;
266-
}
267-
finally
250+
var state = new ConnectionState { Connection = request };
251+
252+
if (data != null)
268253
{
269-
requestStream.Close();
254+
var getRequestStream = Task.Factory.FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null);
255+
//ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
256+
yield return getRequestStream;
257+
258+
var requestStream = getRequestStream.Result;
259+
try
260+
{
261+
byte[] buffer = Encoding.UTF8.GetBytes(data);
262+
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, buffer, 0, buffer.Length, state);
263+
yield return writeToRequestStream;
264+
}
265+
finally
266+
{
267+
requestStream.Close();
268+
}
270269
}
271-
}
272270

273-
// Get the response
274-
var getResponse = Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null);
275-
ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
276-
yield return getResponse;
271+
// Get the response
272+
var getResponse = Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null);
273+
//ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
274+
yield return getResponse;
277275

278-
// Get the response stream
279-
using (var response = (HttpWebResponse)getResponse.Result)
280-
using (var responseStream = response.GetResponseStream())
281-
{
282-
// Copy all data from the response stream
283-
var output = new MemoryStream();
284-
var buffer = new byte[BUFFER_SIZE];
285-
while (responseStream != null)
276+
// Get the response stream
277+
using (var response = (HttpWebResponse)getResponse.Result)
278+
using (var responseStream = response.GetResponseStream())
286279
{
287-
var read = Task<int>.Factory.FromAsync(responseStream.BeginRead, responseStream.EndRead, buffer, 0, BUFFER_SIZE, null);
288-
yield return read;
289-
if (read.Result == 0) break;
290-
output.Write(buffer, 0, read.Result);
291-
}
280+
// Copy all data from the response stream
281+
var output = new MemoryStream();
282+
var buffer = new byte[BUFFER_SIZE];
283+
while (responseStream != null)
284+
{
285+
var read = Task<int>.Factory.FromAsync(responseStream.BeginRead, responseStream.EndRead, buffer, 0, BUFFER_SIZE, null);
286+
yield return read;
287+
if (read.Result == 0) break;
288+
output.Write(buffer, 0, read.Result);
289+
}
292290

293-
// Decode the data and store the result
294-
var result = Encoding.UTF8.GetString(output.ToArray());
295-
var cs = new ConnectionStatus(result) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method };
296-
tcs.TrySetResult(cs);
291+
// Decode the data and store the result
292+
var result = Encoding.UTF8.GetString(output.ToArray());
293+
var cs = new ConnectionStatus(result) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method };
294+
tcs.TrySetResult(cs);
295+
tracer.SetResult(cs);
296+
_ConnectionSettings.ConnectionStatusHandler(cs);
297+
}
298+
yield break;
297299
}
298-
yield break;
299300

300301
}
301302

Diff for: src/Nest/Domain/Connection/NoTasksHttpConnection.cs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
 using System;
2+
using System.IO;
3+
using System.Net;
4+
using System.Reflection;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
using Nest.Domain.Connection;
8+
9+
namespace Nest
10+
{
11+
public class NoTaskHttpConnection : Connection
12+
{
13+
public NoTaskHttpConnection(IConnectionSettings settings) : base(settings)
14+
{
15+
}
16+
17+
protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request, string data = null)
18+
{
19+
var operation = new AsyncRequestOperation(
20+
request,
21+
data,
22+
_ConnectionSettings,
23+
new ConnectionStatusTracer( this._ConnectionSettings.TraceEnabled ) );
24+
return operation.Task;
25+
}
26+
27+
}
28+
}

Diff for: src/Nest/Nest.csproj

+2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
<Reference Include="System.Windows.Forms" />
6969
</ItemGroup>
7070
<ItemGroup>
71+
<Compile Include="Domain\Connection\AsyncRequestOperation.cs" />
72+
<Compile Include="Domain\Connection\NoTasksHttpConnection.cs" />
7173
<Compile Include="Domain\Responses\IReindexResponse.cs" />
7274
<Compile Include="DSL\ReindexDescriptor.cs" />
7375
<Compile Include="Exception\ReindexException.cs" />

0 commit comments

Comments
 (0)