
Commit c2adc58

Async fetch of shard started and store during allocation
Today, when a primary shard is not allocated we go to all the nodes to find where it is allocated (listing its started state). When we allocate a replica shard, we go to all the nodes and list its store, so we can allocate the replica on a node that holds the closest matching index files to the primary. Today both operations execute synchronously within the GatewayAllocator, which means they run on the cluster update thread. For large clusters, or environments with very slow disks, those operations stall the cluster update thread and make it look stuck. Worse, if the FS is really slow, we time the operation out after 30s (so as not to stall the cluster update thread completely). This means another round is needed for a primary shard if we didn't find one, or we won't find the best node to place a replica on, since the store listing (which has to list all files and read the checksum at the end of each one) might have timed out. On top of that, this sync operation happens one shard at a time, so the problem compounds serially the more shards we have and the slower the FS is.

This change makes both listing shard started states and listing shard stores asynchronous. During allocation by the GatewayAllocator, if data needs to be fetched from a node it is fetched asynchronously, and the response triggers a reroute so the results are taken into account. While listing operations for a shard are still ongoing, that shard's data is not taken into account until all of them have completed.

The execution of listing shard states and stores has been moved to their own respective thread pools (scaling, so they go down to 0 threads when no longer needed, with an unbounded queue, since we don't want to time out, just let the listing run as fast as the local FS allows). This is needed since we are going to blast nodes with a lot of requests and we need to make sure there is no thread explosion.

This change also improves the handling of shard failures coming from a specific node. Today, such a node is ignored by allocation only for the single reroute round. Now, since fetching is async, we keep those failures around at least until a successful fetch without that node is done, to make sure we don't keep allocating to the failed node.

Note: before this change, the indication of slow allocation was a high number of pending tasks, since the allocator was waiting for responses; now the pending task count will be much smaller. In order to still indicate that the cluster is in the middle of fetching shard data, 2 attributes were added to the cluster health API, indicating the number of ongoing fetches of both started shards and shard stores.

closes elastic#9502
closes elastic#11101
1 parent 1038933 commit c2adc58
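
The core mechanism the message describes — fire node-level listings asynchronously, cache what comes back, and trigger a reroute when a response arrives so the allocator can use it on the next round — can be sketched roughly as follows. This is a minimal illustration under assumed names (ShardFetchCache, rerouteCallback, asyncList); it is not the code introduced by this commit.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Rough sketch of "async fetch + reroute on response"; all names here are illustrative,
// not the actual classes added by this commit.
class ShardFetchCache<T> {
    private final Map<String, T> responses = new HashMap<>();   // nodeId -> listing result
    private final Set<String> inFlight = new HashSet<>();       // nodes we are still waiting on
    private final Runnable rerouteCallback;                     // e.g. submit another reroute round

    ShardFetchCache(Runnable rerouteCallback) {
        this.rerouteCallback = rerouteCallback;
    }

    /** Returns the data if all listings completed, otherwise kicks off missing fetches and reports "not ready". */
    synchronized Optional<Map<String, T>> fetchData(Set<String> nodeIds,
                                                    BiConsumer<String, Consumer<T>> asyncList) {
        for (String nodeId : nodeIds) {
            if (!responses.containsKey(nodeId) && inFlight.add(nodeId)) {
                // fire the node-level listing without blocking the cluster update thread
                asyncList.accept(nodeId, result -> onResponse(nodeId, result));
            }
        }
        if (!inFlight.isEmpty()) {
            // while any listing is still ongoing, this shard's data is not used for allocation
            return Optional.empty();
        }
        return Optional.of(new HashMap<>(responses));
    }

    private synchronized void onResponse(String nodeId, T result) {
        responses.put(nodeId, result);
        inFlight.remove(nodeId);
        rerouteCallback.run();  // a new response may change allocation decisions
    }
}

The important property is that the cluster update thread only ever checks the cache; the slow filesystem work happens on the dedicated fetch thread pools mentioned above.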

14 files changed: +1006 -330 lines

src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java

Lines changed: 20 additions & 2 deletions
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -57,6 +58,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
     int initializingShards = 0;
     int unassignedShards = 0;
     int numberOfPendingTasks = 0;
+    int numberOfInFlightFetch = 0;
     boolean timedOut = false;
     ClusterHealthStatus status = ClusterHealthStatus.RED;
     private List<String> validationFailures;
@@ -67,12 +69,14 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
 
     /** needed for plugins BWC */
     public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
-        this(clusterName, concreteIndices, clusterState, -1);
+        this(clusterName, concreteIndices, clusterState, -1, -1);
     }
 
-    public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks) {
+    public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
+                                 int numberOfInFlightFetch) {
         this.clusterName = clusterName;
         this.numberOfPendingTasks = numberOfPendingTasks;
+        this.numberOfInFlightFetch = numberOfInFlightFetch;
         RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
         validationFailures = validation.failures();
         numberOfNodes = clusterState.nodes().size();
@@ -166,6 +170,10 @@ public int getNumberOfPendingTasks() {
         return this.numberOfPendingTasks;
     }
 
+    public int getNumberOfInFlightFetch() {
+        return this.numberOfInFlightFetch;
+    }
+
     /**
      * <tt>true</tt> if the waitForXXX has timeout out and did not match.
      */
@@ -220,6 +228,10 @@ public void readFrom(StreamInput in) throws IOException {
                 validationFailures.add(in.readString());
             }
         }
+
+        if (in.getVersion().onOrAfter(Version.V_1_6_0)) {
+            numberOfInFlightFetch = in.readInt();
+        }
     }
 
     @Override
@@ -245,6 +257,10 @@ public void writeTo(StreamOutput out) throws IOException {
         for (String failure : validationFailures) {
             out.writeString(failure);
         }
+
+        if (out.getVersion().onOrAfter(Version.V_1_6_0)) {
+            out.writeInt(numberOfInFlightFetch);
+        }
     }
 
 
@@ -268,6 +284,7 @@ static final class Fields {
         static final XContentBuilderString NUMBER_OF_NODES = new XContentBuilderString("number_of_nodes");
        static final XContentBuilderString NUMBER_OF_DATA_NODES = new XContentBuilderString("number_of_data_nodes");
        static final XContentBuilderString NUMBER_OF_PENDING_TASKS = new XContentBuilderString("number_of_pending_tasks");
+        static final XContentBuilderString NUMBER_OF_IN_FLIGHT_FETCH = new XContentBuilderString("number_of_in_flight_fetch");
         static final XContentBuilderString ACTIVE_PRIMARY_SHARDS = new XContentBuilderString("active_primary_shards");
         static final XContentBuilderString ACTIVE_SHARDS = new XContentBuilderString("active_shards");
         static final XContentBuilderString RELOCATING_SHARDS = new XContentBuilderString("relocating_shards");
@@ -290,6 +307,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
         builder.field(Fields.INITIALIZING_SHARDS, getInitializingShards());
         builder.field(Fields.UNASSIGNED_SHARDS, getUnassignedShards());
         builder.field(Fields.NUMBER_OF_PENDING_TASKS, getNumberOfPendingTasks());
+        builder.field(Fields.NUMBER_OF_IN_FLIGHT_FETCH, getNumberOfInFlightFetch());
 
         String level = params.param("level", "cluster");
         boolean outputIndices = "indices".equals(level) || "shards".equals(level);
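
As a usage note (not part of the diff): the getter added above can be read through the 1.x Java client roughly like this; the connected Client instance and the surrounding class are assumed for illustration.

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;

public class HealthFetchCheck {
    // "client" is assumed to be an already-connected node or transport client (1.x API)
    public static boolean fetchesInFlight(Client client) {
        ClusterHealthResponse health = client.admin().cluster().prepareHealth().get();
        // non-zero means the allocator is still listing shard started states / shard stores on data nodes
        return health.getNumberOfInFlightFetch() > 0;
    }
}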

src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 9 additions & 7 deletions
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.action.admin.cluster.health;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -30,6 +29,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -40,12 +40,14 @@
 public class TransportClusterHealthAction extends TransportMasterNodeReadOperationAction<ClusterHealthRequest, ClusterHealthResponse> {
 
     private final ClusterName clusterName;
+    private final GatewayAllocator gatewayAllocator;
 
     @Inject
     public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-                                        ClusterName clusterName, ActionFilters actionFilters) {
+                                        ClusterName clusterName, ActionFilters actionFilters, GatewayAllocator gatewayAllocator) {
         super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, ClusterHealthRequest.class);
         this.clusterName = clusterName;
+        this.gatewayAllocator = gatewayAllocator;
     }
 
     @Override
@@ -158,12 +160,12 @@ public void onTimeout(TimeValue timeout) {
     }
 
     private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
-        ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
+        ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
         return prepareResponse(request, response, clusterState, waitFor);
     }
 
     private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
-        ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks());
+        ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.numberOfPendingTasks(), gatewayAllocator.getNumberOfInFlightFetch());
         boolean valid = prepareResponse(request, response, clusterState, waitFor);
         assert valid || timedOut;
         // we check for a timeout here since this method might be called from the wait_for_events
@@ -247,7 +249,7 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
     }
 
 
-    private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks) {
+    private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks, int numberOfInFlightFetch) {
         if (logger.isTraceEnabled()) {
             logger.trace("Calculating health based on state version [{}]", clusterState.version());
         }
@@ -256,11 +258,11 @@ private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, Cluste
             concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
         } catch (IndexMissingException e) {
             // one of the specified indices is not there - treat it as RED.
-            ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks);
+            ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, numberOfPendingTasks, numberOfInFlightFetch);
             response.status = ClusterHealthStatus.RED;
             return response;
         }
 
-        return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks);
+        return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, numberOfInFlightFetch);
     }
 }
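
The GatewayAllocator changes that back getNumberOfInFlightFetch() live in the other changed files, not shown here. Conceptually the number is the count of node-level listings that have been sent but not yet answered, summed over all shards being fetched. A hypothetical sketch of that bookkeeping (illustrative names only, not the commit's actual implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of how an allocator-side component could expose the number
// consumed by TransportClusterHealthAction above.
class InFlightFetchTracker {
    // per-shard count of node-level listing requests that have not responded yet
    private final Map<String, Integer> inFlightPerShard = new ConcurrentHashMap<>();

    void onFetchesStarted(String shardId, int nodeCount) {
        inFlightPerShard.merge(shardId, nodeCount, Integer::sum);
    }

    void onNodeResponded(String shardId) {
        // drop the entry entirely once the last outstanding listing for the shard answers
        inFlightPerShard.computeIfPresent(shardId, (id, n) -> n > 1 ? n - 1 : null);
    }

    // what a cluster health request would report as number_of_in_flight_fetch
    int getNumberOfInFlightFetch() {
        return inFlightPerShard.values().stream().mapToInt(Integer::intValue).sum();
    }
}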

0 commit comments
