Skip to content

Commit 1bb6740

Browse files
authored
[ML] remove thread sleep from results persister (#65904) (#66034)
* [ML] remove thread sleep from results persister Having a thread sleep in a recurring action may cause issues on node shutdown. What if the thread is sleeping while a nice shutdown is occurring? Since these retry timeouts can extend to a larger period of time, we should instead use scheduled tasks + the threadpool. This allows the retries to be effectively canceled instead of waiting for a thread to wake back up. closes #65890
1 parent 643a8d7 commit 1bb6740

File tree

13 files changed

+407
-144
lines changed

13 files changed

+407
-144
lines changed

server/src/main/java/org/elasticsearch/action/support/RetryableAction.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ public void onRejection(Exception e) {
114114

115115
public abstract boolean shouldRetry(Exception e);
116116

117+
protected long calculateDelay(long previousDelay) {
118+
return Math.min(previousDelay * 2, Integer.MAX_VALUE);
119+
}
120+
121+
protected long minimumDelayMillis() {
122+
return 1L;
123+
}
124+
117125
public void onFinished() {
118126
}
119127

@@ -148,10 +156,10 @@ public void onFailure(Exception e) {
148156
} else {
149157
addException(e);
150158

151-
final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
159+
final long nextDelayMillisBound = calculateDelay(delayMillisBound);
152160
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
153161
final Runnable runnable = createRunnable(retryingListener);
154-
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
162+
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + minimumDelayMillis();
155163
if (isDone.get() == false) {
156164
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
157165
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void cleanUpTest() {
6565
.putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
6666
.putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
6767
.putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
68+
.putNull("logger.org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService")
6869
.build()).get();
6970
cleanUp();
7071
}
@@ -121,6 +122,7 @@ public void testBulkFailureRetries() throws Exception {
121122
.setTransientSettings(Settings.builder()
122123
.put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
123124
.put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
125+
.put("logger.org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService", "TRACE")
124126
.put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
125127
.put("xpack.ml.persist_results_max_retries", "15")
126128
.build()).get();

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.xcontent.ToXContent;
3030
import org.elasticsearch.common.xcontent.XContentParser;
3131
import org.elasticsearch.index.reindex.ReindexPlugin;
32+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
3233
import org.elasticsearch.plugins.Plugin;
3334
import org.elasticsearch.search.SearchHit;
3435
import org.elasticsearch.threadpool.ThreadPool;
@@ -57,6 +58,7 @@
5758
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
5859
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
5960
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
61+
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
6062
import org.elasticsearch.xpack.ilm.IndexLifecycle;
6163
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
6264
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
@@ -127,7 +129,10 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
127129
protected Collection<Class<? extends Plugin>> getPlugins() {
128130
return pluginList(
129131
LocalStateMachineLearning.class,
132+
DataStreamsPlugin.class,
133+
IngestCommonPlugin.class,
130134
ReindexPlugin.class,
135+
MockPainlessScriptEngine.TestPlugin.class,
131136
// ILM is required for .ml-state template index settings
132137
IndexLifecycle.class);
133138
}
@@ -142,7 +147,7 @@ public void createComponents() throws Exception {
142147
renormalizer = mock(Renormalizer.class);
143148
process = mock(AutodetectProcess.class);
144149
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
145-
ThreadPool tp = mock(ThreadPool.class);
150+
ThreadPool tp = mockThreadPool();
146151
Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();
147152
ClusterSettings clusterSettings = new ClusterSettings(settings,
148153
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
@@ -153,9 +158,8 @@ public void createComponents() throws Exception {
153158
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
154159
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
155160
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
156-
157161
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
158-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
162+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
159163
resultProcessor = new AutodetectResultProcessor(
160164
client(),
161165
auditor,

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.concurrent.atomic.AtomicReference;
3838

3939
import static org.hamcrest.CoreMatchers.equalTo;
40-
import static org.mockito.Mockito.mock;
4140

4241
public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
4342

@@ -49,7 +48,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
4948
@Before
5049
public void createComponents() {
5150
Settings settings = nodeSettings(0);
52-
ThreadPool tp = mock(ThreadPool.class);
51+
ThreadPool tp = mockThreadPool();
5352
ClusterSettings clusterSettings = new ClusterSettings(settings,
5453
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
5554
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -61,7 +60,7 @@ public void createComponents() {
6160
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
6261

6362
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
64-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
63+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
6564
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
6665
jobResultsPersister = new JobResultsPersister(
6766
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import static org.hamcrest.Matchers.not;
102102
import static org.hamcrest.collection.IsEmptyCollection.empty;
103103
import static org.hamcrest.core.Is.is;
104-
import static org.mockito.Mockito.mock;
105104

106105

107106
public class JobResultsProviderIT extends MlSingleNodeTestCase {
@@ -115,7 +114,7 @@ public void createComponents() throws Exception {
115114
Settings.Builder builder = Settings.builder()
116115
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
117116
jobProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
118-
ThreadPool tp = mock(ThreadPool.class);
117+
ThreadPool tp = mockThreadPool();
119118
ClusterSettings clusterSettings = new ClusterSettings(builder.build(),
120119
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
121120
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -127,7 +126,7 @@ public void createComponents() throws Exception {
127126
ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp);
128127

129128
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
130-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, builder.build());
129+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, builder.build());
131130
auditor = new AnomalyDetectionAuditor(client(), clusterService);
132131
waitForMlTemplates();
133132
}

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import static org.hamcrest.Matchers.equalTo;
4949
import static org.hamcrest.Matchers.is;
5050
import static org.hamcrest.Matchers.nullValue;
51-
import static org.mockito.Mockito.mock;
5251

5352
/**
5453
* Test that ML does not touch unnecessary indices when removing job index aliases
@@ -64,7 +63,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
6463
@Before
6564
public void createComponents() {
6665
Settings settings = nodeSettings(0);
67-
ThreadPool tp = mock(ThreadPool.class);
66+
ThreadPool tp = mockThreadPool();
6867
ClusterSettings clusterSettings = new ClusterSettings(settings,
6968
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
7069
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -75,7 +74,7 @@ public void createComponents() {
7574
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
7675
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
7776
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
78-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
77+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
7978
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
8079
jobResultsPersister = new JobResultsPersister(
8180
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
621621
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService);
622622
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
623623
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
624-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
624+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(
625+
threadPool,
626+
originSettingClient,
627+
clusterService,
628+
settings
629+
);
625630
AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(resultsPersisterService);
626631
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
627632
JobResultsPersister jobResultsPersister =

0 commit comments

Comments
 (0)