Skip to content

Commit 73c15ef

Browse files
authored
Make RoutingNodes behave like a collection (#83540) (#83573)
Extend it from `AbstractCollection` instead of `Iterable` analogous to #83453
1 parent e6603b2 commit 73c15ef

File tree

8 files changed

+64
-41
lines changed

8 files changed

+64
-41
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ public ClusterState measureAllocation() {
156156
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
157157
clusterState = strategy.applyStartedShards(
158158
clusterState,
159-
StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
159+
clusterState.getRoutingNodes()
160+
.stream()
160161
.flatMap(shardRoutings -> StreamSupport.stream(shardRoutings.spliterator(), false))
161162
.filter(ShardRouting::initializing)
162163
.collect(Collectors.toList())

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicReference;
3636
import java.util.stream.Collectors;
37-
import java.util.stream.StreamSupport;
3837

3938
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
4039
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
@@ -74,10 +73,15 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
7473
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
7574
}
7675

77-
final List<String> nodeIds = StreamSupport.stream(
78-
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
79-
false
80-
).map(RoutingNode::nodeId).collect(Collectors.toList());
76+
final List<String> nodeIds = client().admin()
77+
.cluster()
78+
.prepareState()
79+
.get()
80+
.getState()
81+
.getRoutingNodes()
82+
.stream()
83+
.map(RoutingNode::nodeId)
84+
.collect(Collectors.toList());
8185

8286
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
8387
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
@@ -149,10 +153,15 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {
149153
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
150154
}
151155

152-
final List<String> nodeIds = StreamSupport.stream(
153-
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
154-
false
155-
).map(RoutingNode::nodeId).collect(Collectors.toList());
156+
final List<String> nodeIds = client().admin()
157+
.cluster()
158+
.prepareState()
159+
.get()
160+
.getState()
161+
.getRoutingNodes()
162+
.stream()
163+
.map(RoutingNode::nodeId)
164+
.collect(Collectors.toList());
156165

157166
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
158167
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
@@ -270,10 +279,15 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception
270279
)
271280
);
272281

273-
final List<String> nodeIds = StreamSupport.stream(
274-
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
275-
false
276-
).map(RoutingNode::nodeId).collect(Collectors.toList());
282+
final List<String> nodeIds = client().admin()
283+
.cluster()
284+
.prepareState()
285+
.get()
286+
.getState()
287+
.getRoutingNodes()
288+
.stream()
289+
.map(RoutingNode::nodeId)
290+
.collect(Collectors.toList());
277291

278292
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
279293

@@ -329,10 +343,15 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
329343

330344
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
331345

332-
final List<String> nodeIds = StreamSupport.stream(
333-
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
334-
false
335-
).map(RoutingNode::nodeId).collect(Collectors.toList());
346+
final List<String> nodeIds = client().admin()
347+
.cluster()
348+
.prepareState()
349+
.get()
350+
.getState()
351+
.getRoutingNodes()
352+
.stream()
353+
.map(RoutingNode::nodeId)
354+
.collect(Collectors.toList());
336355

337356
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
338357
assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1));
@@ -437,10 +456,15 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception
437456
)
438457
);
439458

440-
final List<String> nodeIds = StreamSupport.stream(
441-
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
442-
false
443-
).map(RoutingNode::nodeId).collect(Collectors.toList());
459+
final List<String> nodeIds = client().admin()
460+
.cluster()
461+
.prepareState()
462+
.get()
463+
.getState()
464+
.getRoutingNodes()
465+
.stream()
466+
.map(RoutingNode::nodeId)
467+
.collect(Collectors.toList());
444468

445469
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
446470

server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.util.Set;
2727
import java.util.stream.Collectors;
28-
import java.util.stream.StreamSupport;
2928

3029
import static org.elasticsearch.client.internal.Requests.clusterHealthRequest;
3130
import static org.elasticsearch.client.internal.Requests.createIndexRequest;
@@ -239,9 +238,7 @@ private String getLocalNodeId(String name) {
239238
}
240239

241240
private void assertNodesPresent(RoutingNodes routingNodes, String... nodes) {
242-
final Set<String> keySet = StreamSupport.stream(routingNodes.spliterator(), false)
243-
.map(RoutingNode::nodeId)
244-
.collect(Collectors.toSet());
241+
final Set<String> keySet = routingNodes.stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
245242
assertThat(keySet, containsInAnyOrder(nodes));
246243
}
247244
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.index.Index;
2828
import org.elasticsearch.index.shard.ShardId;
2929

30+
import java.util.AbstractCollection;
3031
import java.util.ArrayDeque;
3132
import java.util.ArrayList;
3233
import java.util.Collections;
@@ -44,7 +45,6 @@
4445
import java.util.Set;
4546
import java.util.function.Predicate;
4647
import java.util.stream.Collectors;
47-
import java.util.stream.StreamSupport;
4848

4949
/**
5050
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
@@ -60,7 +60,7 @@
6060
* <li> {@link #failShard} fails/cancels an assigned shard.
6161
* </ul>
6262
*/
63-
public class RoutingNodes implements Iterable<RoutingNode> {
63+
public class RoutingNodes extends AbstractCollection<RoutingNode> {
6464

6565
private final Map<String, RoutingNode> nodesToShards;
6666

@@ -299,10 +299,7 @@ public Set<String> getAttributeValues(String attributeName) {
299299
: Thread.currentThread().getName() + " should be the master service thread";
300300
return attributeValuesByAttribute.computeIfAbsent(
301301
attributeName,
302-
ignored -> StreamSupport.stream(this.spliterator(), false)
303-
.map(r -> r.node().getAttributes().get(attributeName))
304-
.filter(Objects::nonNull)
305-
.collect(Collectors.toSet())
302+
ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet())
306303
);
307304
}
308305

@@ -869,6 +866,7 @@ private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shar
869866
/**
870867
* Returns the number of routing nodes
871868
*/
869+
@Override
872870
public int size() {
873871
return nodesToShards.size();
874872
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.function.Supplier;
4848
import java.util.stream.Collectors;
4949
import java.util.stream.Stream;
50-
import java.util.stream.StreamSupport;
5150

5251
/**
5352
* Listens for a node to go over the high watermark and kicks off an empty
@@ -375,7 +374,8 @@ public void onNewInfo(ClusterInfo info) {
375374
}
376375

377376
// Generate a map of node name to ID so we can use it to look up node replacement targets
378-
final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
377+
final Map<String, String> nodeNameToId = state.getRoutingNodes()
378+
.stream()
379379
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
380380

381381
// Calculate both the source node id and the target node id of a "replace" type shutdown

server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.HashMap;
3535
import java.util.Map;
3636
import java.util.function.UnaryOperator;
37-
import java.util.stream.StreamSupport;
3837

3938
import static java.util.Collections.emptyMap;
4039
import static java.util.Collections.singletonList;
@@ -1098,7 +1097,8 @@ private void testExplanation(
10981097
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
10991098
);
11001099

1101-
final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
1100+
final RoutingNode emptyNode = clusterState.getRoutingNodes()
1101+
.stream()
11021102
.filter(RoutingNode::isEmpty)
11031103
.findFirst()
11041104
.orElseThrow(AssertionError::new);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
import java.util.Collections;
3434
import java.util.List;
35-
import java.util.stream.StreamSupport;
3635

3736
import static java.util.Collections.emptyMap;
3837
import static java.util.Collections.singletonList;
@@ -205,12 +204,14 @@ public void testSameHostCheckWithExplain() {
205204
new ClusterSettings(sameHostSetting, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
206205
);
207206

208-
final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
207+
final RoutingNode emptyNode = clusterState.getRoutingNodes()
208+
.stream()
209209
.filter(node -> node.getByShardId(unassignedShard.shardId()) == null)
210210
.findFirst()
211211
.orElseThrow(AssertionError::new);
212212

213-
final RoutingNode otherNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
213+
final RoutingNode otherNode = clusterState.getRoutingNodes()
214+
.stream()
214215
.filter(node -> node != emptyNode)
215216
.findFirst()
216217
.orElseThrow(AssertionError::new);

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
344344
return false;
345345
}
346346
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
347-
Set<Decision.Type> decisionTypes = StreamSupport.stream(allocation.routingNodes().spliterator(), false)
347+
Set<Decision.Type> decisionTypes = allocation.routingNodes()
348+
.stream()
348349
.map(
349350
node -> dataTierAllocationDecider.shouldFilter(
350351
indexMetadata,
@@ -369,7 +370,8 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
369370
allocation.debugDecision(true);
370371
try {
371372
// check that it does not belong on any existing node, i.e., there must be only a tier like reason it cannot be allocated
372-
return StreamSupport.stream(allocation.routingNodes().spliterator(), false)
373+
return allocation.routingNodes()
374+
.stream()
373375
.anyMatch(node -> isFilterTierOnlyDecision(allocationDeciders.canAllocate(shard, node, allocation), indexMetadata));
374376
} finally {
375377
allocation.debugDecision(false);

0 commit comments

Comments
 (0)