Skip to content

Commit f8c8ed2

Browse files
Add Esql.QueryAsObjectsAsync high level API (#8214) (#8223)
Co-authored-by: Florian Bernd <[email protected]>
1 parent dfc38d1 commit f8c8ed2

File tree

2 files changed

+102
-6
lines changed

2 files changed

+102
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
#if !ELASTICSEARCH_SERVERLESS
6+
7+
using System;
8+
using System.Collections.Generic;
9+
using System.IO;
10+
using System.Linq;
11+
using System.Text.Json;
12+
using System.Text.Json.Nodes;
13+
using System.Threading.Tasks;
14+
using System.Threading;
15+
16+
#if ELASTICSEARCH_SERVERLESS
17+
namespace Elastic.Clients.Elasticsearch.Esql.Serverless;
18+
#else
19+
20+
namespace Elastic.Clients.Elasticsearch.Esql;
21+
#endif
22+
23+
public partial class EsqlNamespacedClient
24+
{
25+
public virtual async Task<IEnumerable<TDocument>> QueryAsObjectsAsync<TDocument>(
26+
Action<EsqlQueryRequestDescriptor<TDocument>> configureRequest,
27+
CancellationToken cancellationToken = default)
28+
{
29+
if (configureRequest is null)
30+
throw new ArgumentNullException(nameof(configureRequest));
31+
32+
var response = await QueryAsync<TDocument>(Configure, cancellationToken).ConfigureAwait(false);
33+
34+
return EsqlToObject<TDocument>(Client, response);
35+
36+
void Configure(EsqlQueryRequestDescriptor<TDocument> descriptor)
37+
{
38+
configureRequest(descriptor);
39+
descriptor.Format("JSON");
40+
descriptor.Columnar(false);
41+
}
42+
}
43+
44+
private static IEnumerable<T> EsqlToObject<T>(ElasticsearchClient client, EsqlQueryResponse response)
45+
{
46+
// TODO: Improve performance
47+
48+
using var doc = JsonSerializer.Deserialize<JsonDocument>(response.Data) ?? throw new JsonException();
49+
50+
if (!doc.RootElement.TryGetProperty("columns"u8, out var columns) || (columns.ValueKind is not JsonValueKind.Array))
51+
throw new JsonException("");
52+
53+
if (!doc.RootElement.TryGetProperty("values"u8, out var values) || (values.ValueKind is not JsonValueKind.Array))
54+
yield break;
55+
56+
var names = columns.EnumerateArray()
57+
.Select(x =>
58+
{
59+
if (!x.TryGetProperty("name"u8, out var prop))
60+
{
61+
throw new JsonException();
62+
}
63+
64+
var result = prop.GetString() ?? throw new JsonException();
65+
66+
return result;
67+
})
68+
.ToArray();
69+
70+
var obj = new JsonObject();
71+
using var ms = new MemoryStream();
72+
using var writer = new Utf8JsonWriter(ms);
73+
74+
foreach (var document in values.EnumerateArray())
75+
{
76+
obj.Clear();
77+
ms.SetLength(0);
78+
writer.Reset();
79+
80+
var properties = names.Zip(document.EnumerateArray(),
81+
(key, value) => new KeyValuePair<string, JsonNode?>(key, JsonValue.Create(value)));
82+
foreach (var property in properties)
83+
obj.Add(property);
84+
85+
obj.WriteTo(writer);
86+
writer.Flush();
87+
ms.Position = 0;
88+
89+
var result = client.SourceSerializer.Deserialize<T>(ms) ?? throw new JsonException("");
90+
91+
yield return result;
92+
}
93+
}
94+
}
95+
96+
#endif

Diff for: src/Elastic.Clients.Elasticsearch.Shared/Client/NamespacedClientProxy.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ public abstract class NamespacedClientProxy
2424
private const string InvalidOperation = "The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " +
2525
"new instance of ElasticsearchClient to perform requests over a network to Elasticsearch.";
2626

27-
private readonly ElasticsearchClient _client;
27+
protected ElasticsearchClient Client { get; }
2828

2929
/// <summary>
3030
/// Initializes a new instance for mocking.
3131
/// </summary>
3232
protected NamespacedClientProxy() { }
3333

34-
internal NamespacedClientProxy(ElasticsearchClient client) => _client = client;
34+
internal NamespacedClientProxy(ElasticsearchClient client) => Client = client;
3535

3636
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
3737
where TRequest : Request<TRequestParameters>
@@ -46,10 +46,10 @@ internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
4646
where TResponse : ElasticsearchResponse, new()
4747
where TRequestParameters : RequestParameters, new()
4848
{
49-
if (_client is null)
49+
if (Client is null)
5050
ThrowHelper.ThrowInvalidOperationException(InvalidOperation);
5151

52-
return _client.DoRequest<TRequest, TResponse, TRequestParameters>(request, forceConfiguration);
52+
return Client.DoRequest<TRequest, TResponse, TRequestParameters>(request, forceConfiguration);
5353
}
5454

5555
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
@@ -68,9 +68,9 @@ internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>
6868
where TResponse : ElasticsearchResponse, new()
6969
where TRequestParameters : RequestParameters, new()
7070
{
71-
if (_client is null)
71+
if (Client is null)
7272
ThrowHelper.ThrowInvalidOperationException(InvalidOperation);
7373

74-
return _client.DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, forceConfiguration, cancellationToken);
74+
return Client.DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, forceConfiguration, cancellationToken);
7575
}
7676
}

0 commit comments

Comments
 (0)