Skip to content

Commit 0e6d6f4

Browse files
authored
Report failures on partial results (#124823)
* Report failures on partial results
1 parent 190bd93 commit 0e6d6f4

File tree

5 files changed

+123
-7
lines changed

5 files changed

+123
-7
lines changed

docs/changelog/124823.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124823
2+
summary: Report failures on partial results
3+
area: "ES|QL"
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.atomic.AtomicLong;
4141

4242
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
43+
import static org.hamcrest.Matchers.aMapWithSize;
4344
import static org.hamcrest.Matchers.equalTo;
4445
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4546

@@ -143,7 +144,27 @@ protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bo
143144
assertThat((int) inner.get("total"), equalTo(numClusters));
144145
assertTrue(inner.containsKey("details"));
145146
} else {
146-
assertNull(clusters);
147+
final Object partial = esqlResponseAsMap.get("is_partial");
148+
if (partial != null && (Boolean) partial) {
149+
// If we have partial response, we could have cluster metadata, it should contain details.
150+
// Details should not be empty, and it should contain clusters with failures.
151+
if (clusters != null) {
152+
@SuppressWarnings("unchecked")
153+
Map<String, Object> inner = (Map<String, Object>) clusters;
154+
assertThat(inner, aMapWithSize(1));
155+
assertTrue(inner.containsKey("details"));
156+
@SuppressWarnings("unchecked")
157+
Map<String, Object> details = (Map<String, Object>) inner.get("details");
158+
assertThat(details.size(), greaterThanOrEqualTo(1));
159+
details.forEach((k, v) -> {
160+
@SuppressWarnings("unchecked")
161+
Map<String, Object> cluster = (Map<String, Object>) v;
162+
assertTrue(cluster.containsKey("failures"));
163+
});
164+
}
165+
} else {
166+
assertNull(clusters);
167+
}
147168
}
148169
} catch (IOException e) {
149170
fail("Could not convert ESQLQueryResponse to Map: " + e);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,15 @@ public boolean isCrossClusterSearch() {
218218
|| clusterInfo.size() == 1 && clusterInfo.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false;
219219
}
220220

221+
/**
222+
* Is there any metadata to report in the response?
223+
* This is true on cross-cluster search with includeCCSMetadata=true or when there are partial failures.
224+
*/
225+
public boolean hasMetadataToReport() {
226+
return isCrossClusterSearch() && includeCCSMetadata
227+
|| (isPartial && clusterInfo.values().stream().anyMatch(c -> c.getFailures().isEmpty() == false));
228+
}
229+
221230
public Cluster getCluster(String clusterAlias) {
222231
return clusterInfo.get(clusterAlias);
223232
}
@@ -257,9 +266,13 @@ public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Clus
257266

258267
@Override
259268
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
260-
if (isCrossClusterSearch() == false || clusterInfo.isEmpty()) {
269+
if (clusterInfo.isEmpty()) {
261270
return Collections.emptyIterator();
262271
}
272+
if (includeCCSMetadata == false) {
273+
// If includeCCSMetadata is false, the only reason we're here is partial failures, so just report them.
274+
return onlyFailuresToXContent();
275+
}
263276
Map<Cluster.Status, Integer> clusterStatuses = new EnumMap<>(Cluster.Status.class);
264277
for (Cluster info : clusterInfo.values()) {
265278
clusterStatuses.merge(info.getStatus(), 1, Integer::sum);
@@ -280,6 +293,19 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
280293
);
281294
}
282295

296+
private Iterator<? extends ToXContent> onlyFailuresToXContent() {
297+
Iterator<Cluster> failuresIterator = clusterInfo.values().stream().filter(c -> (c.getFailures().isEmpty() == false)).iterator();
298+
if (failuresIterator.hasNext()) {
299+
return Iterators.concat(
300+
ChunkedToXContentHelper.startObject(),
301+
ChunkedToXContentHelper.object("details", failuresIterator),
302+
ChunkedToXContentHelper.endObject()
303+
);
304+
} else {
305+
return Collections.emptyIterator();
306+
}
307+
}
308+
283309
/**
284310
* @param status the status you want to access
285311
* @return a stream of clusters with that status

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,9 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
228228
Iterator<ToXContent> profileRender = profile != null
229229
? ChunkedToXContentHelper.field("profile", profile, params)
230230
: Collections.emptyIterator();
231-
Iterator<ToXContent> executionInfoRender = executionInfo != null
232-
&& executionInfo.isCrossClusterSearch()
233-
&& executionInfo.includeCCSMetadata()
234-
? ChunkedToXContentHelper.field("_clusters", executionInfo, params)
235-
: Collections.emptyIterator();
231+
Iterator<ToXContent> executionInfoRender = executionInfo != null && executionInfo.hasMetadataToReport()
232+
? ChunkedToXContentHelper.field("_clusters", executionInfo, params)
233+
: Collections.emptyIterator();
236234
return Iterators.concat(
237235
ChunkedToXContentHelper.startObject(),
238236
asyncPropertiesOrEmpty(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.search.ShardSearchFailure;
11+
import org.elasticsearch.test.ESTestCase;
12+
import org.elasticsearch.transport.RemoteClusterService;
13+
14+
import java.util.List;
15+
16+
public class EsqlExecutionInfoTests extends ESTestCase {
17+
18+
static final EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(
19+
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
20+
"test"
21+
);
22+
static final EsqlExecutionInfo.Cluster remoteCluster = new EsqlExecutionInfo.Cluster("remote", "test");
23+
24+
public void testHasMetadataInclude() {
25+
// includeCCSMetadata + non-local clusters will produce true
26+
EsqlExecutionInfo info = new EsqlExecutionInfo(true);
27+
assertFalse(info.hasMetadataToReport());
28+
info.swapCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> localCluster);
29+
assertFalse(info.hasMetadataToReport());
30+
info.swapCluster("remote", (k, v) -> remoteCluster);
31+
assertTrue(info.hasMetadataToReport());
32+
// Only remote is enough
33+
info = new EsqlExecutionInfo(true);
34+
info.swapCluster("remote", (k, v) -> remoteCluster);
35+
assertTrue(info.hasMetadataToReport());
36+
}
37+
38+
public void testHasMetadataIncludeFalse() {
39+
// If includeCCSMetadata is false, then it should always return false
40+
EsqlExecutionInfo info = new EsqlExecutionInfo(false);
41+
assertFalse(info.hasMetadataToReport());
42+
assertFalse(info.hasMetadataToReport());
43+
info.swapCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> localCluster);
44+
assertFalse(info.hasMetadataToReport());
45+
info.swapCluster("remote", (k, v) -> remoteCluster);
46+
assertFalse(info.hasMetadataToReport());
47+
}
48+
49+
public void testHasMetadataPartial() {
50+
EsqlExecutionInfo info = new EsqlExecutionInfo(false);
51+
String key = randomFrom(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, "remote");
52+
info.swapCluster(key, (k, v) -> new EsqlExecutionInfo.Cluster(k, "test", false, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
53+
assertFalse(info.isPartial());
54+
assertFalse(info.hasMetadataToReport());
55+
info.swapCluster(key, (k, v) -> new EsqlExecutionInfo.Cluster(k, "test", false, EsqlExecutionInfo.Cluster.Status.PARTIAL));
56+
assertTrue(info.isPartial());
57+
assertFalse(info.hasMetadataToReport());
58+
info.swapCluster(key, (k, v) -> {
59+
EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v);
60+
builder.setFailures(List.of(new ShardSearchFailure(new IllegalStateException("shard failure"))));
61+
return builder.build();
62+
});
63+
assertTrue(info.hasMetadataToReport());
64+
}
65+
66+
}

0 commit comments

Comments
 (0)