Skip to content

Commit eb91e35

Browse files
authored
[ML] remove thread sleep from results persister (#65904)
* [ML] remove thread sleep from results persister.
Having a thread sleep in a recurring action may cause issues on node shutdown: the thread may be sleeping while a graceful shutdown is occurring. Since these retry timeouts can extend over a long period of time, we should instead use scheduled tasks and the thread pool. This allows the retries to be effectively cancelled instead of waiting for a thread to wake back up.
Closes #65890.
1 parent e144471 commit eb91e35

File tree

13 files changed

+407
-144
lines changed

13 files changed

+407
-144
lines changed

server/src/main/java/org/elasticsearch/action/support/RetryableAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ public void onRejection(Exception e) {
114114

115115
public abstract boolean shouldRetry(Exception e);
116116

117+
protected long calculateDelay(long previousDelay) {
118+
return Math.min(previousDelay * 2, Integer.MAX_VALUE);
119+
}
120+
121+
protected long minimumDelayMillis() {
122+
return 1L;
123+
}
124+
117125
public void onFinished() {
118126
}
119127

@@ -148,10 +156,10 @@ public void onFailure(Exception e) {
148156
} else {
149157
addException(e);
150158

151-
final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
159+
final long nextDelayMillisBound = calculateDelay(delayMillisBound);
152160
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
153161
final Runnable runnable = createRunnable(retryingListener);
154-
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
162+
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + minimumDelayMillis();
155163
if (isDone.get() == false) {
156164
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
157165
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/BulkFailureRetryIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void cleanUpTest() {
6565
.putNull("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob")
6666
.putNull("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister")
6767
.putNull("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output")
68+
.putNull("logger.org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService")
6869
.build()).get();
6970
cleanUp();
7071
}
@@ -121,6 +122,7 @@ public void testBulkFailureRetries() throws Exception {
121122
.setTransientSettings(Settings.builder()
122123
.put("logger.org.elasticsearch.xpack.ml.datafeed.DatafeedJob", "TRACE")
123124
.put("logger.org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister", "TRACE")
125+
.put("logger.org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService", "TRACE")
124126
.put("logger.org.elasticsearch.xpack.ml.job.process.autodetect.output", "TRACE")
125127
.put("xpack.ml.persist_results_max_retries", "15")
126128
.build()).get();

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.xcontent.ToXContent;
2929
import org.elasticsearch.common.xcontent.XContentParser;
3030
import org.elasticsearch.index.reindex.ReindexPlugin;
31+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
3132
import org.elasticsearch.plugins.Plugin;
3233
import org.elasticsearch.search.SearchHit;
3334
import org.elasticsearch.threadpool.ThreadPool;
@@ -56,6 +57,7 @@
5657
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
5758
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
5859
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
60+
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
5961
import org.elasticsearch.xpack.ilm.IndexLifecycle;
6062
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
6163
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
@@ -126,7 +128,10 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
126128
protected Collection<Class<? extends Plugin>> getPlugins() {
127129
return pluginList(
128130
LocalStateMachineLearning.class,
131+
DataStreamsPlugin.class,
132+
IngestCommonPlugin.class,
129133
ReindexPlugin.class,
134+
MockPainlessScriptEngine.TestPlugin.class,
130135
// ILM is required for .ml-state template index settings
131136
IndexLifecycle.class);
132137
}
@@ -141,7 +146,7 @@ public void createComponents() throws Exception {
141146
renormalizer = mock(Renormalizer.class);
142147
process = mock(AutodetectProcess.class);
143148
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
144-
ThreadPool tp = mock(ThreadPool.class);
149+
ThreadPool tp = mockThreadPool();
145150
Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();
146151
ClusterSettings clusterSettings = new ClusterSettings(settings,
147152
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
@@ -151,9 +156,8 @@ public void createComponents() throws Exception {
151156
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
152157
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
153158
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
154-
155159
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
156-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
160+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
157161
resultProcessor = new AutodetectResultProcessor(
158162
client(),
159163
auditor,

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.concurrent.atomic.AtomicReference;
3737

3838
import static org.hamcrest.CoreMatchers.equalTo;
39-
import static org.mockito.Mockito.mock;
4039

4140
public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
4241

@@ -48,7 +47,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
4847
@Before
4948
public void createComponents() {
5049
Settings settings = nodeSettings(0);
51-
ThreadPool tp = mock(ThreadPool.class);
50+
ThreadPool tp = mockThreadPool();
5251
ClusterSettings clusterSettings = new ClusterSettings(settings,
5352
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
5453
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -59,7 +58,7 @@ public void createComponents() {
5958
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
6059

6160
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
62-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
61+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
6362
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
6463
jobResultsPersister = new JobResultsPersister(
6564
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import static org.hamcrest.Matchers.not;
101101
import static org.hamcrest.collection.IsEmptyCollection.empty;
102102
import static org.hamcrest.core.Is.is;
103-
import static org.mockito.Mockito.mock;
104103

105104

106105
public class JobResultsProviderIT extends MlSingleNodeTestCase {
@@ -114,7 +113,7 @@ public void createComponents() throws Exception {
114113
Settings.Builder builder = Settings.builder()
115114
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
116115
jobProvider = new JobResultsProvider(client(), builder.build(), new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
117-
ThreadPool tp = mock(ThreadPool.class);
116+
ThreadPool tp = mockThreadPool();
118117
ClusterSettings clusterSettings = new ClusterSettings(builder.build(),
119118
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
120119
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -125,7 +124,7 @@ public void createComponents() throws Exception {
125124
ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp);
126125

127126
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
128-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, builder.build());
127+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, builder.build());
129128
auditor = new AnomalyDetectionAuditor(client(), clusterService);
130129
waitForMlTemplates();
131130
}

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import static org.hamcrest.Matchers.equalTo;
4848
import static org.hamcrest.Matchers.is;
4949
import static org.hamcrest.Matchers.nullValue;
50-
import static org.mockito.Mockito.mock;
5150

5251
/**
5352
* Test that ML does not touch unnecessary indices when removing job index aliases
@@ -63,7 +62,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
6362
@Before
6463
public void createComponents() {
6564
Settings settings = nodeSettings(0);
66-
ThreadPool tp = mock(ThreadPool.class);
65+
ThreadPool tp = mockThreadPool();
6766
ClusterSettings clusterSettings = new ClusterSettings(settings,
6867
new HashSet<>(Arrays.asList(InferenceProcessor.MAX_INFERENCE_PROCESSORS,
6968
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -73,7 +72,7 @@ public void createComponents() {
7372
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
7473
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
7574
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
76-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
75+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
7776
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
7877
jobResultsPersister = new JobResultsPersister(
7978
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
635635
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService);
636636
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
637637
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
638-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
638+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(
639+
threadPool,
640+
originSettingClient,
641+
clusterService,
642+
settings
643+
);
639644
AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(resultsPersisterService);
640645
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
641646
JobResultsPersister jobResultsPersister =

0 commit comments

Comments
 (0)