Skip to content

Commit 8c51fc7

Browse files
authored
System index reads in separate threadpool (#57936)
This commit introduces a new thread pool, `system_read`, which is intended for use by system indices for all read operations (get and search). The `system_read` pool is a fixed thread pool with a maximum number of threads equal to lesser of half of the available processors or 5. Given the combination of both get and read operations in this thread pool, the queue size has been set to 2000. The motivation for this change is to allow system read operations to be serviced in spite of the number of user searches. In order to avoid a significant performance hit due to pattern matching on all search requests, a new metadata flag is added to mark indices as system or non-system. Previously created system indices will have flag added to their metadata upon upgrade to a version with this capability. Additionally, this change also introduces a new class, `SystemIndices`, which encapsulates logic around system indices. Currently, the class provides a method to check if an index is a system index and a method to find a matching index descriptor given the name of an index. Relates #50251 Relates #37867
1 parent bb5a4b3 commit 8c51fc7

34 files changed

+738
-209
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ tasks.register("verifyVersions") {
174174
* after the backport of the backcompat code is complete.
175175
*/
176176

177-
boolean bwc_tests_enabled = true
178-
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
177+
boolean bwc_tests_enabled = false
178+
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/57936" /* place a PR link here when committing bwc changes */
179179
if (bwc_tests_enabled == false) {
180180
if (bwc_tests_disabled_issue.isEmpty()) {
181181
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

docs/reference/modules/threadpool.asciidoc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[[modules-threadpool]]
22
=== Thread pools
33

4-
A node uses several thread pools to manage memory consumption.
5-
Queues associated with many of the thread pools enable pending requests
6-
to be held instead of discarded.
4+
A node uses several thread pools to manage memory consumption.
5+
Queues associated with many of the thread pools enable pending requests
6+
to be held instead of discarded.
77

88
There are several thread pools, but the important ones include:
99

@@ -81,6 +81,11 @@ There are several thread pools, but the important ones include:
8181
Thread pool type is `scaling` with a keep-alive of `5m` and a default
8282
maximum size of `5`.
8383

84+
`system_read`::
85+
For read operations on system indices.
86+
Thread pool type is `fixed` and a default maximum size of
87+
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
88+
8489
Changing a specific thread pool can be done by setting its type-specific
8590
parameters; for example, changing the number of threads in the `write` thread
8691
pool:

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,69 @@ public void testResize() throws Exception {
13851385
}
13861386
}
13871387

1388+
public void testCreateSystemIndexInOldVersion() throws Exception {
1389+
assumeTrue("only run on old cluster", isRunningAgainstOldCluster());
1390+
// create index
1391+
Request createTestIndex = new Request("PUT", "/test_index_old");
1392+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
1393+
client().performRequest(createTestIndex);
1394+
1395+
Request bulk = new Request("POST", "/_bulk");
1396+
bulk.addParameter("refresh", "true");
1397+
bulk.setJsonEntity("{\"index\": {\"_index\": \"test_index_old\"}}\n" +
1398+
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
1399+
client().performRequest(bulk);
1400+
1401+
// start a async reindex job
1402+
Request reindex = new Request("POST", "/_reindex");
1403+
reindex.setJsonEntity(
1404+
"{\n" +
1405+
" \"source\":{\n" +
1406+
" \"index\":\"test_index_old\"\n" +
1407+
" },\n" +
1408+
" \"dest\":{\n" +
1409+
" \"index\":\"test_index_reindex\"\n" +
1410+
" }\n" +
1411+
"}");
1412+
reindex.addParameter("wait_for_completion", "false");
1413+
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
1414+
String taskId = (String) response.get("task");
1415+
1416+
// wait for task
1417+
Request getTask = new Request("GET", "/_tasks/" + taskId);
1418+
getTask.addParameter("wait_for_completion", "true");
1419+
client().performRequest(getTask);
1420+
1421+
// make sure .tasks index exists
1422+
assertBusy(() -> {
1423+
Request getTasksIndex = new Request("GET", "/.tasks");
1424+
assertThat(client().performRequest(getTasksIndex).getStatusLine().getStatusCode(), is(200));
1425+
});
1426+
}
1427+
1428+
@SuppressWarnings("unchecked" +
1429+
"")
1430+
public void testSystemIndexGetsUpdatedMetadata() throws Exception {
1431+
assumeFalse("only run in upgraded cluster", isRunningAgainstOldCluster());
1432+
1433+
assertBusy(() -> {
1434+
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
1435+
Map<String, Object> response = entityAsMap(client().performRequest(clusterStateRequest));
1436+
Map<String, Object> metadata = (Map<String, Object>) response.get("metadata");
1437+
assertNotNull(metadata);
1438+
Map<String, Object> indices = (Map<String, Object>) metadata.get("indices");
1439+
assertNotNull(indices);
1440+
1441+
Map<String, Object> tasksIndex = (Map<String, Object>) indices.get(".tasks");
1442+
assertNotNull(tasksIndex);
1443+
assertThat(tasksIndex.get("system"), is(true));
1444+
1445+
Map<String, Object> testIndex = (Map<String, Object>) indices.get("test_index_old");
1446+
assertNotNull(testIndex);
1447+
assertThat(testIndex.get("system"), is(false));
1448+
});
1449+
}
1450+
13881451
public static void assertNumHits(String index, int numHits, int totalShards) throws IOException {
13891452
Map<String, Object> resp = entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search")));
13901453
assertNoFailures(resp);
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.upgrades;
21+
22+
import org.elasticsearch.client.Request;
23+
24+
import java.util.Map;
25+
26+
import static org.hamcrest.Matchers.is;
27+
28+
public class SystemIndicesUpgradeIT extends AbstractRollingTestCase {
29+
30+
public void testOldDoesntHaveSystemIndexMetadata() throws Exception {
31+
assumeTrue("only run in old cluster", CLUSTER_TYPE == ClusterType.OLD);
32+
// create index
33+
Request createTestIndex = new Request("PUT", "/test_index_old");
34+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
35+
client().performRequest(createTestIndex);
36+
37+
Request bulk = new Request("POST", "/_bulk");
38+
bulk.addParameter("refresh", "true");
39+
bulk.setJsonEntity("{\"index\": {\"_index\": \"test_index_old\"}}\n" +
40+
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
41+
client().performRequest(bulk);
42+
43+
// start a async reindex job
44+
Request reindex = new Request("POST", "/_reindex");
45+
reindex.setJsonEntity(
46+
"{\n" +
47+
" \"source\":{\n" +
48+
" \"index\":\"test_index_old\"\n" +
49+
" },\n" +
50+
" \"dest\":{\n" +
51+
" \"index\":\"test_index_reindex\"\n" +
52+
" }\n" +
53+
"}");
54+
reindex.addParameter("wait_for_completion", "false");
55+
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
56+
String taskId = (String) response.get("task");
57+
58+
// wait for task
59+
Request getTask = new Request("GET", "/_tasks/" + taskId);
60+
getTask.addParameter("wait_for_completion", "true");
61+
client().performRequest(getTask);
62+
63+
// make sure .tasks index exists
64+
assertBusy(() -> {
65+
Request getTasksIndex = new Request("GET", "/.tasks");
66+
assertThat(client().performRequest(getTasksIndex).getStatusLine().getStatusCode(), is(200));
67+
});
68+
}
69+
70+
public void testMixedCluster() {
71+
assumeTrue("nothing to do in mixed cluster", CLUSTER_TYPE == ClusterType.MIXED);
72+
}
73+
74+
@SuppressWarnings("unchecked")
75+
public void testUpgradedCluster() throws Exception {
76+
assumeTrue("only run on upgraded cluster", CLUSTER_TYPE == ClusterType.UPGRADED);
77+
78+
assertBusy(() -> {
79+
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
80+
Map<String, Object> response = entityAsMap(client().performRequest(clusterStateRequest));
81+
Map<String, Object> metadata = (Map<String, Object>) response.get("metadata");
82+
assertNotNull(metadata);
83+
Map<String, Object> indices = (Map<String, Object>) metadata.get("indices");
84+
assertNotNull(indices);
85+
86+
Map<String, Object> tasksIndex = (Map<String, Object>) indices.get(".tasks");
87+
assertNotNull(tasksIndex);
88+
assertThat(tasksIndex.get("system"), is(true));
89+
90+
Map<String, Object> testIndex = (Map<String, Object>) indices.get("test_index_old");
91+
assertNotNull(testIndex);
92+
assertThat(testIndex.get("system"), is(false));
93+
});
94+
}
95+
}

qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@
9595
index: .tasks
9696
body:
9797
query:
98-
match_all: {}
98+
match:
99+
task.description:
100+
query: 'reindexed_index_copy'
99101
- match: { hits.total: 1 }
100102
- match: { hits.hits.0._id: '/.+:\d+/' }
101103
- set: {hits.hits.0._id: task_id}

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,13 @@ protected Writeable.Reader<GetResponse> getResponseReader() {
115115

116116
@Override
117117
protected String getExecutor(GetRequest request, ShardId shardId) {
118-
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
119-
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
120-
shardId);
118+
final ClusterState clusterState = clusterService.state();
119+
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
120+
return ThreadPool.Names.SYSTEM_READ;
121+
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
122+
return ThreadPool.Names.SEARCH_THROTTLED;
123+
} else {
124+
return super.getExecutor(request, shardId);
125+
}
121126
}
122127
}

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,13 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
128128

129129
@Override
130130
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
131-
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
132-
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
133-
shardId);
131+
final ClusterState clusterState = clusterService.state();
132+
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
133+
return ThreadPool.Names.SYSTEM_READ;
134+
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
135+
return ThreadPool.Names.SEARCH_THROTTLED;
136+
} else {
137+
return super.getExecutor(request, shardId);
138+
}
134139
}
135140
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,10 +519,17 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
519519
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
520520
nodes::get, remoteConnections, searchTransportService::getConnection);
521521
boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());
522-
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
522+
final Executor asyncSearchExecutor = asyncSearchExecutor(indices, clusterState);
523+
searchAsyncAction(task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
523524
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
524525
}
525526

527+
Executor asyncSearchExecutor(final Index[] indices, final ClusterState clusterState) {
528+
final boolean onlySystemIndices =
529+
Arrays.stream(indices).allMatch(index -> clusterState.metadata().index(index.getName()).isSystem());
530+
return onlySystemIndices ? threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
531+
}
532+
526533
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
527534
Function<String, DiscoveryNode> localNodes,
528535
BiFunction<String, String, DiscoveryNode> remoteNodes,
@@ -584,6 +591,7 @@ static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShards
584591
}
585592

586593
private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(SearchTask task, SearchRequest searchRequest,
594+
Executor executor,
587595
GroupShardsIterator<SearchShardIterator> shardIterators,
588596
SearchTimeProvider timeProvider,
589597
BiFunction<String, String, Transport.Connection> connectionLookup,
@@ -594,14 +602,14 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
594602
ActionListener<SearchResponse> listener,
595603
boolean preFilter,
596604
SearchResponse.Clusters clusters) {
597-
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
598605
if (preFilter) {
599606
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
600607
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
601608
timeProvider, clusterState, task, (iter) -> {
602609
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
603610
task,
604611
searchRequest,
612+
executor,
605613
iter,
606614
timeProvider,
607615
connectionLookup,

0 commit comments

Comments
 (0)