Skip to content

Commit 0433fde

Browse files
authored
Fix race condition inproduct check (#8296)
1 parent 0c3a7e7 commit 0433fde

File tree

2 files changed

+112
-419
lines changed

2 files changed

+112
-419
lines changed

Diff for: src/Elastic.Clients.Elasticsearch.Serverless/Client/ElasticsearchClient.cs renamed to src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.cs

+112-85
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,31 @@
77
using System.Linq;
88
using System.Runtime.CompilerServices;
99
using System.Text.Json;
10-
using System.Threading;
1110
using System.Threading.Tasks;
12-
using Elastic.Clients.Elasticsearch.Serverless.Requests;
11+
using System.Threading;
1312
using Elastic.Transport;
1413
using Elastic.Transport.Diagnostics;
15-
using Elastic.Transport.Products.Elasticsearch;
1614

15+
#if ELASTICSEARCH_SERVERLESS
16+
using Elastic.Clients.Elasticsearch.Serverless.Requests;
17+
#else
18+
using Elastic.Clients.Elasticsearch.Requests;
19+
#endif
20+
21+
#if ELASTICSEARCH_SERVERLESS
1722
namespace Elastic.Clients.Elasticsearch.Serverless;
23+
#else
24+
25+
namespace Elastic.Clients.Elasticsearch;
26+
#endif
1827

1928
/// <summary>
2029
/// A strongly-typed client for communicating with Elasticsearch server endpoints.
2130
/// </summary>
2231
public partial class ElasticsearchClient
2332
{
2433
private const string OpenTelemetrySpanAttributePrefix = "db.elasticsearch.";
34+
2535
// This should be updated if any of the code uses semantic conventions defined in newer schema versions.
2636
private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0";
2737

@@ -82,13 +92,14 @@ internal ElasticsearchClient(ITransport<IElasticsearchClientSettings> transport)
8292
public Serializer SourceSerializer => _transport.Configuration.SourceSerializer;
8393
public ITransport<IElasticsearchClientSettings> Transport => _transport;
8494

85-
private ProductCheckStatus _productCheckStatus;
95+
private int _productCheckStatus;
8696

8797
private enum ProductCheckStatus
8898
{
89-
NotChecked,
90-
Succeeded,
91-
Failed
99+
NotChecked = 0,
100+
InProgress = 1,
101+
Succeeded = 2,
102+
Failed = 3
92103
}
93104

94105
private partial void SetupNamespaces();
@@ -133,48 +144,115 @@ private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestPar
133144
where TResponse : TransportResponse, new()
134145
where TRequestParameters : RequestParameters, new()
135146
{
136-
if (_productCheckStatus == ProductCheckStatus.Failed)
137-
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
147+
// The product check modifies request parameters and therefore must not be executed concurrently.
148+
// We use a lockless CAS approach to make sure that only a single product check request is executed at a time.
149+
// We do not guarantee that the product check is always performed on the first request.
138150

139-
var (requestModified, hadRequestConfig, originalHeaders) = AttachProductCheckHeaderIfRequired<TRequest, TRequestParameters>(request);
140-
var (resolvedUrl, urlTemplate, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
141-
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);
151+
var productCheckStatus = Interlocked.CompareExchange(
152+
ref _productCheckStatus,
153+
(int)ProductCheckStatus.InProgress,
154+
(int)ProductCheckStatus.NotChecked
155+
);
142156

143-
if (_productCheckStatus == ProductCheckStatus.Succeeded && !requestModified)
157+
return productCheckStatus switch
144158
{
145-
if (isAsync)
146-
return new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken));
147-
else
148-
return new ValueTask<TResponse>(_transport.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData));
159+
(int)ProductCheckStatus.NotChecked => SendRequestWithProductCheck(),
160+
(int)ProductCheckStatus.InProgress or
161+
(int)ProductCheckStatus.Succeeded => SendRequest(),
162+
(int)ProductCheckStatus.Failed => throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError),
163+
_ => throw new InvalidOperationException("unreachable")
164+
};
165+
166+
ValueTask<TResponse> SendRequest()
167+
{
168+
var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
169+
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);
170+
171+
return isAsync
172+
? new ValueTask<TResponse>(_transport
173+
.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken))
174+
: new ValueTask<TResponse>(_transport
175+
.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData));
149176
}
150177

151-
return SendRequest(isAsync);
178+
async ValueTask<TResponse> SendRequestWithProductCheck()
179+
{
180+
try
181+
{
182+
return await SendRequestWithProductCheckCore().ConfigureAwait(false);
183+
}
184+
catch
185+
{
186+
// Re-try product check on next request.
187+
188+
// 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that
189+
// no other thread executes the product check at the same time. Locked access is not required here.
190+
if (_productCheckStatus is (int)ProductCheckStatus.InProgress)
191+
_productCheckStatus = (int)ProductCheckStatus.NotChecked;
192+
193+
throw;
194+
}
195+
}
152196

153-
async ValueTask<TResponse> SendRequest(bool isAsync)
197+
async ValueTask<TResponse> SendRequestWithProductCheckCore()
154198
{
199+
// Attach product check header
200+
201+
var hadRequestConfig = false;
202+
HeadersList? originalHeaders = null;
203+
204+
if (request.RequestParameters.RequestConfiguration is null)
205+
request.RequestParameters.RequestConfiguration = new RequestConfiguration();
206+
else
207+
{
208+
originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse;
209+
hadRequestConfig = true;
210+
}
211+
212+
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0
213+
? new HeadersList("x-elastic-product")
214+
: new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product");
215+
216+
// Send request
217+
218+
var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
219+
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);
220+
155221
TResponse response;
156222

157223
if (isAsync)
158-
response = await _transport.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken).ConfigureAwait(false);
224+
{
225+
response = await _transport
226+
.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken)
227+
.ConfigureAwait(false);
228+
}
159229
else
160-
response = _transport.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData);
230+
{
231+
response = _transport
232+
.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData);
233+
}
234+
235+
// Evaluate product check result
236+
237+
var productCheckSucceeded = response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) &&
238+
values.FirstOrDefault(x => x.Equals("Elasticsearch", StringComparison.Ordinal)) is not null;
161239

162-
PostRequestProductCheck<TRequest, TResponse>(request, response);
240+
_productCheckStatus = productCheckSucceeded
241+
? (int)ProductCheckStatus.Succeeded
242+
: (int)ProductCheckStatus.Failed;
163243

164-
if (_productCheckStatus == ProductCheckStatus.Failed)
244+
if (_productCheckStatus == (int)ProductCheckStatus.Failed)
165245
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
166246

167-
if (request.RequestParameters.RequestConfiguration is not null)
168-
{
169-
if (!hadRequestConfig)
170-
{
171-
request.RequestParameters.RequestConfiguration = null;
172-
}
173-
else if (originalHeaders.HasValue && originalHeaders.Value.Count > 0)
174-
{
175-
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value;
176-
}
177-
}
247+
if (request.RequestParameters.RequestConfiguration is null)
248+
return response;
249+
250+
// Reset request configuration
251+
252+
if (!hadRequestConfig)
253+
request.RequestParameters.RequestConfiguration = null;
254+
else if (originalHeaders is { Count: > 0 })
255+
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value;
178256

179257
return response;
180258
}
@@ -215,42 +293,6 @@ private static OpenTelemetryData PrepareOpenTelemetryData<TRequest, TRequestPara
215293
return openTelemetryData;
216294
}
217295

218-
private (bool requestModified, bool hadRequestConfig, HeadersList? originalHeaders) AttachProductCheckHeaderIfRequired<TRequest, TRequestParameters>(TRequest request)
219-
where TRequest : Request<TRequestParameters>
220-
where TRequestParameters : RequestParameters, new()
221-
{
222-
var requestModified = false;
223-
var hadRequestConfig = false;
224-
HeadersList? originalHeaders = null;
225-
226-
// If we have not yet checked the product name, add the product header to the list of headers to parse.
227-
if (_productCheckStatus == ProductCheckStatus.NotChecked)
228-
{
229-
requestModified = true;
230-
231-
if (request.RequestParameters.RequestConfiguration is null)
232-
{
233-
request.RequestParameters.RequestConfiguration = new RequestConfiguration();
234-
}
235-
else
236-
{
237-
originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse;
238-
hadRequestConfig = true;
239-
}
240-
241-
if (request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0)
242-
{
243-
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList("x-elastic-product");
244-
}
245-
else
246-
{
247-
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product");
248-
}
249-
}
250-
251-
return (requestModified, hadRequestConfig, originalHeaders);
252-
}
253-
254296
private (string resolvedUrl, string urlTemplate, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request,
255297
Action<IRequestConfiguration>? forceConfiguration)
256298
where TRequest : Request<TRequestParameters>
@@ -278,21 +320,6 @@ private static OpenTelemetryData PrepareOpenTelemetryData<TRequest, TRequestPara
278320
return (resolvedUrl, urlTemplate, routeValues, postData);
279321
}
280322

281-
private void PostRequestProductCheck<TRequest, TResponse>(TRequest request, TResponse response)
282-
where TRequest : Request
283-
where TResponse : TransportResponse, new()
284-
{
285-
if (response.ApiCallDetails.HttpStatusCode.HasValue && response.ApiCallDetails.HttpStatusCode.Value >= 200 && response.ApiCallDetails.HttpStatusCode.Value <= 299 && _productCheckStatus == ProductCheckStatus.NotChecked)
286-
{
287-
if (!response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) || !values.Single().Equals("Elasticsearch", StringComparison.Ordinal))
288-
{
289-
_productCheckStatus = ProductCheckStatus.Failed;
290-
}
291-
292-
_productCheckStatus = ProductCheckStatus.Succeeded;
293-
}
294-
}
295-
296323
private static void ForceConfiguration<TRequestParameters>(Request<TRequestParameters> request, Action<IRequestConfiguration> forceConfiguration)
297324
where TRequestParameters : RequestParameters, new()
298325
{

0 commit comments

Comments
 (0)