Skip to content

Commit bbfa047

Browse files
committed
Make indices stats requests cancellable
Relates elastic#55550
1 parent 146f7be commit bbfa047

File tree

6 files changed

+233
-168
lines changed

6 files changed

+233
-168
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.elasticsearch.action.support.PlainActionFuture;
12+
import org.elasticsearch.client.Cancellable;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.client.Response;
15+
import org.elasticsearch.client.ResponseListener;
16+
import org.elasticsearch.common.lease.Releasable;
17+
import org.elasticsearch.common.lease.Releasables;
18+
import org.elasticsearch.common.settings.Setting;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.CollectionUtils;
21+
import org.elasticsearch.index.IndexService;
22+
import org.elasticsearch.index.IndexSettings;
23+
import org.elasticsearch.index.engine.Engine;
24+
import org.elasticsearch.index.engine.EngineConfig;
25+
import org.elasticsearch.index.engine.EngineException;
26+
import org.elasticsearch.index.engine.EngineFactory;
27+
import org.elasticsearch.index.engine.ReadOnlyEngine;
28+
import org.elasticsearch.index.shard.IndexShard;
29+
import org.elasticsearch.index.shard.IndexShardTestCase;
30+
import org.elasticsearch.index.translog.TranslogStats;
31+
import org.elasticsearch.indices.IndicesService;
32+
import org.elasticsearch.plugins.EnginePlugin;
33+
import org.elasticsearch.plugins.Plugin;
34+
import org.elasticsearch.tasks.CancellableTask;
35+
import org.elasticsearch.tasks.TaskInfo;
36+
import org.elasticsearch.transport.TransportService;
37+
38+
import java.util.ArrayList;
39+
import java.util.Collection;
40+
import java.util.List;
41+
import java.util.Optional;
42+
import java.util.concurrent.CancellationException;
43+
import java.util.concurrent.Semaphore;
44+
import java.util.function.Function;
45+
46+
import static java.util.Collections.singletonList;
47+
import static org.hamcrest.Matchers.empty;
48+
import static org.hamcrest.Matchers.not;
49+
50+
/**
51+
* Base class for testing that cancellation works at the REST layer for requests that need to acquire a searcher on one or more shards.
52+
*
53+
* It works by blocking searcher acquisition in order to catch the request mid-execution, and then to check that all the tasks are cancelled
54+
* before they complete normally.
55+
*/
56+
public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {
57+
58+
private static final Setting<Boolean> BLOCK_SEARCHER_SETTING
59+
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);
60+
61+
@Override
62+
protected Collection<Class<? extends Plugin>> nodePlugins() {
63+
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
64+
}
65+
66+
@Override
67+
protected boolean addMockInternalEngine() {
68+
return false;
69+
}
70+
71+
void runTest(Request request, String actionPrefix) throws Exception {
72+
73+
createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
74+
ensureGreen("test");
75+
76+
final List<Semaphore> searcherBlocks = new ArrayList<>();
77+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
78+
for (final IndexService indexService : indicesService) {
79+
for (final IndexShard indexShard : indexService) {
80+
final Engine engine = IndexShardTestCase.getEngine(indexShard);
81+
if (engine instanceof SearcherBlockingEngine) {
82+
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
83+
}
84+
}
85+
}
86+
}
87+
assertThat(searcherBlocks, not(empty()));
88+
89+
final List<Releasable> releasables = new ArrayList<>();
90+
try {
91+
for (final Semaphore searcherBlock : searcherBlocks) {
92+
searcherBlock.acquire();
93+
releasables.add(searcherBlock::release);
94+
}
95+
96+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
97+
logger.info("--> sending request");
98+
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
99+
@Override
100+
public void onSuccess(Response response) {
101+
future.onResponse(null);
102+
}
103+
104+
@Override
105+
public void onFailure(Exception exception) {
106+
future.onFailure(exception);
107+
}
108+
});
109+
110+
logger.info("--> waiting for task to start");
111+
assertBusy(() -> {
112+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
113+
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix)));
114+
});
115+
116+
logger.info("--> waiting for at least one task to hit a block");
117+
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
118+
119+
logger.info("--> cancelling request");
120+
cancellable.cancel();
121+
expectThrows(CancellationException.class, future::actionGet);
122+
123+
logger.info("--> checking that all tasks are marked as cancelled");
124+
assertBusy(() -> {
125+
boolean foundTask = false;
126+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
127+
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
128+
if (cancellableTask.getAction().startsWith(actionPrefix)) {
129+
foundTask = true;
130+
assertTrue(
131+
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
132+
cancellableTask.isCancelled());
133+
}
134+
}
135+
}
136+
assertTrue("found no cancellable tasks", foundTask);
137+
});
138+
} finally {
139+
Releasables.close(releasables);
140+
}
141+
142+
logger.info("--> checking that all tasks have finished");
143+
assertBusy(() -> {
144+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
145+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
146+
});
147+
}
148+
149+
public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {
150+
151+
@Override
152+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
153+
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
154+
return Optional.of(SearcherBlockingEngine::new);
155+
}
156+
return Optional.empty();
157+
}
158+
159+
@Override
160+
public List<Setting<?>> getSettings() {
161+
return singletonList(BLOCK_SEARCHER_SETTING);
162+
}
163+
}
164+
165+
private static class SearcherBlockingEngine extends ReadOnlyEngine {
166+
167+
final Semaphore searcherBlock = new Semaphore(1);
168+
169+
SearcherBlockingEngine(EngineConfig config) {
170+
super(config, null, new TranslogStats(), true, Function.identity(), true);
171+
}
172+
173+
@Override
174+
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
175+
try {
176+
searcherBlock.acquire();
177+
} catch (InterruptedException e) {
178+
throw new AssertionError(e);
179+
}
180+
searcherBlock.release();
181+
return super.acquireSearcher(source, scope, wrapper);
182+
}
183+
}
184+
185+
}

qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java

Lines changed: 3 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -10,178 +10,14 @@
1010

1111
import org.apache.http.client.methods.HttpGet;
1212
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
13-
import org.elasticsearch.action.support.PlainActionFuture;
14-
import org.elasticsearch.client.Cancellable;
1513
import org.elasticsearch.client.Request;
16-
import org.elasticsearch.client.Response;
17-
import org.elasticsearch.client.ResponseListener;
18-
import org.elasticsearch.common.lease.Releasable;
19-
import org.elasticsearch.common.lease.Releasables;
20-
import org.elasticsearch.common.settings.Setting;
21-
import org.elasticsearch.common.settings.Settings;
22-
import org.elasticsearch.common.util.CollectionUtils;
23-
import org.elasticsearch.index.IndexService;
24-
import org.elasticsearch.index.IndexSettings;
25-
import org.elasticsearch.index.engine.Engine;
26-
import org.elasticsearch.index.engine.EngineConfig;
27-
import org.elasticsearch.index.engine.EngineException;
28-
import org.elasticsearch.index.engine.EngineFactory;
29-
import org.elasticsearch.index.engine.ReadOnlyEngine;
30-
import org.elasticsearch.index.shard.IndexShard;
31-
import org.elasticsearch.index.shard.IndexShardTestCase;
32-
import org.elasticsearch.index.translog.TranslogStats;
33-
import org.elasticsearch.indices.IndicesService;
34-
import org.elasticsearch.plugins.EnginePlugin;
35-
import org.elasticsearch.plugins.Plugin;
36-
import org.elasticsearch.tasks.CancellableTask;
37-
import org.elasticsearch.tasks.TaskInfo;
38-
import org.elasticsearch.transport.TransportService;
39-
40-
import java.util.ArrayList;
41-
import java.util.Collection;
42-
import java.util.List;
43-
import java.util.Optional;
44-
import java.util.concurrent.CancellationException;
45-
import java.util.concurrent.Semaphore;
46-
import java.util.function.Function;
47-
48-
import static java.util.Collections.singletonList;
49-
import static org.hamcrest.Matchers.empty;
50-
import static org.hamcrest.Matchers.not;
51-
52-
public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {
53-
54-
public static final Setting<Boolean> BLOCK_SEARCHER_SETTING
55-
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);
56-
57-
@Override
58-
protected Collection<Class<? extends Plugin>> nodePlugins() {
59-
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
60-
}
61-
62-
@Override
63-
protected boolean addMockInternalEngine() {
64-
return false;
65-
}
6614

15+
public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
6716
public void testIndicesSegmentsRestCancellation() throws Exception {
68-
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"));
17+
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME);
6918
}
7019

7120
public void testCatSegmentsRestCancellation() throws Exception {
72-
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"));
73-
}
74-
75-
private void runTest(Request request) throws Exception {
76-
77-
createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
78-
ensureGreen("test");
79-
80-
final List<Semaphore> searcherBlocks = new ArrayList<>();
81-
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
82-
for (final IndexService indexService : indicesService) {
83-
for (final IndexShard indexShard : indexService) {
84-
final Engine engine = IndexShardTestCase.getEngine(indexShard);
85-
if (engine instanceof SearcherBlockingEngine) {
86-
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
87-
}
88-
}
89-
}
90-
}
91-
assertThat(searcherBlocks, not(empty()));
92-
93-
final List<Releasable> releasables = new ArrayList<>();
94-
try {
95-
for (final Semaphore searcherBlock : searcherBlocks) {
96-
searcherBlock.acquire();
97-
releasables.add(searcherBlock::release);
98-
}
99-
100-
final PlainActionFuture<Void> future = new PlainActionFuture<>();
101-
logger.info("--> sending indices segments request");
102-
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
103-
@Override
104-
public void onSuccess(Response response) {
105-
future.onResponse(null);
106-
}
107-
108-
@Override
109-
public void onFailure(Exception exception) {
110-
future.onFailure(exception);
111-
}
112-
});
113-
114-
logger.info("--> waiting for task to start");
115-
assertBusy(() -> {
116-
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
117-
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
118-
});
119-
120-
logger.info("--> waiting for at least one task to hit a block");
121-
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));
122-
123-
logger.info("--> cancelling request");
124-
cancellable.cancel();
125-
expectThrows(CancellationException.class, future::actionGet);
126-
127-
logger.info("--> checking that all indices segments tasks are marked as cancelled");
128-
assertBusy(() -> {
129-
boolean foundTask = false;
130-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
131-
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
132-
if (cancellableTask.getAction().startsWith(IndicesSegmentsAction.NAME)) {
133-
foundTask = true;
134-
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
135-
}
136-
}
137-
}
138-
assertTrue("found no cancellable tasks", foundTask);
139-
});
140-
} finally {
141-
Releasables.close(releasables);
142-
}
143-
144-
logger.info("--> checking that all indices segments tasks have finished");
145-
assertBusy(() -> {
146-
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
147-
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
148-
});
21+
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME);
14922
}
150-
151-
public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {
152-
153-
@Override
154-
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
155-
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
156-
return Optional.of(SearcherBlockingEngine::new);
157-
}
158-
return Optional.empty();
159-
}
160-
161-
@Override
162-
public List<Setting<?>> getSettings() {
163-
return singletonList(BLOCK_SEARCHER_SETTING);
164-
}
165-
}
166-
167-
private static class SearcherBlockingEngine extends ReadOnlyEngine {
168-
169-
final Semaphore searcherBlock = new Semaphore(1);
170-
171-
SearcherBlockingEngine(EngineConfig config) {
172-
super(config, null, new TranslogStats(), true, Function.identity(), true);
173-
}
174-
175-
@Override
176-
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
177-
try {
178-
searcherBlock.acquire();
179-
} catch (InterruptedException e) {
180-
throw new AssertionError(e);
181-
}
182-
searcherBlock.release();
183-
return super.acquireSearcher(source, scope, wrapper);
184-
}
185-
}
186-
18723
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
13+
import org.elasticsearch.client.Request;
14+
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
15+
import org.elasticsearch.common.settings.Settings;
16+
17+
public class IndicesStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
18+
19+
@Override
20+
protected Settings nodeSettings(int nodeOrdinal) {
21+
return Settings.builder()
22+
.put(super.nodeSettings(nodeOrdinal))
23+
// disable internal cluster info service to avoid internal indices stats calls
24+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
25+
.build();
26+
}
27+
28+
public void testIndicesStatsRestCancellation() throws Exception {
29+
runTest(new Request(HttpGet.METHOD_NAME, "/_stats"), IndicesStatsAction.NAME);
30+
}
31+
}

0 commit comments

Comments
 (0)