Skip to content

Commit ea90c96

Browse files
committed
Ensure that local cluster alias is never treated as remote (#37121)
With #36997 we added support for providing a local cluster alias with a `SearchRequest`. We intended to make sure that when provided as part of a search request, the cluster alias would never be used for connection lookups. Yet due to a bug we would still end up looking up the connection from the remote ones. This commit adds a test to make sure that whenever we set the cluster alias to the `SearchRequest` (which can only be done at transport), such alias is used as index prefix in the returned hits. No errors are thrown despite no remote clusters are configured indicating that such alias is never used for connection look-ups. Also, we add explicit support for the empty cluster alias when printing out index names through `RemoteClusterAware#buildRemoteIndexName`. In fact we don't want to print out `:index` when the cluster alias is set to empty string, but rather `index`. Yet, the semantic of empty string is different compared to `null` as it will still disable final reduction. This will be used in CCS when searching against remote clusters as well as the local one, the local one will have empty prefix yet it will need to disable final reduction so that its results will be properly merged with the ones coming from the remote clusters.
1 parent e1a852d commit ea90c96

File tree

5 files changed

+110
-5
lines changed

5 files changed

+110
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
2828
import org.elasticsearch.action.support.IndicesOptions;
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
30+
import org.elasticsearch.common.Nullable;
3031
import org.elasticsearch.common.io.stream.StreamInput;
3132
import org.elasticsearch.common.io.stream.StreamOutput;
3233
import org.elasticsearch.common.io.stream.Writeable;
@@ -423,11 +424,11 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne
423424
/**
424425
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
425426
* against the local cluster.
426-
* @param clusterAlias the cluster alias the node should be resolve against
427+
* @param clusterAlias the cluster alias the node should be resolved against
427428
* @param node the node to resolve
428429
* @return a connection to the given node belonging to the cluster with the provided alias.
429430
*/
430-
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
431+
Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode node) {
431432
if (clusterAlias == null) {
432433
return transportService.getConnection(node);
433434
} else {

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,16 +362,19 @@ static BiFunction<String, String, Transport.Connection> buildConnectionLookup(St
362362
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
363363
return (clusterAlias, nodeId) -> {
364364
final DiscoveryNode discoveryNode;
365+
final boolean remoteCluster;
365366
if (clusterAlias == null || requestClusterAlias != null) {
366367
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
367368
discoveryNode = localNodes.apply(nodeId);
369+
remoteCluster = false;
368370
} else {
369371
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
372+
remoteCluster = true;
370373
}
371374
if (discoveryNode == null) {
372375
throw new IllegalStateException("no node found for id: " + nodeId);
373376
}
374-
return nodeToConnection.apply(clusterAlias, discoveryNode);
377+
return nodeToConnection.apply(remoteCluster ? clusterAlias : null, discoveryNode);
375378
};
376379
}
377380

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ private static int indexOfPortSeparator(String remoteHost) {
340340
}
341341

342342
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
343-
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
343+
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
344+
? indexName : clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;
344345
}
345-
346346
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.search;
21+
22+
import org.elasticsearch.action.index.IndexRequest;
23+
import org.elasticsearch.action.index.IndexResponse;
24+
import org.elasticsearch.action.support.WriteRequest;
25+
import org.elasticsearch.rest.RestStatus;
26+
import org.elasticsearch.search.SearchHit;
27+
import org.elasticsearch.test.ESSingleNodeTestCase;
28+
29+
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
30+
31+
public void testLocalClusterAlias() {
32+
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
33+
indexRequest.source("field", "value");
34+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
35+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
36+
assertEquals(RestStatus.CREATED, indexResponse.status());
37+
38+
{
39+
SearchRequest searchRequest = new SearchRequest("local");
40+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
41+
assertEquals(1, searchResponse.getHits().getTotalHits());
42+
SearchHit[] hits = searchResponse.getHits().getHits();
43+
assertEquals(1, hits.length);
44+
SearchHit hit = hits[0];
45+
assertEquals("local", hit.getClusterAlias());
46+
assertEquals("test", hit.getIndex());
47+
assertEquals("1", hit.getId());
48+
}
49+
{
50+
SearchRequest searchRequest = new SearchRequest("");
51+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
52+
assertEquals(1, searchResponse.getHits().getTotalHits());
53+
SearchHit[] hits = searchResponse.getHits().getHits();
54+
assertEquals(1, hits.length);
55+
SearchHit hit = hits[0];
56+
assertEquals("", hit.getClusterAlias());
57+
assertEquals("test", hit.getIndex());
58+
assertEquals("1", hit.getId());
59+
}
60+
}
61+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.test.ESTestCase;
23+
24+
public class RemoteClusterAwareTests extends ESTestCase {
25+
26+
public void testBuildRemoteIndexName() {
27+
{
28+
String clusterAlias = randomAlphaOfLengthBetween(5, 10);
29+
String index = randomAlphaOfLengthBetween(5, 10);
30+
String remoteIndexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, index);
31+
assertEquals(clusterAlias + ":" + index, remoteIndexName);
32+
}
33+
{
34+
String index = randomAlphaOfLengthBetween(5, 10);
35+
String clusterAlias = randomBoolean() ? null : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
36+
String remoteIndexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, index);
37+
assertEquals(index, remoteIndexName);
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)