Skip to content

Replaced async model with true async #337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions src/Nest/Domain/Connection/AsyncRequestOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Nest.Domain.Connection;

namespace Nest
{
public class AsyncRequestOperation : TaskCompletionSource<ConnectionStatus>, IDisposable
{
private readonly HttpWebRequest m_request;
private readonly string m_requestData;
private readonly IConnectionSettings m_connectionSettings;
private ConnectionStatusTracer m_tracer;
private WebResponse m_response;
private Stream m_responseStream;

public AsyncRequestOperation( HttpWebRequest request, string requestData, IConnectionSettings connectionSettings, ConnectionStatusTracer tracer )
{
m_request = request;
m_requestData = requestData;
m_connectionSettings = connectionSettings;
m_tracer = tracer;
Start();
}

private void Start()
{
if ( this.m_requestData != null )
WriteRequestDataAsync();
else
GetResponseAsync();
}

private void WriteRequestDataAsync()
{
this.m_request.BeginGetRequestStream( this.Monitor( ar =>
{
var r = this.m_request.EndGetRequestStream( ar );
var buffer = Encoding.UTF8.GetBytes( this.m_requestData );
r.BeginWrite( buffer, 0, buffer.Length, this.Monitor( writeIar =>
{
r.EndWrite( writeIar );
GetResponseAsync();
} ), null );
} ), null );
}

private void GetResponseAsync()
{
this.m_request.BeginGetResponse( this.Monitor( iarResponse =>
{
m_response = m_request.EndGetResponse( iarResponse );
m_responseStream = m_response.GetResponseStream();

var buffer = new byte[8192];
var result = new MemoryStream( buffer.Length );
ReadResponseStreamAsync( this.m_responseStream, buffer, result );

} ), null );
}

private void ReadResponseStreamAsync( Stream stream, byte[] buffer, MemoryStream result )
{
stream.BeginRead( buffer, 0, buffer.Length, this.Monitor( iar =>
{
var bytes = stream.EndRead( iar );
if ( bytes == 0 )
{
Done( result );
return;
}

result.Write( buffer, 0, bytes );
ReadResponseStreamAsync( stream, buffer, result );

} ), null );
}

private void Done( ConnectionStatus connectionStatus )
{
m_tracer.SetResult( connectionStatus );
TrySetResult( connectionStatus );
Dispose();
}

private void Done( Stream result )
{
result.Position = 0;
var reader = new StreamReader( result );
Done( new ConnectionStatus( reader.ReadToEnd() )
{
Request = this.m_requestData,
RequestUrl = this.m_request.RequestUri.ToString(),
RequestMethod = this.m_request.Method
} );

}

private AsyncCallback Monitor( AsyncCallback callback )
{
return ar =>
{
try
{
callback( ar );
}
catch ( WebException webException )
{
var connectionStatus = new ConnectionStatus( webException )
{
Request = this.m_requestData,
RequestUrl = this.m_request.RequestUri.ToString(),
RequestMethod = this.m_request.Method
};
m_connectionSettings.ConnectionStatusHandler( connectionStatus );
Done( connectionStatus );
}
catch ( Exception e )
{
TrySetException( e );
Dispose();
}
};
}

public void Dispose()
{
Dispose( ref m_response );
Dispose( ref m_responseStream );
Dispose( ref m_tracer );
}

private static void Dispose<T>( ref T disposable ) where T : class, IDisposable
{
var d = Interlocked.Exchange( ref disposable, null );
if ( d != null )
d.Dispose();
}
}
}
130 changes: 8 additions & 122 deletions src/Nest/Domain/Connection/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.IO;
using System.Net;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Nest.Domain.Connection;

Expand All @@ -16,7 +13,6 @@ public class Connection : IConnection
const int BUFFER_SIZE = 1024;

private IConnectionSettings _ConnectionSettings { get; set; }
private Semaphore _ResourceLock;
private readonly bool _enableTrace;

public Connection(IConnectionSettings settings)
Expand All @@ -25,7 +21,6 @@ public Connection(IConnectionSettings settings)
throw new ArgumentNullException("settings");

this._ConnectionSettings = settings;
this._ResourceLock = new Semaphore(settings.MaximumAsyncConnections, settings.MaximumAsyncConnections);
this._enableTrace = settings.TraceEnabled;
}

Expand Down Expand Up @@ -205,126 +200,17 @@ protected virtual ConnectionStatus DoSynchronousRequest(HttpWebRequest request,

return cs;
}
}

}
}

protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request, string data = null)
{
var tcs = new TaskCompletionSource<ConnectionStatus>();
var timeout = this._ConnectionSettings.Timeout;
if (!this._ResourceLock.WaitOne(timeout))
{
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
{
var m = "Could not start the operation before the timeout of " + timeout +
"ms completed while waiting for the semaphore";
var cs = new ConnectionStatus(new TimeoutException(m));
tcs.SetResult(cs);
tracer.SetResult(cs);
return tcs.Task;
}
}
try
{
return Task.Factory.StartNew(() =>
{
using (var tracer = new ConnectionStatusTracer(this._ConnectionSettings.TraceEnabled))
{
this.Iterate(this._AsyncSteps(request, tcs, data), tcs);
var cs = tcs.Task.Result;
tracer.SetResult(cs);
_ConnectionSettings.ConnectionStatusHandler(cs);
return cs;
}
}, TaskCreationOptions.LongRunning);
}
finally
{
this._ResourceLock.Release();
}
}

private IEnumerable<Task> _AsyncSteps(HttpWebRequest request, TaskCompletionSource<ConnectionStatus> tcs, string data = null)
{
var timeout = this._ConnectionSettings.Timeout;

var state = new ConnectionState { Connection = request };

if (data != null)
{
var getRequestStream = Task.Factory.FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null);
ThreadPool.RegisterWaitForSingleObject((getRequestStream as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
yield return getRequestStream;

var requestStream = getRequestStream.Result;
try
{
byte[] buffer = Encoding.UTF8.GetBytes(data);
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, buffer, 0, buffer.Length, state);
yield return writeToRequestStream;
}
finally
{
requestStream.Close();
}
}

// Get the response
var getResponse = Task.Factory.FromAsync<WebResponse>(request.BeginGetResponse, request.EndGetResponse, null);
ThreadPool.RegisterWaitForSingleObject((getResponse as IAsyncResult).AsyncWaitHandle, ThreadTimeoutCallback, request, timeout, true);
yield return getResponse;

// Get the response stream
using (var response = (HttpWebResponse)getResponse.Result)
using (var responseStream = response.GetResponseStream())
{
// Copy all data from the response stream
var output = new MemoryStream();
var buffer = new byte[BUFFER_SIZE];
while (responseStream != null)
{
var read = Task<int>.Factory.FromAsync(responseStream.BeginRead, responseStream.EndRead, buffer, 0, BUFFER_SIZE, null);
yield return read;
if (read.Result == 0) break;
output.Write(buffer, 0, read.Result);
}

// Decode the data and store the result
var result = Encoding.UTF8.GetString(output.ToArray());
var cs = new ConnectionStatus(result) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method };
tcs.TrySetResult(cs);
}
yield break;

}

public void Iterate(IEnumerable<Task> asyncIterator, TaskCompletionSource<ConnectionStatus> tcs)
{
var enumerator = asyncIterator.GetEnumerator();
Action<Task> recursiveBody = null;
recursiveBody = completedTask =>
{
if (completedTask != null && completedTask.IsFaulted)
{
//none of the individual steps in _AsyncSteps run in parallel for 1 request
//as this would be impossible we can assume Aggregate Exception.InnerException
var exception = completedTask.Exception.InnerException;

//cleanly exit from exceptions in stages if the exception is a webexception
if (exception is WebException)
tcs.SetResult(new ConnectionStatus(exception));
else
tcs.TrySetException(exception);
enumerator.Dispose();
}
else if (enumerator.MoveNext())
{
enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
}
else enumerator.Dispose();
};
recursiveBody(null);
var operation = new AsyncRequestOperation(
request,
data,
_ConnectionSettings,
new ConnectionStatusTracer( this._ConnectionSettings.TraceEnabled ) );
return operation.Task;
}

private Uri _CreateUriString(string path)
Expand Down Expand Up @@ -386,4 +272,4 @@ public static void LeaveDotsAndSlashesEscaped(Uri uri)
fieldInfo.SetValue(uriParser, uriSyntaxFlags);
}
}
}
}
1 change: 1 addition & 0 deletions src/Nest/Nest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<Reference Include="System.Windows.Forms" />
</ItemGroup>
<ItemGroup>
<Compile Include="Domain\Connection\AsyncRequestOperation.cs" />
<Compile Include="Domain\Responses\IReindexResponse.cs" />
<Compile Include="DSL\ReindexDescriptor.cs" />
<Compile Include="Exception\ReindexException.cs" />
Expand Down