Skip to content

Commit 96b8a24

Browse files
authored
Indices segments: bg serialize, make cancellable (#68965)
The response to an `IndicesSegmentsAction` might be large, perhaps 10s of MBs of JSON, and today it is serialized on a transport thread. It also might take so long to respond that the client times out, resulting in the work needed to compute the response being wasted. This commit introduces the `DispatchingRestToXContentListener` which dispatches the work of serializing an `XContent` response to a non-transport thread, and also makes `TransportBroadcastByNodeAction` sensitive to the cancellability of its tasks. It uses these two features to make the `RestIndicesSegmentsAction` serialize its response on a `MANAGEMENT` thread, and to abort its work more promptly if the client's channel is closed before the response is sent.
1 parent 36e69fd commit 96b8a24

File tree

20 files changed

+426
-67
lines changed

20 files changed

+426
-67
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.client.Cancellable;
15+
import org.elasticsearch.client.Request;
16+
import org.elasticsearch.client.Response;
17+
import org.elasticsearch.client.ResponseListener;
18+
import org.elasticsearch.common.lease.Releasable;
19+
import org.elasticsearch.common.lease.Releasables;
20+
import org.elasticsearch.common.settings.Setting;
21+
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.util.CollectionUtils;
23+
import org.elasticsearch.index.IndexService;
24+
import org.elasticsearch.index.IndexSettings;
25+
import org.elasticsearch.index.engine.Engine;
26+
import org.elasticsearch.index.engine.EngineConfig;
27+
import org.elasticsearch.index.engine.EngineException;
28+
import org.elasticsearch.index.engine.EngineFactory;
29+
import org.elasticsearch.index.engine.ReadOnlyEngine;
30+
import org.elasticsearch.index.shard.IndexShard;
31+
import org.elasticsearch.index.shard.IndexShardTestCase;
32+
import org.elasticsearch.index.translog.TranslogStats;
33+
import org.elasticsearch.indices.IndicesService;
34+
import org.elasticsearch.plugins.EnginePlugin;
35+
import org.elasticsearch.plugins.Plugin;
36+
import org.elasticsearch.tasks.CancellableTask;
37+
import org.elasticsearch.tasks.TaskInfo;
38+
import org.elasticsearch.transport.TransportService;
39+
40+
import java.util.ArrayList;
41+
import java.util.Collection;
42+
import java.util.List;
43+
import java.util.Optional;
44+
import java.util.concurrent.CancellationException;
45+
import java.util.concurrent.Semaphore;
46+
import java.util.function.Function;
47+
48+
import static java.util.Collections.singletonList;
49+
import static org.hamcrest.Matchers.empty;
50+
import static org.hamcrest.Matchers.not;
51+
52+
public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {
53+
54+
public static final Setting<Boolean> BLOCK_SEARCHER_SETTING
55+
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> nodePlugins() {
59+
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
60+
}
61+
62+
@Override
63+
protected boolean addMockInternalEngine() {
64+
return false;
65+
}
66+
67+
public void testClusterStateRestCancellation() throws Exception {
68+
69+
createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
70+
ensureGreen("test");
71+
72+
final List<Semaphore> searcherBlocks = new ArrayList<>();
73+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
74+
for (final IndexService indexService : indicesService) {
75+
for (final IndexShard indexShard : indexService) {
76+
final Engine engine = IndexShardTestCase.getEngine(indexShard);
77+
if (engine instanceof SearcherBlockingEngine) {
78+
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
79+
}
80+
}
81+
}
82+
}
83+
assertThat(searcherBlocks, not(empty()));
84+
85+
final List<Releasable> releasables = new ArrayList<>();
86+
try {
87+
for (final Semaphore searcherBlock : searcherBlocks) {
88+
searcherBlock.acquire();
89+
releasables.add(searcherBlock::release);
90+
}
91+
92+
final Request indicesSegments = new Request(HttpGet.METHOD_NAME, "/_segments");
93+
94+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
95+
logger.info("--> sending indices segments request");
96+
final Cancellable cancellable = getRestClient().performRequestAsync(indicesSegments, new ResponseListener() {
97+
@Override
98+
public void onSuccess(Response response) {
99+
future.onResponse(null);
100+
}
101+
102+
@Override
103+
public void onFailure(Exception exception) {
104+
future.onFailure(exception);
105+
}
106+
});
107+
108+
logger.info("--> waiting for task to start");
109+
assertBusy(() -> {
110+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
111+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
112+
});
113+
114+
logger.info("--> waiting for at least one task to hit a block");
115+
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
116+
117+
logger.info("--> cancelling request");
118+
cancellable.cancel();
119+
expectThrows(CancellationException.class, future::actionGet);
120+
121+
logger.info("--> checking that all indices segments tasks are marked as cancelled");
122+
assertBusy(() -> {
123+
boolean foundTask = false;
124+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
125+
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
126+
if (cancellableTask.getAction().startsWith(IndicesSegmentsAction.NAME)) {
127+
foundTask = true;
128+
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
129+
}
130+
}
131+
}
132+
assertTrue("found no cancellable tasks", foundTask);
133+
});
134+
} finally {
135+
Releasables.close(releasables);
136+
}
137+
138+
logger.info("--> checking that all indices segments tasks have finished");
139+
assertBusy(() -> {
140+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
141+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
142+
});
143+
}
144+
145+
public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {
146+
147+
@Override
148+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
149+
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
150+
return Optional.of(SearcherBlockingEngine::new);
151+
}
152+
return Optional.empty();
153+
}
154+
155+
@Override
156+
public List<Setting<?>> getSettings() {
157+
return singletonList(BLOCK_SEARCHER_SETTING);
158+
}
159+
}
160+
161+
private static class SearcherBlockingEngine extends ReadOnlyEngine {
162+
163+
final Semaphore searcherBlock = new Semaphore(1);
164+
165+
SearcherBlockingEngine(EngineConfig config) {
166+
super(config, null, new TranslogStats(), true, Function.identity(), true);
167+
}
168+
169+
@Override
170+
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
171+
try {
172+
searcherBlock.acquire();
173+
} catch (InterruptedException e) {
174+
throw new AssertionError(e);
175+
}
176+
searcherBlock.release();
177+
return super.acquireSearcher(source, scope, wrapper);
178+
}
179+
}
180+
181+
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
653653
registerHandler.accept(new RestSnapshottableFeaturesAction());
654654
registerHandler.accept(new RestGetIndicesAction());
655655
registerHandler.accept(new RestIndicesStatsAction());
656-
registerHandler.accept(new RestIndicesSegmentsAction());
656+
registerHandler.accept(new RestIndicesSegmentsAction(threadPool));
657657
registerHandler.accept(new RestIndicesShardStoresAction());
658658
registerHandler.accept(new RestGetAliasesAction());
659659
registerHandler.accept(new RestIndexDeleteAliasesAction());

server/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.inject.Inject;
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.indices.IndicesService;
24+
import org.elasticsearch.tasks.Task;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.transport.TransportService;
2627

@@ -63,7 +64,7 @@ protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOExce
6364
}
6465

6566
@Override
66-
protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) {
67+
protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting, Task task) {
6768
indicesService.clearIndexShardCache(shardRouting.shardId(), request.queryCache(), request.fieldDataCache(), request.requestCache(),
6869
request.fields());
6970
return EmptyResult.INSTANCE;

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.index.shard.IndexShard;
2424
import org.elasticsearch.indices.IndicesService;
25+
import org.elasticsearch.tasks.Task;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627
import org.elasticsearch.transport.TransportService;
2728

@@ -62,7 +63,7 @@ protected ForceMergeRequest readRequestFrom(StreamInput in) throws IOException {
6263
}
6364

6465
@Override
65-
protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting shardRouting) throws IOException {
66+
protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting shardRouting, Task task) throws IOException {
6667
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).getShard(shardRouting.shardId().id());
6768
indexShard.forceMerge(request);
6869
return EmptyResult.INSTANCE;

server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.index.shard.IndexShard;
2525
import org.elasticsearch.indices.IndicesService;
2626
import org.elasticsearch.indices.recovery.RecoveryState;
27+
import org.elasticsearch.tasks.Task;
2728
import org.elasticsearch.threadpool.ThreadPool;
2829
import org.elasticsearch.transport.TransportService;
2930

@@ -86,7 +87,7 @@ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
8687
}
8788

8889
@Override
89-
protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting) {
90+
protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting, Task task) {
9091
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
9192
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
9293
return indexShard.recoveryState();

server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.unit.ByteSizeValue;
2121
import org.elasticsearch.common.xcontent.XContentBuilder;
2222
import org.elasticsearch.index.engine.Segment;
23+
import org.elasticsearch.transport.Transports;
2324

2425
import java.io.IOException;
2526
import java.util.ArrayList;
@@ -80,6 +81,8 @@ public void writeTo(StreamOutput out) throws IOException {
8081

8182
@Override
8283
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
84+
assert Transports.assertNotTransportThread("segments are very numerous, too expensive to serialize on a transport thread");
85+
8386
builder.startObject(Fields.INDICES);
8487

8588
for (IndexSegments indexSegments : getIndices().values()) {

server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
import org.elasticsearch.common.Strings;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.tasks.CancellableTask;
16+
import org.elasticsearch.tasks.Task;
17+
import org.elasticsearch.tasks.TaskId;
1518

1619
import java.io.IOException;
20+
import java.util.Map;
1721

1822
public class IndicesSegmentsRequest extends BroadcastRequest<IndicesSegmentsRequest> {
1923

@@ -52,6 +56,10 @@ public void verbose(boolean v) {
5256
public void writeTo(StreamOutput out) throws IOException {
5357
super.writeTo(out);
5458
out.writeBoolean(verbose);
59+
}
5560

61+
@Override
62+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
63+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
5664
}
5765
}

server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.index.IndexService;
2424
import org.elasticsearch.index.shard.IndexShard;
2525
import org.elasticsearch.indices.IndicesService;
26+
import org.elasticsearch.tasks.CancellableTask;
27+
import org.elasticsearch.tasks.Task;
2628
import org.elasticsearch.threadpool.ThreadPool;
2729
import org.elasticsearch.transport.TransportService;
2830

@@ -80,7 +82,8 @@ protected IndicesSegmentsRequest readRequestFrom(StreamInput in) throws IOExcept
8082
}
8183

8284
@Override
83-
protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) {
85+
protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting, Task task) {
86+
assert task instanceof CancellableTask;
8487
IndexService indexService = indicesService.indexServiceSafe(shardRouting.index());
8588
IndexShard indexShard = indexService.getShard(shardRouting.id());
8689
return new ShardSegments(indexShard.routingEntry(), indexShard.segments(request.verbose()));

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.index.shard.IndexShard;
2929
import org.elasticsearch.index.shard.ShardNotFoundException;
3030
import org.elasticsearch.indices.IndicesService;
31+
import org.elasticsearch.tasks.Task;
3132
import org.elasticsearch.threadpool.ThreadPool;
3233
import org.elasticsearch.transport.TransportService;
3334

@@ -83,7 +84,7 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException
8384
}
8485

8586
@Override
86-
protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting) {
87+
protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task) {
8788
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
8889
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
8990
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet

0 commit comments

Comments
 (0)