Skip to content

Commit 9f432d8

Browse files
committed
Prevent can_match requests from sending to incompatible nodes
With cross cluster search we can potentially proxy `can_match` requests to nodes that don't have the endpoint. This might not cause any problem from a functional perspecitve but will cause ugly error messages on the target node. This commit will cause an IAE if we try to talk to an incompatible node via a proxy. Relates to elastic#25704
1 parent a85b22b commit 9f432d8

File tree

4 files changed

+39
-11
lines changed

4 files changed

+39
-11
lines changed

core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,18 @@ public void sendFreeContext(Transport.Connection connection, long contextId, fin
105105

106106
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
107107
ActionListener<CanMatchResponse> listener) {
108-
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
109-
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
108+
if (connection.getNode().getVersion().onOrAfter(Version.CURRENT.minimumCompatibilityVersion())) {
109+
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
110+
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
111+
} else {
112+
// this might look weird but if we are in a CrossClusterSearch environment we can get a connection
113+
// to a pre 5.latest node which is proxied by a 5.latest node under the hood since we are only compatible with 5.latest
114+
// instead of sending the request we shortcut it here and let the caller deal with this -- see #25704
115+
// also failing the request instead of returning a fake answer might trigger a retry on a replica which might be on a
116+
// compatible node
117+
throw new IllegalArgumentException("can_match is not supported on pre "+ Version.CURRENT.minimumCompatibilityVersion() +
118+
" nodes");
119+
}
110120
}
111121

112122
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
860860
} else {
861861
AggregatorFactories.Builder aggregations = source.aggregations();
862862
if (aggregations != null) {
863-
if (aggregations.mustVisiteAllDocs()) {
863+
if (aggregations.mustVisitAllDocs()) {
864864
return false;
865865
}
866866
}

core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void writeTo(StreamOutput out) throws IOException {
286286
}
287287
}
288288

289-
public boolean mustVisiteAllDocs() {
289+
public boolean mustVisitAllDocs() {
290290
for (AggregationBuilder builder : aggregationBuilders) {
291291
if (builder instanceof GlobalAggregationBuilder) {
292292
return true;

core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.search.internal.AliasFilter;
3131
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
3232
import org.elasticsearch.test.ESTestCase;
33+
import org.elasticsearch.test.VersionUtils;
3334
import org.elasticsearch.transport.Transport;
3435

3536
import java.io.IOException;
@@ -102,6 +103,18 @@ public void run() throws IOException {
102103
}
103104
}
104105

106+
public void testOldNodesTriggerException() {
107+
SearchTransportService searchTransportService = new SearchTransportService(
108+
Settings.builder().put("search.remote.connect", false).build(), null);
109+
DiscoveryNode node = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), VersionUtils.getPreviousVersion(Version
110+
.CURRENT.minimumCompatibilityVersion()));
111+
SearchAsyncActionTests.MockConnection mockConnection = new SearchAsyncActionTests.MockConnection(node);
112+
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class,
113+
() -> searchTransportService.sendCanMatch(mockConnection, null, null, null));
114+
assertEquals("can_match is not supported on pre " + Version
115+
.CURRENT.minimumCompatibilityVersion() + " nodes", illegalArgumentException.getMessage());
116+
}
117+
105118
public void testFilterWithFailure() throws InterruptedException {
106119
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
107120
System::nanoTime);
@@ -117,13 +130,18 @@ public void testFilterWithFailure() throws InterruptedException {
117130
@Override
118131
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
119132
ActionListener<CanMatchResponse> listener) {
120-
new Thread(() -> {
121-
if (request.shardId().id() == 0) {
122-
listener.onResponse(new CanMatchResponse(shard1));
123-
} else {
124-
listener.onFailure(new NullPointerException());
125-
}
126-
}).start();
133+
boolean throwException = request.shardId().id() != 0;
134+
if (throwException && randomBoolean()) {
135+
throw new IllegalArgumentException("boom");
136+
} else {
137+
new Thread(() -> {
138+
if (throwException == false) {
139+
listener.onResponse(new CanMatchResponse(shard1));
140+
} else {
141+
listener.onFailure(new NullPointerException());
142+
}
143+
}).start();
144+
}
127145
}
128146
};
129147

0 commit comments

Comments
 (0)