Skip to content

Commit 0434ecf

Browse files
committed
Merge pull request #11464 from nirmalc/nodes-preference
Search `preference` based on node specification
2 parents f4a143d + 72a9d34 commit 0434ecf

File tree

6 files changed

+104
-1
lines changed

6 files changed

+104
-1
lines changed

core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.collect.ImmutableList;
2323
import com.google.common.collect.ImmutableMap;
24+
import com.google.common.collect.Sets;
2425
import com.google.common.collect.UnmodifiableIterator;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -330,6 +331,33 @@ public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
330331
return new PlainShardIterator(shardId, ordered);
331332
}
332333

334+
/**
335+
* Returns shards based on nodeAttributes given such as node name , node attribute, node IP
336+
* Supports node specifications in cluster API
337+
*
338+
* @param nodeAttribute
339+
* @param discoveryNodes
340+
*/
341+
public ShardIterator onlyNodeSelectorActiveInitializingShardsIt(String nodeAttribute, DiscoveryNodes discoveryNodes) {
342+
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
343+
Set<String> selectedNodes = Sets.newHashSet(discoveryNodes.resolveNodesIds(nodeAttribute));
344+
345+
for (ShardRouting shardRouting : activeShards) {
346+
if (selectedNodes.contains(shardRouting.currentNodeId())) {
347+
ordered.add(shardRouting);
348+
}
349+
}
350+
for (ShardRouting shardRouting : allInitializingShards) {
351+
if (selectedNodes.contains(shardRouting.currentNodeId())) {
352+
ordered.add(shardRouting);
353+
}
354+
}
355+
if (ordered.isEmpty()) {
356+
throw new IllegalArgumentException("No data node with critera [" + nodeAttribute + "] found");
357+
}
358+
return new PlainShardIterator(shardId, ordered);
359+
}
360+
333361
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
334362
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
335363
// fill it in a randomized fashion

core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
183183
String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
184184
ensureNodeIdExists(nodes, nodeId);
185185
return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
186+
case ONLY_NODES:
187+
String nodeAttribute = preference.substring(Preference.ONLY_NODES.type().length() + 1);
188+
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttribute, nodes);
186189
default:
187190
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
188191
}

core/src/main/java/org/elasticsearch/cluster/routing/Preference.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ public enum Preference {
5757
/**
5858
* Route to specific node only
5959
*/
60-
ONLY_NODE("_only_node");
60+
ONLY_NODE("_only_node"),
61+
62+
/**
63+
* Route to only node with attribute
64+
*/
65+
ONLY_NODES("_only_nodes");
6166

6267
private final String type;
6368

@@ -97,6 +102,8 @@ public static Preference parse(String preference) {
97102
case "_only_local":
98103
case "_onlyLocal":
99104
return ONLY_LOCAL;
105+
case "_only_nodes":
106+
return ONLY_NODES;
100107
default:
101108
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
102109
}

core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,63 @@ public void testAttributePreferenceRouting() {
262262
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
263263
}
264264

265+
@Test
266+
public void testNodeSelectorRouting(){
267+
AllocationService strategy = createAllocationService(settingsBuilder()
268+
.put("cluster.routing.allocation.concurrent_recoveries", 10)
269+
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
270+
.build());
271+
272+
MetaData metaData = MetaData.builder()
273+
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
274+
.build();
275+
276+
RoutingTable routingTable = RoutingTable.builder()
277+
.addAsNew(metaData.index("test"))
278+
.build();
279+
280+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
281+
282+
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
283+
.put(newNode("fred","node1", ImmutableMap.of("disk", "ebs")))
284+
.put(newNode("barney","node2", ImmutableMap.of("disk", "ephemeral")))
285+
.localNodeId("node1")
286+
).build();
287+
288+
routingTable = strategy.reroute(clusterState).routingTable();
289+
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
290+
291+
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
292+
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
293+
294+
ShardsIterator shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("disk:ebs",clusterState.nodes());
295+
assertThat(shardsIterator.size(), equalTo(1));
296+
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
297+
298+
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("dis*:eph*",clusterState.nodes());
299+
assertThat(shardsIterator.size(), equalTo(1));
300+
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
301+
302+
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
303+
assertThat(shardsIterator.size(), equalTo(1));
304+
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
305+
306+
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("bar*",clusterState.nodes());
307+
assertThat(shardsIterator.size(), equalTo(1));
308+
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node2"));
309+
310+
try {
311+
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("welma", clusterState.nodes());
312+
fail("shouldve raised illegalArgumentException");
313+
} catch (IllegalArgumentException illegal) {
314+
//expected exception
315+
}
316+
317+
shardsIterator = clusterState.routingTable().index("test").shard(0).onlyNodeSelectorActiveInitializingShardsIt("fred",clusterState.nodes());
318+
assertThat(shardsIterator.size(), equalTo(1));
319+
assertThat(shardsIterator.nextOrNull().currentNodeId(),equalTo("node1"));
320+
}
321+
265322

266323
@Test
267324
public void testShardsAndPreferNodeRouting() {

core/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ public static DiscoveryNode newNode(String nodeId, Map<String, String> attribute
105105
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
106106
}
107107

108+
public static DiscoveryNode newNode(String nodeName,String nodeId, Map<String, String> attributes) {
109+
return new DiscoveryNode(nodeName, nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
110+
}
111+
108112
public static DiscoveryNode newNode(String nodeId, Version version) {
109113
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version);
110114
}

docs/reference/search/request/preference.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ The `preference` is a query string parameter which can be set to:
3333
and `3` in this case). This preference can be combined with other
3434
preferences but it has to appear first: `_shards:2,3;_primary`
3535

36+
`_only_nodes`::
37+
Restricts the operation to nodes specified in node specification
38+
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster.html
39+
3640
Custom (string) value::
3741
A custom value will be used to guarantee that
3842
the same shards will be used for the same custom value. This can help

0 commit comments

Comments
 (0)