32
32
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
33
33
import org .elasticsearch .cluster .health .ClusterHealthStatus ;
34
34
import org .elasticsearch .cluster .health .ClusterShardHealth ;
35
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
35
36
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
36
37
import org .elasticsearch .cluster .node .DiscoveryNode ;
37
38
import org .elasticsearch .cluster .node .DiscoveryNodes ;
43
44
import org .elasticsearch .cluster .service .ClusterService ;
44
45
import org .elasticsearch .common .collect .ImmutableOpenIntMap ;
45
46
import org .elasticsearch .common .collect .ImmutableOpenMap ;
47
+ import org .elasticsearch .common .collect .Tuple ;
46
48
import org .elasticsearch .common .inject .Inject ;
47
49
import org .elasticsearch .common .io .stream .StreamInput ;
48
50
import org .elasticsearch .common .util .concurrent .CountDown ;
@@ -100,7 +102,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
100
102
final RoutingTable routingTables = state .routingTable ();
101
103
final RoutingNodes routingNodes = state .getRoutingNodes ();
102
104
final String [] concreteIndices = indexNameExpressionResolver .concreteIndexNames (state , request );
103
- final Set <ShardId > shardIdsToFetch = new HashSet <>();
105
+ final Set <Tuple < ShardId , String >> shardsToFetch = new HashSet <>();
104
106
105
107
logger .trace ("using cluster state version [{}] to determine shards" , state .version ());
106
108
// collect relevant shard ids of the requested indices for fetching store infos
@@ -109,11 +111,12 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
109
111
if (indexShardRoutingTables == null ) {
110
112
continue ;
111
113
}
114
+ final String customDataPath = IndexMetaData .INDEX_DATA_PATH_SETTING .get (state .metaData ().index (index ).getSettings ());
112
115
for (IndexShardRoutingTable routing : indexShardRoutingTables ) {
113
116
final int shardId = routing .shardId ().id ();
114
117
ClusterShardHealth shardHealth = new ClusterShardHealth (shardId , routing );
115
118
if (request .shardStatuses ().contains (shardHealth .getStatus ())) {
116
- shardIdsToFetch .add (routing .shardId ());
119
+ shardsToFetch .add (Tuple . tuple ( routing .shardId (), customDataPath ));
117
120
}
118
121
}
119
122
}
@@ -123,7 +126,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
123
126
// we could fetch all shard store info from every node once (nNodes requests)
124
127
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
125
128
// for fetching shard stores info, that operates on a list of shards instead of a single shard
126
- new AsyncShardStoresInfoFetches (state .nodes (), routingNodes , shardIdsToFetch , listener ).start ();
129
+ new AsyncShardStoresInfoFetches (state .nodes (), routingNodes , shardsToFetch , listener ).start ();
127
130
}
128
131
129
132
@ Override
@@ -135,46 +138,46 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl
135
138
private class AsyncShardStoresInfoFetches {
136
139
private final DiscoveryNodes nodes ;
137
140
private final RoutingNodes routingNodes ;
138
- private final Set <ShardId > shardIds ;
141
+ private final Set <Tuple < ShardId , String >> shards ;
139
142
private final ActionListener <IndicesShardStoresResponse > listener ;
140
143
private CountDown expectedOps ;
141
144
private final Queue <InternalAsyncFetch .Response > fetchResponses ;
142
145
143
- AsyncShardStoresInfoFetches (DiscoveryNodes nodes , RoutingNodes routingNodes , Set <ShardId > shardIds ,
146
+ AsyncShardStoresInfoFetches (DiscoveryNodes nodes , RoutingNodes routingNodes , Set <Tuple < ShardId , String >> shards ,
144
147
ActionListener <IndicesShardStoresResponse > listener ) {
145
148
this .nodes = nodes ;
146
149
this .routingNodes = routingNodes ;
147
- this .shardIds = shardIds ;
150
+ this .shards = shards ;
148
151
this .listener = listener ;
149
152
this .fetchResponses = new ConcurrentLinkedQueue <>();
150
- this .expectedOps = new CountDown (shardIds .size ());
153
+ this .expectedOps = new CountDown (shards .size ());
151
154
}
152
155
153
156
void start () {
154
- if (shardIds .isEmpty ()) {
157
+ if (shards .isEmpty ()) {
155
158
listener .onResponse (new IndicesShardStoresResponse ());
156
159
} else {
157
160
// explicitely type lister, some IDEs (Eclipse) are not able to correctly infer the function type
158
161
Lister <BaseNodesResponse <NodeGatewayStartedShards >, NodeGatewayStartedShards > lister = this ::listStartedShards ;
159
- for (ShardId shardId : shardIds ) {
160
- InternalAsyncFetch fetch = new InternalAsyncFetch (logger , "shard_stores" , shardId , lister );
162
+ for (Tuple < ShardId , String > shard : shards ) {
163
+ InternalAsyncFetch fetch = new InternalAsyncFetch (logger , "shard_stores" , shard . v1 (), shard . v2 () , lister );
161
164
fetch .fetchData (nodes , Collections .<String >emptySet ());
162
165
}
163
166
}
164
167
}
165
168
166
- private void listStartedShards (ShardId shardId , DiscoveryNode [] nodes ,
169
+ private void listStartedShards (ShardId shardId , String customDataPath , DiscoveryNode [] nodes ,
167
170
ActionListener <BaseNodesResponse <NodeGatewayStartedShards >> listener ) {
168
- var request = new TransportNodesListGatewayStartedShards .Request (shardId , nodes );
171
+ var request = new TransportNodesListGatewayStartedShards .Request (shardId , customDataPath , nodes );
169
172
client .executeLocally (TransportNodesListGatewayStartedShards .TYPE , request ,
170
173
ActionListener .wrap (listener ::onResponse , listener ::onFailure ));
171
174
}
172
175
173
176
private class InternalAsyncFetch extends AsyncShardFetch <NodeGatewayStartedShards > {
174
177
175
- InternalAsyncFetch (Logger logger , String type , ShardId shardId ,
178
+ InternalAsyncFetch (Logger logger , String type , ShardId shardId , String customDataPath ,
176
179
Lister <? extends BaseNodesResponse <NodeGatewayStartedShards >, NodeGatewayStartedShards > action ) {
177
- super (logger , type , shardId , action );
180
+ super (logger , type , shardId , customDataPath , action );
178
181
}
179
182
180
183
@ Override
0 commit comments