Skip to content

Commit 2268846

Browse files
committed
Convert IndicesShardStoreResponse to use Map
The IndicesShardStoreResponse class uses ImmutableOpenMap and ImmutableOpenIntMap to represent the per index -> shard -> status structure. This commit converts these to Java Maps. relates elastic#86239
1 parent 51dc774 commit 2268846

File tree

6 files changed

+65
-171
lines changed

6 files changed

+65
-171
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
1818
import org.elasticsearch.cluster.routing.ShardRoutingState;
19-
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
20-
import org.elasticsearch.common.collect.ImmutableOpenMap;
2119
import org.elasticsearch.common.settings.Settings;
2220
import org.elasticsearch.index.Index;
2321
import org.elasticsearch.index.IndexService;
@@ -76,7 +74,7 @@ public void testBasic() throws Exception {
7674
// all shards
7775
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index).shardStatuses("all")).get();
7876
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
79-
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
77+
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
8078
assertThat(shardStores.size(), equalTo(2));
8179
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.entrySet()) {
8280
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.getValue()) {
@@ -98,7 +96,7 @@ public void testBasic() throws Exception {
9896
List<ShardRouting> unassignedShards = clusterState.routingTable().index(index).shardsWithState(ShardRoutingState.UNASSIGNED);
9997
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index)).get();
10098
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
101-
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
99+
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
102100
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
103101
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses.entrySet()) {
104102
assertThat("must report for one store", storesStatus.getValue().size(), equalTo(1));
@@ -125,8 +123,7 @@ public void testIndices() throws Exception {
125123
.indices()
126124
.shardStores(Requests.indicesShardStoresRequest().shardStatuses("all"))
127125
.get();
128-
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response
129-
.getStoreStatuses();
126+
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response.getStoreStatuses();
130127
assertThat(shardStatuses.containsKey(index1), equalTo(true));
131128
assertThat(shardStatuses.containsKey(index2), equalTo(true));
132129
assertThat(shardStatuses.get(index1).size(), equalTo(2));
@@ -181,7 +178,7 @@ public void testCorruptedShards() throws Exception {
181178

182179
assertBusy(() -> { // IndicesClusterStateService#failAndRemoveShard() called asynchronously but we need it to have completed here.
183180
IndicesShardStoresResponse rsp = client().admin().indices().prepareShardStores(index).setShardStatuses("all").get();
184-
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
181+
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
185182
assertNotNull(shardStatuses);
186183
assertThat(shardStatuses.size(), greaterThan(0));
187184
for (Map.Entry<Integer, List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses.entrySet()) {

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
2424
import org.elasticsearch.cluster.service.ClusterService;
2525
import org.elasticsearch.common.Strings;
26-
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
2726
import org.elasticsearch.common.settings.Settings;
2827
import org.elasticsearch.common.util.set.Sets;
2928
import org.elasticsearch.gateway.GatewayAllocator;
@@ -286,7 +285,7 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
286285
boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
287286
logger.info("--> explicitly promote old primary shard");
288287
final String idxName = "test";
289-
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
288+
Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin()
290289
.indices()
291290
.prepareShardStores(idxName)
292291
.get()

server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import org.elasticsearch.cluster.service.ClusterService;
3434
import org.elasticsearch.common.Priority;
3535
import org.elasticsearch.common.Strings;
36-
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
37-
import org.elasticsearch.common.collect.ImmutableOpenMap;
3836
import org.elasticsearch.common.inject.Inject;
3937
import org.elasticsearch.core.SuppressForbidden;
4038
import org.elasticsearch.tasks.Task;
@@ -110,12 +108,11 @@ private void verifyThenSubmitUpdate(
110108
IndicesShardStoresAction.NAME,
111109
new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
112110
new ActionListenerResponseHandler<>(ActionListener.wrap(response -> {
113-
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status = response
114-
.getStoreStatuses();
111+
Map<String, Map<Integer, List<IndicesShardStoresResponse.StoreStatus>>> status = response.getStoreStatuses();
115112
Exception e = null;
116113
for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
117114
final String index = entry.getKey();
118-
final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
115+
final Map<Integer, List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
119116
if (indexStatus == null) {
120117
// The index in the stale primary allocation request was green and hence filtered out by the store status
121118
// request. We ignore it here since the relevant exception will be thrown by the reroute action later on.

server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
232232
}
233233
}
234234

235-
private final ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses;
235+
private final Map<String, Map<Integer, List<StoreStatus>>> storeStatuses;
236236
private final List<Failure> failures;
237237

238-
public IndicesShardStoresResponse(
239-
ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses,
240-
List<Failure> failures
241-
) {
238+
public IndicesShardStoresResponse(Map<String, Map<Integer, List<StoreStatus>>> storeStatuses, List<Failure> failures) {
242239
this.storeStatuses = storeStatuses;
243240
this.failures = failures;
244241
}
@@ -264,7 +261,7 @@ public IndicesShardStoresResponse(StreamInput in) throws IOException {
264261
* Returns {@link StoreStatus}s
265262
* grouped by their index names and shard ids.
266263
*/
267-
public ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> getStoreStatuses() {
264+
public Map<String, Map<Integer, List<StoreStatus>>> getStoreStatuses() {
268265
return storeStatuses;
269266
}
270267

@@ -299,7 +296,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
299296
}
300297

301298
builder.startObject(Fields.INDICES);
302-
for (Map.Entry<String, ImmutableOpenIntMap<List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
299+
for (Map.Entry<String, Map<Integer, List<StoreStatus>>> indexShards : storeStatuses.entrySet()) {
303300
builder.startObject(indexShards.getKey());
304301

305302
builder.startObject(Fields.SHARDS);

server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

Lines changed: 32 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.apache.lucene.util.CollectionUtil;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.FailedNodeException;
15+
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.Failure;
16+
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus;
17+
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.AllocationStatus;
1518
import org.elasticsearch.action.support.ActionFilters;
1619
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
1720
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
@@ -31,8 +34,6 @@
3134
import org.elasticsearch.cluster.routing.RoutingTable;
3235
import org.elasticsearch.cluster.routing.ShardRouting;
3336
import org.elasticsearch.cluster.service.ClusterService;
34-
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
35-
import org.elasticsearch.common.collect.ImmutableOpenMap;
3637
import org.elasticsearch.common.inject.Inject;
3738
import org.elasticsearch.common.util.concurrent.CountDown;
3839
import org.elasticsearch.core.Tuple;
@@ -47,8 +48,10 @@
4748

4849
import java.util.ArrayList;
4950
import java.util.Collections;
51+
import java.util.HashMap;
5052
import java.util.HashSet;
5153
import java.util.List;
54+
import java.util.Map;
5255
import java.util.Queue;
5356
import java.util.Set;
5457
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -206,81 +209,50 @@ protected synchronized void processAsyncFetch(
206209
}
207210

208211
void finish() {
209-
ImmutableOpenMap.Builder<
210-
String,
211-
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>>> indicesStoreStatusesBuilder =
212-
ImmutableOpenMap.builder();
213-
214-
java.util.List<IndicesShardStoresResponse.Failure> failureBuilder = new ArrayList<>();
212+
Map<String, Map<Integer, List<StoreStatus>>> indicesStatuses = new HashMap<>();
213+
List<Failure> failures = new ArrayList<>();
215214
for (Response fetchResponse : fetchResponses) {
216-
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexStoreStatuses =
217-
indicesStoreStatusesBuilder.get(fetchResponse.shardId.getIndexName());
218-
final ImmutableOpenIntMap.Builder<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexShardsBuilder;
219-
if (indexStoreStatuses == null) {
220-
indexShardsBuilder = ImmutableOpenIntMap.builder();
221-
} else {
222-
indexShardsBuilder = ImmutableOpenIntMap.builder(indexStoreStatuses);
223-
}
224-
java.util.List<IndicesShardStoresResponse.StoreStatus> storeStatuses = indexShardsBuilder.get(
225-
fetchResponse.shardId.id()
226-
);
227-
if (storeStatuses == null) {
228-
storeStatuses = new ArrayList<>();
229-
}
230-
for (NodeGatewayStartedShards response : fetchResponse.responses) {
231-
if (shardExistsInNode(response)) {
232-
IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(
233-
fetchResponse.shardId.getIndexName(),
234-
fetchResponse.shardId.id(),
235-
response.getNode()
236-
);
237-
storeStatuses.add(
238-
new IndicesShardStoresResponse.StoreStatus(
239-
response.getNode(),
240-
response.allocationId(),
241-
allocationStatus,
242-
response.storeException()
243-
)
244-
);
215+
var indexName = fetchResponse.shardId.getIndexName();
216+
var shardId = fetchResponse.shardId.id();
217+
var indexStatuses = indicesStatuses.computeIfAbsent(indexName, k -> new HashMap<>());
218+
var storeStatuses = indexStatuses.computeIfAbsent(shardId, k -> new ArrayList<>());
219+
220+
for (NodeGatewayStartedShards r : fetchResponse.responses) {
221+
if (shardExistsInNode(r)) {
222+
var allocationStatus = getAllocationStatus(indexName, shardId, r.getNode());
223+
storeStatuses.add(new StoreStatus(r.getNode(), r.allocationId(), allocationStatus, r.storeException()));
245224
}
246225
}
247-
CollectionUtil.timSort(storeStatuses);
248-
indexShardsBuilder.put(fetchResponse.shardId.id(), storeStatuses);
249-
indicesStoreStatusesBuilder.put(fetchResponse.shardId.getIndexName(), indexShardsBuilder.build());
226+
250227
for (FailedNodeException failure : fetchResponse.failures) {
251-
failureBuilder.add(
252-
new IndicesShardStoresResponse.Failure(
253-
failure.nodeId(),
254-
fetchResponse.shardId.getIndexName(),
255-
fetchResponse.shardId.id(),
256-
failure.getCause()
257-
)
258-
);
228+
failures.add(new Failure(failure.nodeId(), indexName, shardId, failure.getCause()));
259229
}
260230
}
261-
listener.onResponse(
262-
new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder))
263-
);
231+
// make the status structure immutable
232+
indicesStatuses.replaceAll((k, v) -> {
233+
v.replaceAll((s, l) -> {
234+
CollectionUtil.timSort(l);
235+
return List.copyOf(l);
236+
});
237+
return Map.copyOf(v);
238+
});
239+
listener.onResponse(new IndicesShardStoresResponse(Map.copyOf(indicesStatuses), List.copyOf(failures)));
264240
}
265241

266-
private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(
267-
String index,
268-
int shardID,
269-
DiscoveryNode node
270-
) {
242+
private AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
271243
for (ShardRouting shardRouting : routingNodes.node(node.getId())) {
272244
ShardId shardId = shardRouting.shardId();
273245
if (shardId.id() == shardID && shardId.getIndexName().equals(index)) {
274246
if (shardRouting.primary()) {
275-
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
247+
return AllocationStatus.PRIMARY;
276248
} else if (shardRouting.assignedToNode()) {
277-
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
249+
return AllocationStatus.REPLICA;
278250
} else {
279-
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
251+
return AllocationStatus.UNUSED;
280252
}
281253
}
282254
}
283-
return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
255+
return AllocationStatus.UNUSED;
284256
}
285257

286258
/**

0 commit comments

Comments
 (0)