Skip to content

Commit dfe6fd4

Browse files
authored
Make GET _cluster/stats cancellable (#68820)
Today `GET _cluster/stats` can be quite expensive, and is typically retrieved periodically by monitoring systems (e.g. Metricbeat) that implement a client-side timeout. When the client times out, it closes the HTTP connection in use. With this commit we react to the close of the HTTP connection by cancelling the ongoing stats request, avoiding unnecessary duplicated work. Relates #55550. Backport of #68676.
1 parent aeea3ba commit dfe6fd4

File tree

13 files changed

+946
-75
lines changed

13 files changed

+946
-75
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.client.Cancellable;
15+
import org.elasticsearch.client.Request;
16+
import org.elasticsearch.client.Response;
17+
import org.elasticsearch.client.ResponseListener;
18+
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
19+
import org.elasticsearch.common.lease.Releasable;
20+
import org.elasticsearch.common.lease.Releasables;
21+
import org.elasticsearch.common.settings.Setting;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.util.CollectionUtils;
24+
import org.elasticsearch.index.IndexService;
25+
import org.elasticsearch.index.IndexSettings;
26+
import org.elasticsearch.index.engine.Engine;
27+
import org.elasticsearch.index.engine.EngineConfig;
28+
import org.elasticsearch.index.engine.EngineFactory;
29+
import org.elasticsearch.index.engine.ReadOnlyEngine;
30+
import org.elasticsearch.index.seqno.SeqNoStats;
31+
import org.elasticsearch.index.shard.IndexShard;
32+
import org.elasticsearch.index.shard.IndexShardTestCase;
33+
import org.elasticsearch.index.translog.TranslogStats;
34+
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.plugins.EnginePlugin;
36+
import org.elasticsearch.plugins.Plugin;
37+
import org.elasticsearch.tasks.CancellableTask;
38+
import org.elasticsearch.tasks.TaskInfo;
39+
import org.elasticsearch.transport.TransportService;
40+
41+
import java.util.ArrayList;
42+
import java.util.Collection;
43+
import java.util.List;
44+
import java.util.Optional;
45+
import java.util.concurrent.CancellationException;
46+
import java.util.concurrent.Semaphore;
47+
import java.util.function.Function;
48+
49+
import static java.util.Collections.singletonList;
50+
import static org.hamcrest.Matchers.empty;
51+
import static org.hamcrest.Matchers.not;
52+
53+
public class ClusterStatsRestCancellationIT extends HttpSmokeTestCase {
54+
55+
public static final Setting<Boolean> BLOCK_STATS_SETTING = Setting.boolSetting("index.block_stats", false, Setting.Property.IndexScope);
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> nodePlugins() {
59+
return CollectionUtils.appendToCopy(super.nodePlugins(), ClusterStatsRestCancellationIT.StatsBlockingPlugin.class);
60+
}
61+
62+
@Override
63+
protected boolean addMockInternalEngine() {
64+
return false;
65+
}
66+
67+
@Override
68+
protected Settings nodeSettings(int nodeOrdinal) {
69+
return Settings.builder()
70+
.put(super.nodeSettings(nodeOrdinal))
71+
// disable internal cluster info service to avoid internal cluster stats calls
72+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
73+
.build();
74+
}
75+
76+
public void testClusterStateRestCancellation() throws Exception {
77+
78+
createIndex("test", Settings.builder().put(BLOCK_STATS_SETTING.getKey(), true).build());
79+
ensureGreen("test");
80+
81+
final List<Semaphore> statsBlocks = new ArrayList<>();
82+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
83+
for (final IndexService indexService : indicesService) {
84+
for (final IndexShard indexShard : indexService) {
85+
final Engine engine = IndexShardTestCase.getEngine(indexShard);
86+
if (engine instanceof StatsBlockingEngine) {
87+
statsBlocks.add(((StatsBlockingEngine) engine).statsBlock);
88+
}
89+
}
90+
}
91+
}
92+
assertThat(statsBlocks, not(empty()));
93+
94+
final List<Releasable> releasables = new ArrayList<>();
95+
try {
96+
for (final Semaphore statsBlock : statsBlocks) {
97+
statsBlock.acquire();
98+
releasables.add(statsBlock::release);
99+
}
100+
101+
final Request clusterStatsRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/stats");
102+
103+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
104+
logger.info("--> sending cluster state request");
105+
final Cancellable cancellable = getRestClient().performRequestAsync(clusterStatsRequest, new ResponseListener() {
106+
@Override
107+
public void onSuccess(Response response) {
108+
future.onResponse(null);
109+
}
110+
111+
@Override
112+
public void onFailure(Exception exception) {
113+
future.onFailure(exception);
114+
}
115+
});
116+
117+
logger.info("--> waiting for task to start");
118+
assertBusy(() -> {
119+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
120+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
121+
});
122+
123+
logger.info("--> waiting for at least one task to hit a block");
124+
assertBusy(() -> assertTrue(statsBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
125+
126+
logger.info("--> cancelling cluster stats request");
127+
cancellable.cancel();
128+
expectThrows(CancellationException.class, future::actionGet);
129+
130+
logger.info("--> checking that all cluster stats tasks are marked as cancelled");
131+
assertBusy(() -> {
132+
boolean foundTask = false;
133+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
134+
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
135+
if (cancellableTask.getAction().startsWith(ClusterStatsAction.NAME)) {
136+
foundTask = true;
137+
assertTrue(cancellableTask.isCancelled());
138+
}
139+
}
140+
}
141+
assertTrue(foundTask);
142+
});
143+
} finally {
144+
Releasables.close(releasables);
145+
}
146+
147+
logger.info("--> checking that all cluster stats tasks have finished");
148+
assertBusy(() -> {
149+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
150+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
151+
});
152+
}
153+
154+
public static class StatsBlockingPlugin extends Plugin implements EnginePlugin {
155+
156+
@Override
157+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
158+
if (BLOCK_STATS_SETTING.get(indexSettings.getSettings())) {
159+
return Optional.of(StatsBlockingEngine::new);
160+
}
161+
return Optional.empty();
162+
}
163+
164+
@Override
165+
public List<Setting<?>> getSettings() {
166+
return singletonList(BLOCK_STATS_SETTING);
167+
}
168+
}
169+
170+
private static class StatsBlockingEngine extends ReadOnlyEngine {
171+
172+
final Semaphore statsBlock = new Semaphore(1);
173+
174+
StatsBlockingEngine(EngineConfig config) {
175+
super(config, null, new TranslogStats(), true, Function.identity(), true);
176+
}
177+
178+
@Override
179+
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
180+
try {
181+
statsBlock.acquire();
182+
} catch (InterruptedException e) {
183+
throw new AssertionError(e);
184+
}
185+
statsBlock.release();
186+
return super.getSeqNoStats(globalCheckpoint);
187+
}
188+
}
189+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/AnalysisStats.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class AnalysisStats implements ToXContentFragment, Writeable {
4141
/**
4242
* Create {@link AnalysisStats} from the given cluster state.
4343
*/
44-
public static AnalysisStats of(Metadata metadata) {
44+
public static AnalysisStats of(Metadata metadata, Runnable ensureNotCancelled) {
4545
final Map<String, IndexFeatureStats> usedCharFilterTypes = new HashMap<>();
4646
final Map<String, IndexFeatureStats> usedTokenizerTypes = new HashMap<>();
4747
final Map<String, IndexFeatureStats> usedTokenFilterTypes = new HashMap<>();
@@ -52,6 +52,7 @@ public static AnalysisStats of(Metadata metadata) {
5252
final Map<String, IndexFeatureStats> usedBuiltInAnalyzers = new HashMap<>();
5353

5454
for (IndexMetadata indexMetadata : metadata) {
55+
ensureNotCancelled.run();
5556
if (indexMetadata.isSystem()) {
5657
// Don't include system indices in statistics about analysis,
5758
// we care about the user's indices.

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java

+9
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@
1111
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.tasks.CancellableTask;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.tasks.TaskId;
1417

1518
import java.io.IOException;
19+
import java.util.Map;
1620

1721
/**
1822
* A request to get cluster level stats.
@@ -31,6 +35,11 @@ public ClusterStatsRequest(String... nodesIds) {
3135
super(nodesIds);
3236
}
3337

38+
@Override
39+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
40+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
41+
}
42+
3443
@Override
3544
public void writeTo(StreamOutput out) throws IOException {
3645
super.writeTo(out);

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/MappingStats.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ public final class MappingStats implements ToXContentFragment, Writeable {
3838
/**
3939
* Create {@link MappingStats} from the given cluster state.
4040
*/
41-
public static MappingStats of(Metadata metadata) {
41+
public static MappingStats of(Metadata metadata, Runnable ensureNotCancelled) {
4242
Map<String, IndexFeatureStats> fieldTypes = new HashMap<>();
4343
for (IndexMetadata indexMetadata : metadata) {
44+
ensureNotCancelled.run();
4445
if (indexMetadata.isSystem()) {
4546
// Don't include system indices in statistics about mappings,
4647
// we care about the user's indices.

0 commit comments

Comments
 (0)