From 391e75ec9547d013226280eaee465fb4edb87ba2 Mon Sep 17 00:00:00 2001 From: Florian Bernd Date: Tue, 13 Aug 2024 09:09:43 +0200 Subject: [PATCH] Fix race condition inproduct check (#8296) --- .../Client/ElasticsearchClient.cs | 197 ++++++----- .../Client/ElasticsearchClient.cs | 334 ------------------ 2 files changed, 112 insertions(+), 419 deletions(-) rename src/{Elastic.Clients.Elasticsearch.Serverless => Elastic.Clients.Elasticsearch.Shared}/Client/ElasticsearchClient.cs (72%) delete mode 100644 src/Elastic.Clients.Elasticsearch/Client/ElasticsearchClient.cs diff --git a/src/Elastic.Clients.Elasticsearch.Serverless/Client/ElasticsearchClient.cs b/src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.cs similarity index 72% rename from src/Elastic.Clients.Elasticsearch.Serverless/Client/ElasticsearchClient.cs rename to src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.cs index f3a70e5c252..26a611775d9 100644 --- a/src/Elastic.Clients.Elasticsearch.Serverless/Client/ElasticsearchClient.cs +++ b/src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.cs @@ -7,14 +7,23 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Text.Json; -using System.Threading; using System.Threading.Tasks; -using Elastic.Clients.Elasticsearch.Serverless.Requests; +using System.Threading; using Elastic.Transport; using Elastic.Transport.Diagnostics; -using Elastic.Transport.Products.Elasticsearch; +#if ELASTICSEARCH_SERVERLESS +using Elastic.Clients.Elasticsearch.Serverless.Requests; +#else +using Elastic.Clients.Elasticsearch.Requests; +#endif + +#if ELASTICSEARCH_SERVERLESS namespace Elastic.Clients.Elasticsearch.Serverless; +#else + +namespace Elastic.Clients.Elasticsearch; +#endif /// /// A strongly-typed client for communicating with Elasticsearch server endpoints. @@ -22,6 +31,7 @@ namespace Elastic.Clients.Elasticsearch.Serverless; public partial class ElasticsearchClient { private const string OpenTelemetrySpanAttributePrefix = "db.elasticsearch."; + // This should be updated if any of the code uses semantic conventions defined in newer schema versions. private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0"; @@ -82,13 +92,14 @@ internal ElasticsearchClient(ITransport transport) public Serializer SourceSerializer => _transport.Configuration.SourceSerializer; public ITransport Transport => _transport; - private ProductCheckStatus _productCheckStatus; + private int _productCheckStatus; private enum ProductCheckStatus { - NotChecked, - Succeeded, - Failed + NotChecked = 0, + InProgress = 1, + Succeeded = 2, + Failed = 3 } private partial void SetupNamespaces(); @@ -133,48 +144,115 @@ private ValueTask DoRequestCoreAsync(request); - var (resolvedUrl, urlTemplate, resolvedRouteValues, postData) = PrepareRequest(request, forceConfiguration); - var openTelemetryData = PrepareOpenTelemetryData(request, resolvedRouteValues); + var productCheckStatus = Interlocked.CompareExchange( + ref _productCheckStatus, + (int)ProductCheckStatus.InProgress, + (int)ProductCheckStatus.NotChecked + ); - if (_productCheckStatus == ProductCheckStatus.Succeeded && !requestModified) + return productCheckStatus switch { - if (isAsync) - return new ValueTask(_transport.RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken)); - else - return new ValueTask(_transport.Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData)); + (int)ProductCheckStatus.NotChecked => SendRequestWithProductCheck(), + (int)ProductCheckStatus.InProgress or + (int)ProductCheckStatus.Succeeded => SendRequest(), + (int)ProductCheckStatus.Failed => throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError), + _ => throw new InvalidOperationException("unreachable") + }; + + ValueTask SendRequest() + { + var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest(request, forceConfiguration); + var openTelemetryData = PrepareOpenTelemetryData(request, resolvedRouteValues); + + return isAsync + ? new ValueTask(_transport + .RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken)) + : new ValueTask(_transport + .Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData)); } - return SendRequest(isAsync); + async ValueTask SendRequestWithProductCheck() + { + try + { + return await SendRequestWithProductCheckCore().ConfigureAwait(false); + } + catch + { + // Re-try product check on next request. + + // 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that + // no other thread executes the product check at the same time. Locked access is not required here. + if (_productCheckStatus is (int)ProductCheckStatus.InProgress) + _productCheckStatus = (int)ProductCheckStatus.NotChecked; + + throw; + } + } - async ValueTask SendRequest(bool isAsync) + async ValueTask SendRequestWithProductCheckCore() { + // Attach product check header + + var hadRequestConfig = false; + HeadersList? originalHeaders = null; + + if (request.RequestParameters.RequestConfiguration is null) + request.RequestParameters.RequestConfiguration = new RequestConfiguration(); + else + { + originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse; + hadRequestConfig = true; + } + + request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0 + ? new HeadersList("x-elastic-product") + : new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product"); + + // Send request + + var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest(request, forceConfiguration); + var openTelemetryData = PrepareOpenTelemetryData(request, resolvedRouteValues); + TResponse response; if (isAsync) - response = await _transport.RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken).ConfigureAwait(false); + { + response = await _transport + .RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken) + .ConfigureAwait(false); + } else - response = _transport.Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData); + { + response = _transport + .Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData); + } + + // Evaluate product check result + + var productCheckSucceeded = response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) && + values.FirstOrDefault(x => x.Equals("Elasticsearch", StringComparison.Ordinal)) is not null; - PostRequestProductCheck(request, response); + _productCheckStatus = productCheckSucceeded + ? (int)ProductCheckStatus.Succeeded + : (int)ProductCheckStatus.Failed; - if (_productCheckStatus == ProductCheckStatus.Failed) + if (_productCheckStatus == (int)ProductCheckStatus.Failed) throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError); - if (request.RequestParameters.RequestConfiguration is not null) - { - if (!hadRequestConfig) - { - request.RequestParameters.RequestConfiguration = null; - } - else if (originalHeaders.HasValue && originalHeaders.Value.Count > 0) - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value; - } - } + if (request.RequestParameters.RequestConfiguration is null) + return response; + + // Reset request configuration + + if (!hadRequestConfig) + request.RequestParameters.RequestConfiguration = null; + else if (originalHeaders is { Count: > 0 }) + request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value; return response; } @@ -215,42 +293,6 @@ private static OpenTelemetryData PrepareOpenTelemetryData(TRequest request) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - var requestModified = false; - var hadRequestConfig = false; - HeadersList? originalHeaders = null; - - // If we have not yet checked the product name, add the product header to the list of headers to parse. - if (_productCheckStatus == ProductCheckStatus.NotChecked) - { - requestModified = true; - - if (request.RequestParameters.RequestConfiguration is null) - { - request.RequestParameters.RequestConfiguration = new RequestConfiguration(); - } - else - { - originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse; - hadRequestConfig = true; - } - - if (request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0) - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList("x-elastic-product"); - } - else - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product"); - } - } - - return (requestModified, hadRequestConfig, originalHeaders); - } - private (string resolvedUrl, string urlTemplate, Dictionary? resolvedRouteValues, PostData data) PrepareRequest(TRequest request, Action? forceConfiguration) where TRequest : Request @@ -278,21 +320,6 @@ private static OpenTelemetryData PrepareOpenTelemetryData(TRequest request, TResponse response) - where TRequest : Request - where TResponse : TransportResponse, new() - { - if (response.ApiCallDetails.HttpStatusCode.HasValue && response.ApiCallDetails.HttpStatusCode.Value >= 200 && response.ApiCallDetails.HttpStatusCode.Value <= 299 && _productCheckStatus == ProductCheckStatus.NotChecked) - { - if (!response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) || !values.Single().Equals("Elasticsearch", StringComparison.Ordinal)) - { - _productCheckStatus = ProductCheckStatus.Failed; - } - - _productCheckStatus = ProductCheckStatus.Succeeded; - } - } - private static void ForceConfiguration(Request request, Action forceConfiguration) where TRequestParameters : RequestParameters, new() { diff --git a/src/Elastic.Clients.Elasticsearch/Client/ElasticsearchClient.cs b/src/Elastic.Clients.Elasticsearch/Client/ElasticsearchClient.cs deleted file mode 100644 index c9134d26b9c..00000000000 --- a/src/Elastic.Clients.Elasticsearch/Client/ElasticsearchClient.cs +++ /dev/null @@ -1,334 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using Elastic.Clients.Elasticsearch.Requests; -using Elastic.Transport; -using Elastic.Transport.Diagnostics; -using Elastic.Transport.Products.Elasticsearch; - -namespace Elastic.Clients.Elasticsearch; - -/// -/// A strongly-typed client for communicating with Elasticsearch server endpoints. -/// -public partial class ElasticsearchClient -{ - private const string OpenTelemetrySpanAttributePrefix = "db.elasticsearch."; - // This should be updated if any of the code uses semantic conventions defined in newer schema versions. - private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0"; - - private readonly ITransport _transport; - internal static ConditionalWeakTable SettingsTable { get; } = new(); - - /// - /// Creates a client configured to connect to http://localhost:9200. - /// - public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { } - - /// - /// Creates a client configured to connect to a node reachable at the provided . - /// - /// The to connect to. - public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { } - - /// - /// Creates a client configured to communicate with Elastic Cloud using the provided . - /// See the documentation for more information on how to obtain your Cloud Id. - /// - /// If you want more control, use the constructor and - /// pass an instance of that takes a in its constructor as well. - /// - /// - /// The Cloud ID of an Elastic Cloud deployment. - /// The credentials to use for the connection. - public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this( - new ElasticsearchClientSettings(cloudId, credentials)) - { - } - - /// - /// Creates a client using the provided configuration to initialise the client. - /// - /// The used to configure the client. - public ElasticsearchClient(IElasticsearchClientSettings elasticsearchClientSettings) - : this(new DistributedTransport(elasticsearchClientSettings)) - { - } - - internal ElasticsearchClient(ITransport transport) - { - transport.ThrowIfNull(nameof(transport)); - transport.Configuration.ThrowIfNull(nameof(transport.Configuration)); - transport.Configuration.RequestResponseSerializer.ThrowIfNull( - nameof(transport.Configuration.RequestResponseSerializer)); - transport.Configuration.Inferrer.ThrowIfNull(nameof(transport.Configuration.Inferrer)); - - _transport = transport; - - SetupNamespaces(); - } - - public IElasticsearchClientSettings ElasticsearchClientSettings => _transport.Configuration; - public Inferrer Infer => _transport.Configuration.Inferrer; - public Serializer RequestResponseSerializer => _transport.Configuration.RequestResponseSerializer; - public Serializer SourceSerializer => _transport.Configuration.SourceSerializer; - public ITransport Transport => _transport; - - private ProductCheckStatus _productCheckStatus; - - private enum ProductCheckStatus - { - NotChecked, - Succeeded, - Failed - } - - private partial void SetupNamespaces(); - - internal TResponse DoRequest(TRequest request) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() => - DoRequest(request, null); - - internal TResponse DoRequest( - TRequest request, - Action? forceConfiguration) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequestCoreAsync(false, request, forceConfiguration).EnsureCompleted(); - - internal Task DoRequestAsync( - TRequest request, - CancellationToken cancellationToken = default) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequestAsync(request, null, cancellationToken); - - internal Task DoRequestAsync( - TRequest request, - Action? forceConfiguration, - CancellationToken cancellationToken = default) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequestCoreAsync(true, request, forceConfiguration, cancellationToken).AsTask(); - - private ValueTask DoRequestCoreAsync( - bool isAsync, - TRequest request, - Action? forceConfiguration, - CancellationToken cancellationToken = default) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - { - if (_productCheckStatus == ProductCheckStatus.Failed) - throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError); - - var (requestModified, hadRequestConfig, originalHeaders) = AttachProductCheckHeaderIfRequired(request); - var (resolvedUrl, urlTemplate, resolvedRouteValues, postData) = PrepareRequest(request, forceConfiguration); - var openTelemetryData = PrepareOpenTelemetryData(request, resolvedRouteValues); - - if (_productCheckStatus == ProductCheckStatus.Succeeded && !requestModified) - { - if (isAsync) - return new ValueTask(_transport.RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken)); - else - return new ValueTask(_transport.Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData)); - } - - return SendRequest(isAsync); - - async ValueTask SendRequest(bool isAsync) - { - TResponse response; - - if (isAsync) - response = await _transport.RequestAsync(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken).ConfigureAwait(false); - else - response = _transport.Request(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData); - - PostRequestProductCheck(request, response); - - if (_productCheckStatus == ProductCheckStatus.Failed) - throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError); - - if (request.RequestParameters.RequestConfiguration is not null) - { - if (!hadRequestConfig) - { - request.RequestParameters.RequestConfiguration = null; - } - else if (originalHeaders.HasValue && originalHeaders.Value.Count > 0) - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value; - } - } - - return response; - } - } - - private static OpenTelemetryData PrepareOpenTelemetryData(TRequest request, Dictionary resolvedRouteValues) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - // If there are no subscribed listeners, we avoid some work and allocations - if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners) - return default; - - // We fall back to a general operation name in cases where the derived request fails to override the property - var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue(); - - // TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or - // caching per combination of route values. - // We should benchmark this first to assess the impact for common workloads. - // The former is likely going to save some short-lived allocations, but only for requests to endpoints without required path parts. - // The latter may bloat the cache as some combinations of path parts may rarely re-occur. - var attributes = new Dictionary - { - [OpenTelemetry.SemanticConventions.DbOperation] = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown", - [$"{OpenTelemetrySpanAttributePrefix}schema_url"] = OpenTelemetrySchemaVersion - }; - - if (resolvedRouteValues is not null) - { - foreach (var value in resolvedRouteValues) - { - if (!string.IsNullOrEmpty(value.Key) && !string.IsNullOrEmpty(value.Value)) - attributes.Add($"{OpenTelemetrySpanAttributePrefix}path_parts.{value.Key}", value.Value); - } - } - - var openTelemetryData = new OpenTelemetryData { SpanName = operationName, SpanAttributes = attributes }; - return openTelemetryData; - } - - private (bool requestModified, bool hadRequestConfig, HeadersList? originalHeaders) AttachProductCheckHeaderIfRequired(TRequest request) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - var requestModified = false; - var hadRequestConfig = false; - HeadersList? originalHeaders = null; - - // If we have not yet checked the product name, add the product header to the list of headers to parse. - if (_productCheckStatus == ProductCheckStatus.NotChecked) - { - requestModified = true; - - if (request.RequestParameters.RequestConfiguration is null) - { - request.RequestParameters.RequestConfiguration = new RequestConfiguration(); - } - else - { - originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse; - hadRequestConfig = true; - } - - if (request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0) - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList("x-elastic-product"); - } - else - { - request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product"); - } - } - - return (requestModified, hadRequestConfig, originalHeaders); - } - - private (string resolvedUrl, string urlTemplate, Dictionary? resolvedRouteValues, PostData data) PrepareRequest(TRequest request, - Action? forceConfiguration) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - request.ThrowIfNull(nameof(request), "A request is required."); - - if (forceConfiguration is not null) - ForceConfiguration(request, forceConfiguration); - - if (request.ContentType is not null) - ForceContentType(request, request.ContentType); - - if (request.Accept is not null) - ForceAccept(request, request.Accept); - - var (resolvedUrl, urlTemplate, routeValues) = request.GetUrl(ElasticsearchClientSettings); - - var postData = - request.HttpMethod == HttpMethod.GET || - request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody - ? null - : PostData.Serializable(request); - - return (resolvedUrl, urlTemplate, routeValues, postData); - } - - private void PostRequestProductCheck(TRequest request, TResponse response) - where TRequest : Request - where TResponse : TransportResponse, new() - { - if (response.ApiCallDetails.HttpStatusCode.HasValue && response.ApiCallDetails.HttpStatusCode.Value >= 200 && response.ApiCallDetails.HttpStatusCode.Value <= 299 && _productCheckStatus == ProductCheckStatus.NotChecked) - { - if (!response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) || !values.Single().Equals("Elasticsearch", StringComparison.Ordinal)) - { - _productCheckStatus = ProductCheckStatus.Failed; - } - - _productCheckStatus = ProductCheckStatus.Succeeded; - } - } - - private static void ForceConfiguration(Request request, Action forceConfiguration) - where TRequestParameters : RequestParameters, new() - { - var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration(); - forceConfiguration(configuration); - request.RequestParameters.RequestConfiguration = configuration; - } - - private static void ForceContentType(TRequest request, string contentType) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration(); - configuration.Accept = contentType; - configuration.ContentType = contentType; - request.RequestParameters.RequestConfiguration = configuration; - } - - private static void ForceAccept(TRequest request, string acceptType) - where TRequest : Request - where TRequestParameters : RequestParameters, new() - { - var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration(); - configuration.Accept = acceptType; - request.RequestParameters.RequestConfiguration = configuration; - } - - internal static void ForceJson(IRequestConfiguration requestConfiguration) - { - requestConfiguration.Accept = RequestData.DefaultMimeType; - requestConfiguration.ContentType = RequestData.DefaultMimeType; - } - - internal static void ForceTextPlain(IRequestConfiguration requestConfiguration) - { - requestConfiguration.Accept = RequestData.MimeTypeTextPlain; - requestConfiguration.ContentType = RequestData.MimeTypeTextPlain; - } -}