Skip to content

Commit bec4af5

Browse files
committed
Cross-cluster search: preserve cluster alias in shard failures (#32608)
When some remote clusters return shard failures as part of a cross-cluster search request, the cluster alias currently gets lost. As a result, if the shard failures are all caused by the same error, and against indices belonging to different clusters, but with the same index name, only one failure gets returned as part of the search response, meaning that failures are grouped by index name, ignoring the cluster alias. With this commit we make sure that `ShardSearchFailure` returns the cluster alias as part of the index name. Also, we set the fully qualfied index name when creating a `QueryShardException`. That way shard failures are grouped by cluster:index. Such fixes should cover at least most of the cases where either 1) the shard target is set but we don't have the index in the cause (we were previously reading it only from the cause that did not have the cluster alias) 2) the shard target is missing but if the cause is a `QueryShardException` the cluster alias does not get lost. We also prevent NPE in case the failure cause is not set and test such scenario.
1 parent 6a24d66 commit bec4af5

File tree

7 files changed

+335
-34
lines changed

7 files changed

+335
-34
lines changed

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

+32-25
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.LinkedList;
4141
import java.util.List;
42+
import java.util.Objects;
4243
import java.util.Optional;
4344
import java.util.Queue;
4445
import java.util.Set;
@@ -278,7 +279,7 @@ public static ShardOperationFailedException[] groupBy(ShardOperationFailedExcept
278279
List<ShardOperationFailedException> uniqueFailures = new ArrayList<>();
279280
Set<GroupBy> reasons = new HashSet<>();
280281
for (ShardOperationFailedException failure : failures) {
281-
GroupBy reason = new GroupBy(failure.getCause());
282+
GroupBy reason = new GroupBy(failure);
282283
if (reasons.contains(reason) == false) {
283284
reasons.add(reason);
284285
uniqueFailures.add(failure);
@@ -287,46 +288,52 @@ public static ShardOperationFailedException[] groupBy(ShardOperationFailedExcept
287288
return uniqueFailures.toArray(new ShardOperationFailedException[0]);
288289
}
289290

290-
static class GroupBy {
291+
private static class GroupBy {
291292
final String reason;
292293
final String index;
293294
final Class<? extends Throwable> causeType;
294295

295-
GroupBy(Throwable t) {
296-
if (t instanceof ElasticsearchException) {
297-
final Index index = ((ElasticsearchException) t).getIndex();
298-
if (index != null) {
299-
this.index = index.getName();
300-
} else {
301-
this.index = null;
296+
GroupBy(ShardOperationFailedException failure) {
297+
Throwable cause = failure.getCause();
298+
//the index name from the failure contains the cluster alias when using CCS. Ideally failures should be grouped by
299+
//index name and cluster alias. That's why the failure index name has the precedence over the one coming from the cause,
300+
//which does not include the cluster alias.
301+
String indexName = failure.index();
302+
if (indexName == null) {
303+
if (cause instanceof ElasticsearchException) {
304+
final Index index = ((ElasticsearchException) cause).getIndex();
305+
if (index != null) {
306+
indexName = index.getName();
307+
}
302308
}
309+
}
310+
this.index = indexName;
311+
if (cause == null) {
312+
this.reason = failure.reason();
313+
this.causeType = null;
303314
} else {
304-
index = null;
315+
this.reason = cause.getMessage();
316+
this.causeType = cause.getClass();
305317
}
306-
reason = t.getMessage();
307-
causeType = t.getClass();
308318
}
309319

310320
@Override
311321
public boolean equals(Object o) {
312-
if (this == o) return true;
313-
if (o == null || getClass() != o.getClass()) return false;
314-
322+
if (this == o) {
323+
return true;
324+
}
325+
if (o == null || getClass() != o.getClass()) {
326+
return false;
327+
}
315328
GroupBy groupBy = (GroupBy) o;
316-
317-
if (!causeType.equals(groupBy.causeType)) return false;
318-
if (index != null ? !index.equals(groupBy.index) : groupBy.index != null) return false;
319-
if (reason != null ? !reason.equals(groupBy.reason) : groupBy.reason != null) return false;
320-
321-
return true;
329+
return Objects.equals(reason, groupBy.reason) &&
330+
Objects.equals(index, groupBy.index) &&
331+
Objects.equals(causeType, groupBy.causeType);
322332
}
323333

324334
@Override
325335
public int hashCode() {
326-
int result = reason != null ? reason.hashCode() : 0;
327-
result = 31 * result + (index != null ? index.hashCode() : 0);
328-
result = 31 * result + causeType.hashCode();
329-
return result;
336+
return Objects.hash(reason, index, causeType);
330337
}
331338
}
332339
}

server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.rest.RestStatus;
3535
import org.elasticsearch.search.SearchException;
3636
import org.elasticsearch.search.SearchShardTarget;
37+
import org.elasticsearch.transport.RemoteClusterAware;
3738

3839
import java.io.IOException;
3940

@@ -66,7 +67,7 @@ public ShardSearchFailure(Exception e) {
6667

6768
public ShardSearchFailure(Exception e, @Nullable SearchShardTarget shardTarget) {
6869
final Throwable actual = ExceptionsHelper.unwrapCause(e);
69-
if (actual != null && actual instanceof SearchException) {
70+
if (actual instanceof SearchException) {
7071
this.shardTarget = ((SearchException) actual).shard();
7172
} else if (shardTarget != null) {
7273
this.shardTarget = shardTarget;
@@ -105,7 +106,7 @@ public RestStatus status() {
105106
@Override
106107
public String index() {
107108
if (shardTarget != null) {
108-
return shardTarget.getIndex();
109+
return shardTarget.getFullyQualifiedIndexName();
109110
}
110111
return null;
111112
}
@@ -186,6 +187,7 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
186187
String currentFieldName = null;
187188
int shardId = -1;
188189
String indexName = null;
190+
String clusterAlias = null;
189191
String nodeId = null;
190192
ElasticsearchException exception = null;
191193
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -196,6 +198,11 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
196198
shardId = parser.intValue();
197199
} else if (INDEX_FIELD.equals(currentFieldName)) {
198200
indexName = parser.text();
201+
int indexOf = indexName.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
202+
if (indexOf > 0) {
203+
clusterAlias = indexName.substring(0, indexOf);
204+
indexName = indexName.substring(indexOf + 1);
205+
}
199206
} else if (NODE_FIELD.equals(currentFieldName)) {
200207
nodeId = parser.text();
201208
} else {
@@ -214,7 +221,7 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
214221
SearchShardTarget searchShardTarget = null;
215222
if (nodeId != null) {
216223
searchShardTarget = new SearchShardTarget(nodeId,
217-
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), null, OriginalIndices.NONE);
224+
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
218225
}
219226
return new ShardSearchFailure(exception, searchShardTarget);
220227
}

server/src/main/java/org/elasticsearch/index/query/QueryShardException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public QueryShardException(QueryShardContext context, String msg, Object... args
3838

3939
public QueryShardException(QueryShardContext context, String msg, Throwable cause, Object... args) {
4040
super(msg, cause, args);
41-
setIndex(context.index());
41+
setIndex(context.getFullyQualifiedIndexName());
4242
}
4343

4444
/**

server/src/main/java/org/elasticsearch/search/SearchShardTarget.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.search;
2121

22-
import java.io.IOException;
23-
2422
import org.elasticsearch.Version;
2523
import org.elasticsearch.action.OriginalIndices;
2624
import org.elasticsearch.common.Nullable;
@@ -32,6 +30,8 @@
3230
import org.elasticsearch.index.shard.ShardId;
3331
import org.elasticsearch.transport.RemoteClusterAware;
3432

33+
import java.io.IOException;
34+
3535
/**
3636
* The target that the search request was executed on.
3737
*/
@@ -96,6 +96,13 @@ public String getClusterAlias() {
9696
return clusterAlias;
9797
}
9898

99+
/**
100+
* Returns the fully qualified index name, including the cluster alias.
101+
*/
102+
public String getFullyQualifiedIndexName() {
103+
return RemoteClusterAware.buildRemoteIndexName(getClusterAlias(), getIndex());
104+
}
105+
99106
@Override
100107
public int compareTo(SearchShardTarget o) {
101108
int i = shardId.getIndexName().compareTo(o.getIndex());

server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java

+115
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,27 @@
2020
package org.elasticsearch;
2121

2222
import org.apache.commons.codec.DecoderException;
23+
import org.elasticsearch.action.OriginalIndices;
24+
import org.elasticsearch.action.ShardOperationFailedException;
25+
import org.elasticsearch.action.search.ShardSearchFailure;
26+
import org.elasticsearch.cluster.metadata.IndexMetaData;
27+
import org.elasticsearch.common.ParsingException;
2328
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
29+
import org.elasticsearch.index.Index;
30+
import org.elasticsearch.index.query.QueryShardException;
31+
import org.elasticsearch.index.shard.ShardId;
2432
import org.elasticsearch.rest.RestStatus;
33+
import org.elasticsearch.search.SearchShardTarget;
2534
import org.elasticsearch.test.ESTestCase;
35+
import org.elasticsearch.transport.RemoteClusterAware;
2636

2737
import java.util.Optional;
2838

2939
import static org.elasticsearch.ExceptionsHelper.MAX_ITERATIONS;
3040
import static org.elasticsearch.ExceptionsHelper.maybeError;
3141
import static org.hamcrest.CoreMatchers.equalTo;
42+
import static org.hamcrest.CoreMatchers.instanceOf;
43+
import static org.hamcrest.CoreMatchers.nullValue;
3244

3345
public class ExceptionsHelperTests extends ESTestCase {
3446

@@ -91,4 +103,107 @@ public void testStatus() {
91103
assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS));
92104
}
93105

106+
public void testGroupBy() {
107+
ShardOperationFailedException[] failures = new ShardOperationFailedException[]{
108+
createShardFailureParsingException("error", "node0", "index", 0, null),
109+
createShardFailureParsingException("error", "node1", "index", 1, null),
110+
createShardFailureParsingException("error", "node2", "index2", 2, null),
111+
createShardFailureParsingException("error", "node0", "index", 0, "cluster1"),
112+
createShardFailureParsingException("error", "node1", "index", 1, "cluster1"),
113+
createShardFailureParsingException("error", "node2", "index", 2, "cluster1"),
114+
createShardFailureParsingException("error", "node0", "index", 0, "cluster2"),
115+
createShardFailureParsingException("error", "node1", "index", 1, "cluster2"),
116+
createShardFailureParsingException("error", "node2", "index", 2, "cluster2"),
117+
createShardFailureParsingException("another error", "node2", "index", 2, "cluster2")
118+
};
119+
120+
ShardOperationFailedException[] groupBy = ExceptionsHelper.groupBy(failures);
121+
assertThat(groupBy.length, equalTo(5));
122+
String[] expectedIndices = new String[]{"index", "index2", "cluster1:index", "cluster2:index", "cluster2:index"};
123+
String[] expectedErrors = new String[]{"error", "error", "error", "error", "another error"};
124+
int i = 0;
125+
for (ShardOperationFailedException shardOperationFailedException : groupBy) {
126+
assertThat(shardOperationFailedException.getCause().getMessage(), equalTo(expectedErrors[i]));
127+
assertThat(shardOperationFailedException.index(), equalTo(expectedIndices[i++]));
128+
}
129+
}
130+
131+
private static ShardSearchFailure createShardFailureParsingException(String error, String nodeId,
132+
String index, int shardId, String clusterAlias) {
133+
ParsingException ex = new ParsingException(0, 0, error, new IllegalArgumentException("some bad argument"));
134+
ex.setIndex(index);
135+
return new ShardSearchFailure(ex, createSearchShardTarget(nodeId, shardId, index, clusterAlias));
136+
}
137+
138+
private static SearchShardTarget createSearchShardTarget(String nodeId, int shardId, String index, String clusterAlias) {
139+
return new SearchShardTarget(nodeId,
140+
new ShardId(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
141+
}
142+
143+
public void testGroupByNullTarget() {
144+
ShardOperationFailedException[] failures = new ShardOperationFailedException[] {
145+
createShardFailureQueryShardException("error", "index", null),
146+
createShardFailureQueryShardException("error", "index", null),
147+
createShardFailureQueryShardException("error", "index", null),
148+
createShardFailureQueryShardException("error", "index", "cluster1"),
149+
createShardFailureQueryShardException("error", "index", "cluster1"),
150+
createShardFailureQueryShardException("error", "index", "cluster1"),
151+
createShardFailureQueryShardException("error", "index", "cluster2"),
152+
createShardFailureQueryShardException("error", "index", "cluster2"),
153+
createShardFailureQueryShardException("error", "index2", null),
154+
createShardFailureQueryShardException("another error", "index2", null),
155+
};
156+
157+
ShardOperationFailedException[] groupBy = ExceptionsHelper.groupBy(failures);
158+
assertThat(groupBy.length, equalTo(5));
159+
String[] expectedIndices = new String[]{"index", "cluster1:index", "cluster2:index", "index2", "index2"};
160+
String[] expectedErrors = new String[]{"error", "error", "error", "error", "another error"};
161+
int i = 0;
162+
for (ShardOperationFailedException shardOperationFailedException : groupBy) {
163+
assertThat(shardOperationFailedException.index(), nullValue());
164+
assertThat(shardOperationFailedException.getCause(), instanceOf(ElasticsearchException.class));
165+
ElasticsearchException elasticsearchException = (ElasticsearchException) shardOperationFailedException.getCause();
166+
assertThat(elasticsearchException.getMessage(), equalTo(expectedErrors[i]));
167+
assertThat(elasticsearchException.getIndex().getName(), equalTo(expectedIndices[i++]));
168+
}
169+
}
170+
171+
private static ShardSearchFailure createShardFailureQueryShardException(String error, String indexName, String clusterAlias) {
172+
Index index = new Index(RemoteClusterAware.buildRemoteIndexName(clusterAlias, indexName), "uuid");
173+
QueryShardException queryShardException = new QueryShardException(index, error, new IllegalArgumentException("parse error"));
174+
return new ShardSearchFailure(queryShardException, null);
175+
}
176+
177+
public void testGroupByNullCause() {
178+
ShardOperationFailedException[] failures = new ShardOperationFailedException[] {
179+
new ShardSearchFailure("error", createSearchShardTarget("node0", 0, "index", null)),
180+
new ShardSearchFailure("error", createSearchShardTarget("node1", 1, "index", null)),
181+
new ShardSearchFailure("error", createSearchShardTarget("node1", 1, "index2", null)),
182+
new ShardSearchFailure("error", createSearchShardTarget("node2", 2, "index", "cluster1")),
183+
new ShardSearchFailure("error", createSearchShardTarget("node1", 1, "index", "cluster1")),
184+
new ShardSearchFailure("a different error", createSearchShardTarget("node3", 3, "index", "cluster1"))
185+
};
186+
187+
ShardOperationFailedException[] groupBy = ExceptionsHelper.groupBy(failures);
188+
assertThat(groupBy.length, equalTo(4));
189+
String[] expectedIndices = new String[]{"index", "index2", "cluster1:index", "cluster1:index"};
190+
String[] expectedErrors = new String[]{"error", "error", "error", "a different error"};
191+
192+
int i = 0;
193+
for (ShardOperationFailedException shardOperationFailedException : groupBy) {
194+
assertThat(shardOperationFailedException.reason(), equalTo(expectedErrors[i]));
195+
assertThat(shardOperationFailedException.index(), equalTo(expectedIndices[i++]));
196+
}
197+
}
198+
199+
public void testGroupByNullIndex() {
200+
ShardOperationFailedException[] failures = new ShardOperationFailedException[] {
201+
new ShardSearchFailure("error", null),
202+
new ShardSearchFailure(new IllegalArgumentException("error")),
203+
new ShardSearchFailure(new ParsingException(0, 0, "error", null)),
204+
};
205+
206+
ShardOperationFailedException[] groupBy = ExceptionsHelper.groupBy(failures);
207+
assertThat(groupBy.length, equalTo(3));
208+
}
94209
}

server/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ public static ShardSearchFailure createTestItem() {
4545
if (randomBoolean()) {
4646
String nodeId = randomAlphaOfLengthBetween(5, 10);
4747
String indexName = randomAlphaOfLengthBetween(5, 10);
48+
String clusterAlias = randomBoolean() ? randomAlphaOfLengthBetween(5, 10) : null;
4849
searchShardTarget = new SearchShardTarget(nodeId,
49-
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), randomInt()), null, null);
50+
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), randomInt()), clusterAlias, OriginalIndices.NONE);
5051
}
5152
return new ShardSearchFailure(ex, searchShardTarget);
5253
}
@@ -115,4 +116,22 @@ public void testToXContent() throws IOException {
115116
+ "}",
116117
xContent.utf8ToString());
117118
}
119+
120+
public void testToXContentWithClusterAlias() throws IOException {
121+
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(0, 0, "some message", null),
122+
new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), "cluster1", OriginalIndices.NONE));
123+
BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean());
124+
assertEquals(
125+
"{\"shard\":123,"
126+
+ "\"index\":\"cluster1:indexName\","
127+
+ "\"node\":\"nodeId\","
128+
+ "\"reason\":{"
129+
+ "\"type\":\"parsing_exception\","
130+
+ "\"reason\":\"some message\","
131+
+ "\"line\":0,"
132+
+ "\"col\":0"
133+
+ "}"
134+
+ "}",
135+
xContent.utf8ToString());
136+
}
118137
}

0 commit comments

Comments
 (0)