Skip to content

Commit 850b1a4

Browse files
committed
Merge pull request #1093 from elasticsearch/fix/max-retry-timeout
Do not retry when timeout is hit
2 parents e582589 + 5600b11 commit 850b1a4

File tree

10 files changed

+380
-22
lines changed

10 files changed

+380
-22
lines changed

src/Elasticsearch.Net/Connection/Configuration/ConnectionConfiguration.cs

+16
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public class ConnectionConfiguration<T> : IConnectionConfigurationValues, IHideO
6161
private int? _maxDeadTimeout;
6262
int? IConnectionConfigurationValues.MaxDeadTimeout { get{ return _maxDeadTimeout; } }
6363

64+
private TimeSpan? _maxRetryTimeout;
65+
TimeSpan? IConnectionConfigurationValues.MaxRetryTimeout { get{ return _maxRetryTimeout; } }
66+
6467
private string _proxyUsername;
6568
string IConnectionConfigurationValues.ProxyUsername { get{ return _proxyUsername; } }
6669

@@ -278,6 +281,19 @@ public T SetMaxDeadTimeout(int timeout)
278281
this._maxDeadTimeout = timeout;
279282
return (T) this;
280283
}
284+
285+
/// <summary>
286+
/// Limits the total runtime including retries separately from <see cref="Timeout"/>
287+
/// <pre>
288+
/// When not specified defaults to <see cref="Timeout"/> which itself defaults to 60 seconds
289+
/// </pre>
290+
/// </summary>
291+
public T SetMaxRetryTimeout(TimeSpan maxRetryTimeout)
292+
{
293+
this._maxRetryTimeout = maxRetryTimeout;
294+
return (T) this;
295+
}
296+
281297
/// <summary>
282298
/// Semaphore asynchronous connections automatically by giving
283299
/// it a maximum concurrent connections.

src/Elasticsearch.Net/Connection/Configuration/IConnectionConfigurationValues.cs

+11-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
namespace Elasticsearch.Net.Connection
88
{
9+
//TODO change timeouts to TimeSpans in 2.0?
10+
911
public interface IConnectionConfigurationValues
1012
{
1113
/// <summary>
@@ -26,7 +28,7 @@ public interface IConnectionConfigurationValues
2628
int Timeout { get; }
2729

2830
/// <summary>
29-
/// The timeout to use for ping calls that are issues to check whether a node is up or not.
31+
/// The timeout in milliseconds to use for ping calls that are issued to check whether a node is up or not.
3032
/// </summary>
3133
int? PingTimeout { get; }
3234

@@ -46,6 +48,14 @@ public interface IConnectionConfigurationValues
4648
/// </summary>
4749
int? MaxRetries { get; }
4850

51+
/// <summary>
52+
/// Limits the total runtime including retries separately from <see cref="Timeout"/>
53+
/// <pre>
54+
/// When not specified defaults to <see cref="Timeout"/> which itself defaults to 60 seconds
55+
/// </pre>
56+
/// </summary>
57+
TimeSpan? MaxRetryTimeout { get; }
58+
4959
/// <summary>
5060
/// This signals that we do not want to send initial pings to unknown/previously dead nodes
5161
/// and just send the call straightaway

src/Elasticsearch.Net/Connection/ITransportDelegator.cs

+7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ internal interface ITransportDelegator
2626

2727
bool SniffingDisabled(IRequestConfiguration requestConfiguration);
2828
bool SniffOnFaultDiscoveredMoreNodes(ITransportRequestState requestState, int retried, ElasticsearchResponse<Stream> streamResponse);
29+
30+
/// <summary>
31+
/// Returns whether the current delegation over nodes took too long and we should quit.
32+
/// If <see cref="ConnectionSettings.SetMaxRetryTimeout"/> is set we'll use that timeout, otherwise we default to the value of
33+
/// <see cref="ConnectionSettings.SetTimeout"/> which itself defaults to 60 seconds
34+
/// </summary>
35+
bool TookTooLongToRetry(ITransportRequestState requestState);
2936

3037
/// <summary>
3138
/// Selects next node uri on request state

src/Elasticsearch.Net/Connection/RequestHandlers/RequestHandlerBase.cs

+36-18
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class RequestHandlerBase
1616
{
1717
protected const int BufferSize = 4096;
1818
protected static readonly string MaxRetryExceptionMessage = "Failed after retrying {2} times: '{0} {1}'. {3}";
19+
protected static readonly string TookTooLongExceptionMessage = "Retry timeout {4} was hit after retrying {2} times: '{0} {1}'. {3}";
1920
protected static readonly string MaxRetryInnerMessage = "InnerException: {0}, InnerMessage: {1}, InnerStackTrace: {2}";
2021

2122
protected readonly IConnectionConfigurationValues _settings;
@@ -123,42 +124,59 @@ protected bool DoneProcessing<T>(
123124

124125
/// <summary>
/// Throws once a request may no longer be retried: either the delegator reports that the
/// retry timeout was hit, or <paramref name="maxRetries"/> retries have been used up.
/// Returns without throwing while neither limit has been reached.
/// </summary>
/// <param name="requestState">State of the current request; its Retried, SeenExceptions and UsingPooling members are read.</param>
/// <param name="maxRetries">The maximum number of retries allowed for this request.</param>
/// <exception cref="MaxRetryException">
/// Thrown when a limit was reached and either pooling is in use or no exception was recorded.
/// When pooling is not in use and an exception was recorded, that exception is thrown instead.
/// </exception>
protected void ThrowMaxRetryExceptionWhenNeeded<T>(TransportRequestState<T> requestState, int maxRetries)
{
	var tookToLong = this._delegator.TookTooLongToRetry(requestState);

	//the retry timeout was not hit and we haven't depleted our retries: nothing to throw yet
	if (!tookToLong && requestState.Retried < maxRetries) return;

	var innerExceptions = requestState.SeenExceptions.Where(e => e != null).ToList();

	//Build the aggregate from the null-filtered list: the AggregateException constructor
	//throws an ArgumentException when any element is null, which would mask the real failure.
	var innerException = !innerExceptions.HasAny()
		? null
		: (innerExceptions.Count == 1)
			? innerExceptions.First()
			: new AggregateException(innerExceptions);

	//When we are not using pooling we forcefully rethrow the exception
	//and never wrap it in a maxretry exception
	if (!requestState.UsingPooling && innerException != null)
		throw innerException;

	var exceptionMessage = tookToLong
		? CreateTookTooLongExceptionMessage(requestState, innerException)
		: CreateMaxRetryExceptionMessage(requestState, innerException);
	throw new MaxRetryException(exceptionMessage, innerException);
}
139149

150+
protected string CreateInnerExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
151+
{
152+
if (e == null) return null;
153+
var aggregate = e as AggregateException;
154+
if (aggregate == null)
155+
return "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
156+
aggregate = aggregate.Flatten();
157+
var innerExceptions = aggregate.InnerExceptions
158+
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
159+
.ToList();
160+
return "\r\n" + string.Join("\r\n", innerExceptions);
161+
}
162+
140163
protected string CreateMaxRetryExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
141164
{
142-
string innerException = null;
143-
if (e != null)
144-
{
145-
var aggregate = e as AggregateException;
146-
if (aggregate != null)
147-
{
148-
aggregate = aggregate.Flatten();
149-
var innerExceptions = aggregate.InnerExceptions
150-
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
151-
.ToList();
152-
innerException = "\r\n" + string.Join("\r\n", innerExceptions);
153-
}
154-
else
155-
innerException = "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
156-
}
165+
string innerException = CreateInnerExceptionMessage(requestState, e);
157166
var exceptionMessage = MaxRetryExceptionMessage
158167
.F(requestState.Method, requestState.Path, requestState.Retried, innerException);
159168
return exceptionMessage;
160169
}
161170

171+
protected string CreateTookTooLongExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
172+
{
173+
string innerException = CreateInnerExceptionMessage(requestState, e);
174+
var timeout = this._settings.MaxRetryTimeout.GetValueOrDefault(TimeSpan.FromMilliseconds(this._settings.Timeout));
175+
var exceptionMessage = TookTooLongExceptionMessage
176+
.F(requestState.Method, requestState.Path, requestState.Retried, innerException, timeout);
177+
return exceptionMessage;
178+
}
179+
162180
protected void OptionallyCloseResponseStreamAndSetSuccess<T>(
163181
ITransportRequestState requestState,
164182
ElasticsearchServerError error,

src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public interface ITransportRequestState
1111
Uri CreatePathOnCurrentNode(string path);
1212
IRequestConfiguration RequestConfiguration { get; }
1313
int Retried { get; }
14+
DateTime StartedOn { get; }
1415
bool SniffedOnConnectionFailure { get; set; }
1516
int? Seed { get; set; }
1617
Uri CurrentNode { get; set; }

src/Elasticsearch.Net/Connection/Transport.cs

+22-2
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ bool ITransportDelegator.Ping(ITransportRequestState requestState)
9999
using (response.Response)
100100
return response.Success;
101101
}
102-
catch(ElasticsearchAuthenticationException)
102+
catch (ElasticsearchAuthenticationException)
103103
{
104104
throw;
105105
}
@@ -188,7 +188,7 @@ IList<Uri> ITransportDelegator.Sniff(ITransportRequestState ownerState = null)
188188
}
189189
if (response.HttpStatusCode.HasValue && response.HttpStatusCode == (int)HttpStatusCode.Unauthorized)
190190
throw new ElasticsearchAuthenticationException(response);
191-
if (response.Response == null)
191+
if (response.Response == null)
192192
return null;
193193

194194
using (response.Response)
@@ -247,6 +247,26 @@ void ITransportDelegator.SniffOnConnectionFailure(ITransportRequestState request
247247

248248
/* REQUEST STATE *** ********************************************/
249249

250+
/// <summary>
251+
/// Returns whether the current delegation over nodes took too long and we should quit.
252+
/// If <see cref="ConnectionSettings.SetMaxRetryTimeout"/> is set we'll use that timeout, otherwise we default to the value of
253+
/// <see cref="ConnectionSettings.SetTimeout"/> which itself defaults to 60 seconds
254+
/// </summary>
255+
bool ITransportDelegator.TookTooLongToRetry(ITransportRequestState requestState)
256+
{
257+
var timeout = this.Settings.MaxRetryTimeout.GetValueOrDefault(TimeSpan.FromMilliseconds(this.Settings.Timeout));
258+
var startedOn = requestState.StartedOn;
259+
var now = this._dateTimeProvider.Now();
260+
261+
//we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60
262+
//we also abort.
263+
var margin = (timeout.TotalMilliseconds / 100.0) * 98;
264+
var marginTimeSpan = TimeSpan.FromMilliseconds(margin);
265+
var timespanCall = (now - startedOn);
266+
var tookToLong = timespanCall >= marginTimeSpan;
267+
return tookToLong;
268+
}
269+
250270
/// <summary>
251271
/// Returns either the fixed maximum set on the connection configuration settings or the number of nodes
252272
/// </summary>

src/Tests/Elasticsearch.Net.Tests.Unit/ConnectionPools/SniffingConnectionPoolTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public void SniffOnConnectionFaultCausesSniffOn503()
184184
Assert.Throws<MaxRetryException>(()=>client1.Info()); //info call 5
185185

186186
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
187-
nowCall.MustHaveHappened(Repeated.Exactly.Times(8));
187+
nowCall.MustHaveHappened(Repeated.Exactly.Times(10));
188188

189189
}
190190
}

src/Tests/Elasticsearch.Net.Tests.Unit/Elasticsearch.Net.Tests.Unit.csproj

+2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
<Compile Include="Failover\Concurrent\ConcurrencyTestConnection.cs" />
6464
<Compile Include="Failover\Retries\ClientExceptionsUsingPoolingTests.cs" />
6565
<Compile Include="Failover\Retries\ClientExceptionsWithoutPoolingTests.cs" />
66+
<Compile Include="Failover\Timeout\DontRetryAfterMaxRetryTimeoutTests.cs" />
67+
<Compile Include="Failover\Timeout\DontRetryAfterDefaultTimeoutTests.cs" />
6668
<Compile Include="Memory\Helpers\AsyncMemorySetup.cs" />
6769
<Compile Include="Memory\Helpers\IMemorySetup.cs" />
6870
<Compile Include="Memory\ResponseAsyncCodePathsMemoryTests.cs" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Autofac;
6+
using Autofac.Extras.FakeItEasy;
7+
using Elasticsearch.Net.Connection;
8+
using Elasticsearch.Net.Connection.Configuration;
9+
using Elasticsearch.Net.ConnectionPool;
10+
using Elasticsearch.Net.Exceptions;
11+
using Elasticsearch.Net.Providers;
12+
using Elasticsearch.Net.Tests.Unit.Stubs;
13+
using FakeItEasy;
14+
using FluentAssertions;
15+
using NUnit.Framework;
16+
17+
namespace Elasticsearch.Net.Tests.Unit.Failover.Timeout
18+
{
19+
[TestFixture]
20+
public class DontRetryAfterDefaultTimeoutTests
21+
{
22+
[Test]
23+
public void FailEarlyIfTimeoutIsExhausted()
24+
{
25+
using (var fake = new AutoFake())
26+
{
27+
var dateTimeProvider = ProvideDateTimeProvider(fake);
28+
var config = ProvideConfiguration(dateTimeProvider);
29+
var connection = ProvideConnection(fake, config, dateTimeProvider);
30+
31+
var getCall = FakeCalls.GetSyncCall(fake);
32+
var ok = FakeResponse.Ok(config);
33+
var bad = FakeResponse.Bad(config);
34+
getCall.ReturnsNextFromSequence(
35+
bad, //info 1 - 9204
36+
bad, //info 2 - 9203 DEAD
37+
ok //info 2 retry - 9202
38+
);
39+
40+
var seenNodes = new List<Uri>();
41+
getCall.Invokes((Uri u, IRequestConfiguration o) => seenNodes.Add(u));
42+
43+
var pingCall = FakeCalls.PingAtConnectionLevel(fake);
44+
pingCall.Returns(ok);
45+
46+
var client1 = fake.Resolve<ElasticsearchClient>();
47+
48+
//even though the third node should have returned ok, the first 2 calls took a minute
49+
var e = Assert.Throws<MaxRetryException>(() => client1.Info());
50+
e.Message.Should()
51+
.StartWith("Retry timeout 00:01:00 was hit after retrying 1 times:");
52+
53+
IElasticsearchResponse response = null;
54+
Assert.DoesNotThrow(() => response = client1.Info() );
55+
response.Should().NotBeNull();
56+
response.Success.Should().BeTrue();
57+
58+
}
59+
}
60+
61+
[Test]
62+
public void FailEarlyIfTimeoutIsExhausted_Async()
63+
{
64+
using (var fake = new AutoFake())
65+
{
66+
var dateTimeProvider = ProvideDateTimeProvider(fake);
67+
var config = ProvideConfiguration(dateTimeProvider);
68+
var connection = ProvideConnection(fake, config, dateTimeProvider);
69+
70+
var getCall = FakeCalls.GetCall(fake);
71+
var ok = Task.FromResult(FakeResponse.Ok(config));
72+
var bad = Task.FromResult(FakeResponse.Bad(config));
73+
getCall.ReturnsNextFromSequence(
74+
bad,
75+
bad,
76+
ok
77+
);
78+
79+
var seenNodes = new List<Uri>();
80+
getCall.Invokes((Uri u, IRequestConfiguration o) => seenNodes.Add(u));
81+
82+
var pingCall = FakeCalls.PingAtConnectionLevelAsync(fake);
83+
pingCall.Returns(ok);
84+
85+
var client1 = fake.Resolve<ElasticsearchClient>();
86+
//even though the third node should have returned ok, the first 2 calls took a minute
87+
var e = Assert.Throws<MaxRetryException>(async () => await client1.InfoAsync());
88+
e.Message.Should()
89+
.StartWith("Retry timeout 00:01:00 was hit after retrying 1 times:");
90+
91+
IElasticsearchResponse response = null;
92+
Assert.DoesNotThrow(async () => response = await client1.InfoAsync() );
93+
response.Should().NotBeNull();
94+
response.Success.Should().BeTrue();
95+
}
96+
}
97+
98+
private static IConnection ProvideConnection(AutoFake fake, ConnectionConfiguration config, IDateTimeProvider dateTimeProvider)
99+
{
100+
fake.Provide<IConnectionConfigurationValues>(config);
101+
var param = new TypedParameter(typeof(IDateTimeProvider), dateTimeProvider);
102+
var transport = fake.Provide<ITransport, Transport>(param);
103+
var connection = fake.Resolve<IConnection>();
104+
return connection;
105+
}
106+
107+
private static ConnectionConfiguration ProvideConfiguration(IDateTimeProvider dateTimeProvider)
108+
{
109+
var connectionPool = new StaticConnectionPool(new[]
110+
{
111+
new Uri("http://localhost:9204"),
112+
new Uri("http://localhost:9203"),
113+
new Uri("http://localhost:9202"),
114+
new Uri("http://localhost:9201")
115+
}, randomizeOnStartup: false, dateTimeProvider: dateTimeProvider);
116+
var config = new ConnectionConfiguration(connectionPool).EnableMetrics();
117+
return config;
118+
}
119+
120+
private static IDateTimeProvider ProvideDateTimeProvider(AutoFake fake)
121+
{
122+
var now = DateTime.UtcNow;
123+
var dateTimeProvider = fake.Resolve<IDateTimeProvider>();
124+
var nowCall = A.CallTo(() => dateTimeProvider.Now());
125+
nowCall.ReturnsNextFromSequence(
126+
now, //initial sniff now from constructor
127+
now, //pool select next node
128+
now.AddSeconds(30), //info 1 took too long?
129+
now.AddSeconds(30), //pool select next node?
130+
now.AddMinutes(1) //info 2 took too long?
131+
);
132+
A.CallTo(() => dateTimeProvider.AliveTime(A<Uri>._, A<int>._)).Returns(new DateTime());
133+
//dead time will return a fixed timeout of 1 minute
134+
A.CallTo(() => dateTimeProvider.DeadTime(A<Uri>._, A<int>._, A<int?>._, A<int?>._))
135+
.Returns(DateTime.UtcNow.AddMinutes(1));
136+
//make sure the transport layer uses a different datetimeprovider
137+
fake.Provide<IDateTimeProvider>(new DateTimeProvider());
138+
return dateTimeProvider;
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)