Skip to content

Commit 010ca84

Browse files
committed
Merge pull request #1211 from elasticsearch/fix/compress-body
Fix/compress body
2 parents 251d92e + 23a85ab commit 010ca84

File tree

6 files changed

+70
-23
lines changed

6 files changed

+70
-23
lines changed

src/Connections/Elasticsearch.Net.Connection.HttpClient/HttpClientConnection.cs

+11-6
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public HttpClientConnection(IConnectionConfigurationValues settings, HttpClientH
5858
{
5959
Timeout = TimeSpan.FromMilliseconds(_settings.Timeout)
6060
};
61-
if (settings.EnableCompressedResponses && innerHandler.SupportsAutomaticDecompression)
61+
if (settings.EnableCompressedResponses || settings.EnableHttpCompression && innerHandler.SupportsAutomaticDecompression)
6262
{
6363
innerHandler.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
6464
Client.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
@@ -97,12 +97,10 @@ public ElasticsearchResponse<Stream> DoRequestSync(HttpMethod method, Uri uri, b
9797
{
9898
ThrowIfDisposed();
9999

100-
var requestTask = DoRequest(method, uri, data, requestSpecificConfig);
101-
102100
try
103101
{
104-
requestTask.Wait();
105-
return requestTask.Result;
102+
return this.DoRequestInternal(method, uri, data, requestSpecificConfig)
103+
.Result;
106104
}
107105
catch (AggregateException ex)
108106
{
@@ -123,6 +121,12 @@ public ElasticsearchResponse<Stream> DoRequestSync(HttpMethod method, Uri uri, b
123121
/// <param name="requestSpecificConfig">The request specific configuration.</param>
124122
/// <returns>Task&lt;ElasticsearchResponse&lt;Stream&gt;&gt;.</returns>
125123
public async Task<ElasticsearchResponse<Stream>> DoRequest(HttpMethod method, Uri uri, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
124+
{
125+
return await this.DoRequestInternal(method, uri, data, requestSpecificConfig).ConfigureAwait(false);
126+
}
127+
128+
public async Task<ElasticsearchResponse<Stream>> DoRequestInternal(
129+
HttpMethod method, Uri uri, byte[] data = null, IRequestConfiguration requestSpecificConfig = null)
126130
{
127131
ThrowIfDisposed();
128132

@@ -146,7 +150,8 @@ public async Task<ElasticsearchResponse<Stream>> DoRequest(HttpMethod method, Ur
146150
request.Content.Headers.ContentType = new MediaTypeHeaderValue(DefaultContentType);
147151
}
148152

149-
var response = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
153+
var response = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)
154+
.ConfigureAwait(false);
150155

151156
if (method == HttpMethod.Head || response.Content == null)
152157
{

src/Elasticsearch.Net/Connection/Configuration/ConnectionConfiguration.cs

+17-3
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ public class ConnectionConfiguration<T> : IConnectionConfigurationValues, IHideO
113113
private TimeSpan? _sniffLifeSpan;
114114
TimeSpan? IConnectionConfigurationValues.SniffInformationLifeSpan { get{ return _sniffLifeSpan; } }
115115

116-
private bool _compressionEnabled;
117-
bool IConnectionConfigurationValues.EnableCompressedResponses { get{ return _compressionEnabled; } }
116+
private bool _enableCompressedResponses;
117+
bool IConnectionConfigurationValues.EnableCompressedResponses { get{ return _enableCompressedResponses; } }
118+
119+
private bool _enableHttpCompression;
120+
bool IConnectionConfigurationValues.EnableHttpCompression { get{ return _enableHttpCompression; } }
118121

119122
private bool _traceEnabled;
120123
bool IConnectionConfigurationValues.TraceEnabled { get{ return _traceEnabled; } }
@@ -179,9 +182,20 @@ public T SniffLifeSpan(TimeSpan sniffTimeSpan)
179182
/// Enable compressed responses from elasticsearch (NOTE that that nodes need to be configured to allow this)
180183
/// http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-http.html
181184
/// </summary>
185+
[Obsolete("Scheduled to be removed in 2.0, please use EnableHttpCompression")]
182186
public T EnableCompressedResponses(bool enabled = true)
183187
{
184-
this._compressionEnabled = enabled;
188+
this._enableCompressedResponses = enabled;
189+
return (T) this;
190+
}
191+
192+
/// <summary>
193+
/// Enable gzip compressed requests and responses, do note that you need to configure elasticsearch to set this
194+
/// <see cref="http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-http.html"/>
195+
/// </summary>
196+
public T EnableHttpCompression(bool enabled = true)
197+
{
198+
this._enableHttpCompression = enabled;
185199
return (T) this;
186200
}
187201

src/Elasticsearch.Net/Connection/Configuration/IConnectionConfigurationValues.cs

+7
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,14 @@ public interface IConnectionConfigurationValues
7070
/// <summary>
7171
/// When set signals elasticsearch to respond with compressed responses
7272
/// </summary>
73+
[Obsolete("Scheduled to be removed in 2.0, please use EnableHttpCompression")]
7374
bool EnableCompressedResponses { get; }
75+
76+
/// <summary>
77+
/// Enable gzip compressed requests and responses, do note that you need to configure elasticsearch to set this
78+
/// <see cref="http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-http.html"/>
79+
/// </summary>
80+
bool EnableHttpCompression { get; }
7481

7582
/// <summary>
7683
/// When set will force all connections through this proxy

src/Elasticsearch.Net/Connection/HttpConnection.cs

+32-6
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,22 @@ public virtual ElasticsearchResponse<Stream> PostSync(Uri uri, byte[] data, IReq
6767
{
6868
return this.BodyRequest(uri, data, "POST", requestSpecificConfig);
6969
}
70+
7071
public virtual ElasticsearchResponse<Stream> PutSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
7172
{
7273
return this.BodyRequest(uri, data, "PUT", requestSpecificConfig);
7374
}
75+
7476
public virtual ElasticsearchResponse<Stream> DeleteSync(Uri uri, IRequestConfiguration requestSpecificConfig = null)
7577
{
7678
return this.HeaderOnlyRequest(uri, "DELETE", requestSpecificConfig);
7779
}
80+
7881
public virtual ElasticsearchResponse<Stream> DeleteSync(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
7982
{
8083
return this.BodyRequest(uri, data, "DELETE", requestSpecificConfig);
8184
}
8285

83-
8486
private ElasticsearchResponse<Stream> HeaderOnlyRequest(Uri uri, string method, IRequestConfiguration requestSpecificConfig)
8587
{
8688
var r = this.CreateHttpWebRequest(uri, method, null, requestSpecificConfig);
@@ -93,17 +95,18 @@ private ElasticsearchResponse<Stream> BodyRequest(Uri uri, byte[] data, string m
9395
return this.DoSynchronousRequest(r, data, requestSpecificConfig);
9496
}
9597

96-
9798
public virtual Task<ElasticsearchResponse<Stream>> Get(Uri uri, IRequestConfiguration requestSpecificConfig = null)
9899
{
99100
var r = this.CreateHttpWebRequest(uri, "GET", null, requestSpecificConfig);
100101
return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
101102
}
103+
102104
public virtual Task<ElasticsearchResponse<Stream>> Head(Uri uri, IRequestConfiguration requestSpecificConfig = null)
103105
{
104106
var r = this.CreateHttpWebRequest(uri, "HEAD", null, requestSpecificConfig);
105107
return this.DoAsyncRequest(r, requestSpecificConfig: requestSpecificConfig);
106108
}
109+
107110
public virtual Task<ElasticsearchResponse<Stream>> Post(Uri uri, byte[] data, IRequestConfiguration requestSpecificConfig = null)
108111
{
109112
var r = this.CreateHttpWebRequest(uri, "POST", data, requestSpecificConfig);
@@ -121,6 +124,7 @@ public virtual Task<ElasticsearchResponse<Stream>> Delete(Uri uri, byte[] data,
121124
var r = this.CreateHttpWebRequest(uri, "DELETE", data, requestSpecificConfig);
122125
return this.DoAsyncRequest(r, data, requestSpecificConfig: requestSpecificConfig);
123126
}
127+
124128
public virtual Task<ElasticsearchResponse<Stream>> Delete(Uri uri, IRequestConfiguration requestSpecificConfig = null)
125129
{
126130
var r = this.CreateHttpWebRequest(uri, "DELETE", null, requestSpecificConfig);
@@ -201,10 +205,13 @@ protected virtual HttpWebRequest CreateWebRequest(Uri uri, string method, byte[]
201205
request.Pipelined = this.ConnectionSettings.HttpPipeliningEnabled
202206
|| (requestSpecificConfig != null && requestSpecificConfig.EnableHttpPipelining);
203207

204-
if (this.ConnectionSettings.EnableCompressedResponses)
208+
if (this.ConnectionSettings.EnableCompressedResponses
209+
|| this.ConnectionSettings.EnableHttpCompression)
205210
{
206211
request.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
207212
request.Headers.Add("Accept-Encoding", "gzip,deflate");
213+
if (this.ConnectionSettings.EnableHttpCompression)
214+
request.Headers.Add("Content-Encoding", "gzip");
208215
}
209216

210217
if (requestSpecificConfig != null && !string.IsNullOrWhiteSpace(requestSpecificConfig.ContentType))
@@ -235,9 +242,14 @@ protected virtual ElasticsearchResponse<Stream> DoSynchronousRequest(HttpWebRequ
235242

236243
if (data != null)
237244
{
245+
238246
using (var r = request.GetRequestStream())
239247
{
240-
r.Write(data, 0, data.Length);
248+
if (this.ConnectionSettings.EnableHttpCompression)
249+
using (var zipStream = new GZipStream(r, CompressionMode.Compress))
250+
zipStream.Write(data, 0, data.Length);
251+
else
252+
r.Write(data, 0, data.Length);
241253
}
242254
}
243255
try
@@ -325,8 +337,22 @@ private IEnumerable<Task> _AsyncSteps(HttpWebRequest request, TaskCompletionSour
325337
var requestStream = getRequestStream.Result;
326338
try
327339
{
328-
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, data, 0, data.Length, null);
329-
yield return writeToRequestStream;
340+
if (this.ConnectionSettings.EnableHttpCompression)
341+
{
342+
using (var zipStream = new GZipStream(requestStream, CompressionMode.Compress))
343+
{
344+
345+
var writeToRequestStream = Task.Factory.FromAsync(zipStream.BeginWrite, zipStream.EndWrite, data, 0,
346+
data.Length, null);
347+
yield return writeToRequestStream;
348+
}
349+
}
350+
else
351+
{
352+
var writeToRequestStream = Task.Factory.FromAsync(requestStream.BeginWrite, requestStream.EndWrite, data, 0,
353+
data.Length, null);
354+
yield return writeToRequestStream;
355+
}
330356
}
331357
finally
332358
{

src/Tests/Elasticsearch.Net.Integration.Yaml/YamlTestsBase.cs

-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ static YamlTestsBase()
3131
var uri = new Uri("http://"+host+":9200/");
3232
var settings = new ConnectionConfiguration(uri).ExposeRawResponse();
3333

34-
var jsonNetSerializer = new ElasticsearchJsonNetSerializer();
35-
3634
_client = new ElasticsearchClient(settings);
3735
//_client = new ElasticsearchClient(settings, serializer: jsonNetSerializer);
3836
var infoResponse = _client.Info();

src/Tests/Nest.Tests.Integration/Core/AsyncTests.cs

+3-6
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,14 @@ public class AsyncTests : IntegrationTests
88
{
99

1010
[Test]
11-
public void TestIndex()
11+
public async void TestIndex()
1212
{
1313
var newProject = new ElasticsearchProject
1414
{
1515
Name = "COBOLES", //COBOL ES client ?
1616
};
17-
var t = this.Client.IndexAsync<ElasticsearchProject>(newProject);
18-
t.Wait();
19-
Assert.True(t.Result.IsValid);
20-
Assert.True(t.IsCompleted, "task did not complete");
21-
Assert.True(t.IsCompleted, "task did not complete");
17+
var t = await this.Client.IndexAsync<ElasticsearchProject>(newProject);
18+
Assert.True(t.IsValid);
2219
}
2320

2421
}

0 commit comments

Comments
 (0)