Skip to content

Commit eb782d0

Browse files
davidkylejasontedor
authored andcommitted
[ML] Do not run data cleanup on the client thread (#31691)
The ML expired data cleanup executes blocking calls. These calls need to be forked off the network thread or they can end up blocking all networking threads waiting for these blocking calls to finish. This commit moves the calls off the networking thread by forking to the ML utility thread pool.
1 parent d554df7 commit eb782d0

File tree

5 files changed

+604
-2
lines changed

5 files changed

+604
-2
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.threadpool.ThreadPool;
1717
import org.elasticsearch.transport.TransportService;
18+
import org.elasticsearch.transport.Transports;
1819
import org.elasticsearch.xpack.core.ClientHelper;
1920
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
2021
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -66,10 +67,16 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
6667

6768
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
6869
ActionListener<DeleteExpiredDataAction.Response> listener) {
70+
// Removing expired ML data and artifacts requires multiple operations.
71+
// These are queued up and executed sequentially in the action listener,
72+
// the chained calls must all run the ML utility thread pool NOT the thread
73+
// the previous action returned in which in the case of a transport_client_boss
74+
// thread is a disaster.
6975
if (mlDataRemoversIterator.hasNext()) {
7076
MlDataRemover remover = mlDataRemoversIterator.next();
7177
remover.remove(ActionListener.wrap(
72-
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener),
78+
booleanResponse -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() ->
79+
deleteExpiredData(mlDataRemoversIterator, listener)),
7380
listener::onFailure));
7481
} else {
7582
logger.info("Completed deletion of expired data");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.search.SearchHit;
1616
import org.elasticsearch.search.builder.SearchSourceBuilder;
1717
import org.elasticsearch.search.sort.SortBuilders;
18+
import org.elasticsearch.transport.Transports;
1819
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
1920
import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils;
2021

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import org.elasticsearch.gradle.LoggedExec
2+
3+
apply plugin: 'elasticsearch.standalone-rest-test'
4+
apply plugin: 'elasticsearch.rest-test'
5+
6+
dependencies {
7+
testCompile project(path: xpackModule('core'), configuration: 'runtime')
8+
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
9+
testCompile project(path: xpackModule('ml'), configuration: 'runtime')
10+
testCompile project(path: xpackModule('ml'), configuration: 'testArtifacts')
11+
}
12+
13+
integTestRunner {
14+
/*
15+
* We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each
16+
* other if we allow them to set the number of available processors as it's set-once in Netty.
17+
*/
18+
systemProperty 'es.set.netty.runtime.available.processors', 'false'
19+
}
20+
21+
// location of generated keystores and certificates
22+
File keystoreDir = new File(project.buildDir, 'keystore')
23+
24+
// Generate the node's keystore
25+
File nodeKeystore = new File(keystoreDir, 'test-node.jks')
26+
task createNodeKeyStore(type: LoggedExec) {
27+
doFirst {
28+
if (nodeKeystore.parentFile.exists() == false) {
29+
nodeKeystore.parentFile.mkdirs()
30+
}
31+
if (nodeKeystore.exists()) {
32+
delete nodeKeystore
33+
}
34+
}
35+
executable = new File(project.runtimeJavaHome, 'bin/keytool')
36+
standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8'))
37+
args '-genkey',
38+
'-alias', 'test-node',
39+
'-keystore', nodeKeystore,
40+
'-keyalg', 'RSA',
41+
'-keysize', '2048',
42+
'-validity', '712',
43+
'-dname', 'CN=smoke-test-plugins-ssl',
44+
'-keypass', 'keypass',
45+
'-storepass', 'keypass'
46+
}
47+
48+
// Add keystores to test classpath: it expects it there
49+
sourceSets.test.resources.srcDir(keystoreDir)
50+
processTestResources.dependsOn(createNodeKeyStore)
51+
52+
integTestCluster {
53+
dependsOn createNodeKeyStore
54+
setting 'xpack.security.enabled', 'true'
55+
setting 'xpack.ml.enabled', 'true'
56+
setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE'
57+
setting 'xpack.monitoring.enabled', 'false'
58+
setting 'xpack.security.authc.token.enabled', 'true'
59+
setting 'xpack.security.transport.ssl.enabled', 'true'
60+
setting 'xpack.security.transport.ssl.keystore.path', nodeKeystore.name
61+
setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
62+
setting 'xpack.security.audit.enabled', 'true'
63+
setting 'xpack.license.self_generated.type', 'trial'
64+
65+
keystoreSetting 'bootstrap.password', 'x-pack-test-password'
66+
keystoreSetting 'xpack.security.transport.ssl.keystore.secure_password', 'keypass'
67+
68+
numNodes = 3
69+
70+
setupCommand 'setupDummyUser',
71+
'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser'
72+
73+
extraConfigFile nodeKeystore.name, nodeKeystore
74+
75+
waitCondition = { node, ant ->
76+
File tmpFile = new File(node.cwd, 'wait.success')
77+
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
78+
dest: tmpFile.toString(),
79+
username: 'x_pack_rest_user',
80+
password: 'x-pack-test-password',
81+
ignoreerrors: true,
82+
retries: 10)
83+
return tmpFile.exists()
84+
}
85+
}

x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java renamed to x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.Arrays;
3838
import java.util.Collections;
3939
import java.util.List;
40-
import java.util.concurrent.ExecutionException;
4140
import java.util.concurrent.TimeUnit;
4241

4342
import static org.hamcrest.Matchers.equalTo;

0 commit comments

Comments
 (0)