Skip to content

Commit 2fa6448

Browse files
authored
System index reads in separate threadpool (#60927)
This commit introduces a new thread pool, `system_read`, which is intended for use by system indices for all read operations (get and search). The `system_read` pool is a fixed thread pool with a maximum number of threads equal to lesser of half of the available processors or 5. Given the combination of both get and read operations in this thread pool, the queue size has been set to 2000. The motivation for this change is to allow system read operations to be serviced in spite of the number of user searches. In order to avoid a significant performance hit due to pattern matching on all search requests, a new metadata flag is added to mark indices as system or non-system. Previously created system indices will have flag added to their metadata upon upgrade to a version with this capability. Additionally, this change also introduces a new class, `SystemIndices`, which encapsulates logic around system indices. Currently, the class provides a method to check if an index is a system index and a method to find a matching index descriptor given the name of an index. Relates #50251 Relates #37867 Backport of #57936
1 parent a93be8d commit 2fa6448

35 files changed

+759
-210
lines changed

docs/reference/modules/threadpool.asciidoc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[[modules-threadpool]]
22
=== Thread pools
33

4-
A node uses several thread pools to manage memory consumption.
5-
Queues associated with many of the thread pools enable pending requests
6-
to be held instead of discarded.
4+
A node uses several thread pools to manage memory consumption.
5+
Queues associated with many of the thread pools enable pending requests
6+
to be held instead of discarded.
77

88
There are several thread pools, but the important ones include:
99

@@ -83,6 +83,11 @@ There are several thread pools, but the important ones include:
8383
Thread pool type is `scaling` with a keep-alive of `5m` and a default
8484
maximum size of `5`.
8585

86+
`system_read`::
87+
For read operations on system indices.
88+
Thread pool type is `fixed` and a default maximum size of
89+
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.
90+
8691
Changing a specific thread pool can be done by setting its type-specific
8792
parameters; for example, changing the number of threads in the `write` thread
8893
pool:

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,6 +1457,75 @@ public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
14571457
assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
14581458
}
14591459

1460+
public void testCreateSystemIndexInOldVersion() throws Exception {
1461+
assumeTrue("only run on old cluster", isRunningAgainstOldCluster());
1462+
// create index
1463+
Request createTestIndex = new Request("PUT", "/test_index_old");
1464+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0, \"index.number_of_shards\": 1}}");
1465+
client().performRequest(createTestIndex);
1466+
1467+
Request bulk = new Request("POST", "/_bulk");
1468+
bulk.addParameter("refresh", "true");
1469+
bulk.setJsonEntity("{\"index\": {\"_index\": \"test_index_old\", \"_type\" : \"_doc\"}}\n" +
1470+
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
1471+
if (isRunningAgainstAncientCluster() == false) {
1472+
bulk.setOptions(expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
1473+
}
1474+
client().performRequest(bulk);
1475+
1476+
// start a async reindex job
1477+
Request reindex = new Request("POST", "/_reindex");
1478+
reindex.setJsonEntity(
1479+
"{\n" +
1480+
" \"source\":{\n" +
1481+
" \"index\":\"test_index_old\"\n" +
1482+
" },\n" +
1483+
" \"dest\":{\n" +
1484+
" \"index\":\"test_index_reindex\"\n" +
1485+
" }\n" +
1486+
"}");
1487+
reindex.addParameter("wait_for_completion", "false");
1488+
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
1489+
String taskId = (String) response.get("task");
1490+
1491+
// wait for task
1492+
Request getTask = new Request("GET", "/_tasks/" + taskId);
1493+
getTask.addParameter("wait_for_completion", "true");
1494+
client().performRequest(getTask);
1495+
1496+
// make sure .tasks index exists
1497+
assertBusy(() -> {
1498+
Request getTasksIndex = new Request("GET", "/.tasks");
1499+
if (isRunningAgainstAncientCluster()) {
1500+
getTasksIndex.addParameter("include_type_name", "false");
1501+
}
1502+
assertThat(client().performRequest(getTasksIndex).getStatusLine().getStatusCode(), is(200));
1503+
});
1504+
}
1505+
1506+
@SuppressWarnings("unchecked" +
1507+
"")
1508+
public void testSystemIndexGetsUpdatedMetadata() throws Exception {
1509+
assumeFalse("only run in upgraded cluster", isRunningAgainstOldCluster());
1510+
1511+
assertBusy(() -> {
1512+
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
1513+
Map<String, Object> response = entityAsMap(client().performRequest(clusterStateRequest));
1514+
Map<String, Object> metadata = (Map<String, Object>) response.get("metadata");
1515+
assertNotNull(metadata);
1516+
Map<String, Object> indices = (Map<String, Object>) metadata.get("indices");
1517+
assertNotNull(indices);
1518+
1519+
Map<String, Object> tasksIndex = (Map<String, Object>) indices.get(".tasks");
1520+
assertNotNull(tasksIndex);
1521+
assertThat(tasksIndex.get("system"), is(true));
1522+
1523+
Map<String, Object> testIndex = (Map<String, Object>) indices.get("test_index_old");
1524+
assertNotNull(testIndex);
1525+
assertThat(testIndex.get("system"), is(false));
1526+
});
1527+
}
1528+
14601529
public static void assertNumHits(String index, int numHits, int totalShards) throws IOException {
14611530
Map<String, Object> resp = entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search")));
14621531
assertNoFailures(resp);

qa/rolling-upgrade/build.gradle

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
4141
* </ul>
4242
*/
4343
String baseName = "v${bwcVersion}"
44+
String bwcVersionStr = "${bwcVersion}"
4445

4546
testClusters {
4647
"${baseName}" {
47-
versions = [bwcVersion.toString(), project.version]
48+
versions = [bwcVersionStr, project.version]
4849
numberOfNodes = 3
4950

5051
setting 'repositories.url.allowed_urls', 'http://snapshot.test*'
@@ -60,7 +61,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
6061
doFirst {
6162
project.delete("${buildDir}/cluster/shared/repo/${baseName}")
6263
}
63-
systemProperty 'tests.upgrade_from_version', bwcVersion.toString()
64+
systemProperty 'tests.upgrade_from_version', bwcVersionStr
6465
systemProperty 'tests.rest.suite', 'old_cluster'
6566
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
6667
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
@@ -73,7 +74,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
7374
testClusters."${baseName}".nextNodeToNextVersion()
7475
}
7576
systemProperty 'tests.rest.suite', 'mixed_cluster'
76-
systemProperty 'tests.upgrade_from_version', bwcVersion.toString()
77+
systemProperty 'tests.upgrade_from_version', bwcVersionStr
7778
systemProperty 'tests.first_round', 'true'
7879
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
7980
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
@@ -86,7 +87,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
8687
testClusters."${baseName}".nextNodeToNextVersion()
8788
}
8889
systemProperty 'tests.rest.suite', 'mixed_cluster'
89-
systemProperty 'tests.upgrade_from_version', bwcVersion.toString()
90+
systemProperty 'tests.upgrade_from_version', bwcVersionStr
9091
systemProperty 'tests.first_round', 'false'
9192
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
9293
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
@@ -99,7 +100,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
99100
}
100101
useCluster testClusters."${baseName}"
101102
systemProperty 'tests.rest.suite', 'upgraded_cluster'
102-
systemProperty 'tests.upgrade_from_version', bwcVersion.toString()
103+
systemProperty 'tests.upgrade_from_version', bwcVersionStr
103104

104105
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
105106
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MappingIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
*/
1919
package org.elasticsearch.upgrades;
2020

21+
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
2122
import org.elasticsearch.Version;
2223
import org.elasticsearch.client.Request;
2324
import org.elasticsearch.client.Response;
2425
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2526

27+
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60986")
2628
public class MappingIT extends AbstractRollingTestCase {
2729
/**
2830
* Create a mapping that explicitly disables the _all field (possible in 6x, see #37429)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.upgrades;
21+
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.client.Request;
24+
25+
import java.util.Map;
26+
27+
import static org.hamcrest.Matchers.is;
28+
29+
public class SystemIndicesUpgradeIT extends AbstractRollingTestCase {
30+
31+
public void testOldDoesntHaveSystemIndexMetadata() throws Exception {
32+
assumeTrue("only run in old cluster", CLUSTER_TYPE == ClusterType.OLD);
33+
// create index
34+
Request createTestIndex = new Request("PUT", "/test_index_old");
35+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0, \"index.number_of_shards\": 1}}");
36+
client().performRequest(createTestIndex);
37+
38+
Request bulk = new Request("POST", "/_bulk");
39+
bulk.addParameter("refresh", "true");
40+
if (UPGRADE_FROM_VERSION.before(Version.V_7_0_0)) {
41+
bulk.setJsonEntity("{\"index\": {\"_index\": \"test_index_old\", \"_type\" : \"_doc\"}}\n" +
42+
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
43+
} else {
44+
bulk.setJsonEntity("{\"index\": {\"_index\": \"test_index_old\"}\n" +
45+
"{\"f1\": \"v1\", \"f2\": \"v2\"}\n");
46+
}
47+
client().performRequest(bulk);
48+
49+
// start a async reindex job
50+
Request reindex = new Request("POST", "/_reindex");
51+
reindex.setJsonEntity(
52+
"{\n" +
53+
" \"source\":{\n" +
54+
" \"index\":\"test_index_old\"\n" +
55+
" },\n" +
56+
" \"dest\":{\n" +
57+
" \"index\":\"test_index_reindex\"\n" +
58+
" }\n" +
59+
"}");
60+
reindex.addParameter("wait_for_completion", "false");
61+
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
62+
String taskId = (String) response.get("task");
63+
64+
// wait for task
65+
Request getTask = new Request("GET", "/_tasks/" + taskId);
66+
getTask.addParameter("wait_for_completion", "true");
67+
client().performRequest(getTask);
68+
69+
// make sure .tasks index exists
70+
assertBusy(() -> {
71+
Request getTasksIndex = new Request("GET", "/.tasks");
72+
if (UPGRADE_FROM_VERSION.before(Version.V_7_0_0)) {
73+
getTasksIndex.addParameter("include_type_name", "false");
74+
}
75+
assertThat(client().performRequest(getTasksIndex).getStatusLine().getStatusCode(), is(200));
76+
});
77+
}
78+
79+
public void testMixedCluster() {
80+
assumeTrue("nothing to do in mixed cluster", CLUSTER_TYPE == ClusterType.MIXED);
81+
}
82+
83+
@SuppressWarnings("unchecked")
84+
public void testUpgradedCluster() throws Exception {
85+
assumeTrue("only run on upgraded cluster", CLUSTER_TYPE == ClusterType.UPGRADED);
86+
87+
assertBusy(() -> {
88+
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
89+
Map<String, Object> response = entityAsMap(client().performRequest(clusterStateRequest));
90+
Map<String, Object> metadata = (Map<String, Object>) response.get("metadata");
91+
assertNotNull(metadata);
92+
Map<String, Object> indices = (Map<String, Object>) metadata.get("indices");
93+
assertNotNull(indices);
94+
95+
Map<String, Object> tasksIndex = (Map<String, Object>) indices.get(".tasks");
96+
assertNotNull(tasksIndex);
97+
assertThat(tasksIndex.get("system"), is(true));
98+
99+
Map<String, Object> testIndex = (Map<String, Object>) indices.get("test_index_old");
100+
assertNotNull(testIndex);
101+
assertThat(testIndex.get("system"), is(false));
102+
});
103+
}
104+
}

qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@
9595
index: .tasks
9696
body:
9797
query:
98-
match_all: {}
98+
match:
99+
task.description:
100+
query: 'reindexed_index_copy'
99101
- match: { hits.total: 1 }
100102
- match: { hits.hits.0._id: '/.+:\d+/' }
101103
- set: {hits.hits.0._id: task_id}

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,13 @@ protected Writeable.Reader<GetResponse> getResponseReader() {
115115

116116
@Override
117117
protected String getExecutor(GetRequest request, ShardId shardId) {
118-
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
119-
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
120-
shardId);
118+
final ClusterState clusterState = clusterService.state();
119+
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
120+
return ThreadPool.Names.SYSTEM_READ;
121+
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
122+
return ThreadPool.Names.SEARCH_THROTTLED;
123+
} else {
124+
return super.getExecutor(request, shardId);
125+
}
121126
}
122127
}

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,13 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
126126

127127
@Override
128128
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
129-
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
130-
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
131-
shardId);
129+
final ClusterState clusterState = clusterService.state();
130+
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
131+
return ThreadPool.Names.SYSTEM_READ;
132+
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
133+
return ThreadPool.Names.SEARCH_THROTTLED;
134+
} else {
135+
return super.getExecutor(request, shardId);
136+
}
132137
}
133138
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,10 +519,17 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
519519
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
520520
nodes::get, remoteConnections, searchTransportService::getConnection);
521521
boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());
522-
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
522+
final Executor asyncSearchExecutor = asyncSearchExecutor(indices, clusterState);
523+
searchAsyncAction(task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
523524
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
524525
}
525526

527+
Executor asyncSearchExecutor(final Index[] indices, final ClusterState clusterState) {
528+
final boolean onlySystemIndices =
529+
Arrays.stream(indices).allMatch(index -> clusterState.metadata().index(index.getName()).isSystem());
530+
return onlySystemIndices ? threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
531+
}
532+
526533
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
527534
Function<String, DiscoveryNode> localNodes,
528535
BiFunction<String, String, DiscoveryNode> remoteNodes,
@@ -584,6 +591,7 @@ static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShards
584591
}
585592

586593
private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(SearchTask task, SearchRequest searchRequest,
594+
Executor executor,
587595
GroupShardsIterator<SearchShardIterator> shardIterators,
588596
SearchTimeProvider timeProvider,
589597
BiFunction<String, String, Transport.Connection> connectionLookup,
@@ -594,14 +602,14 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
594602
ActionListener<SearchResponse> listener,
595603
boolean preFilter,
596604
SearchResponse.Clusters clusters) {
597-
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
598605
if (preFilter) {
599606
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
600607
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
601608
timeProvider, clusterState, task, (iter) -> {
602609
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
603610
task,
604611
searchRequest,
612+
executor,
605613
iter,
606614
timeProvider,
607615
connectionLookup,

0 commit comments

Comments
 (0)