Skip to content

Commit e29974b

Browse files
committed
[ML] remove thread sleep from results persister
Having a thread sleep in a recurring action may cause issues on node shutdown: the thread may be sleeping while a graceful shutdown is in progress. Since these retry timeouts can extend to long periods of time, we should instead use scheduled tasks on the thread pool. This allows the retries to be effectively canceled instead of waiting for a thread to wake back up. Closes elastic#65890
1 parent 0693f01 commit e29974b

File tree

9 files changed

+250
-115
lines changed

9 files changed

+250
-115
lines changed

server/src/main/java/org/elasticsearch/action/support/RetryableAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ public void onRejection(Exception e) {
114114

115115
public abstract boolean shouldRetry(Exception e);
116116

117+
protected long calculateDelay(long previousDelay) {
118+
return Math.min(previousDelay * 2, Integer.MAX_VALUE);
119+
}
120+
117121
public void onFinished() {
118122
}
119123

@@ -148,7 +152,7 @@ public void onFailure(Exception e) {
148152
} else {
149153
addException(e);
150154

151-
final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
155+
final long nextDelayMillisBound = calculateDelay(delayMillisBound);
152156
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
153157
final Runnable runnable = createRunnable(retryingListener);
154158
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void createComponents() throws Exception {
153153
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
154154

155155
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
156-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
156+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
157157
resultProcessor = new AutodetectResultProcessor(
158158
client(),
159159
auditor,

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void createComponents() {
5959
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
6060

6161
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
62-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
62+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
6363
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
6464
jobResultsPersister = new JobResultsPersister(
6565
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void createComponents() throws Exception {
125125
ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp);
126126

127127
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
128-
resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, builder.build());
128+
resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, builder.build());
129129
auditor = new AnomalyDetectionAuditor(client(), clusterService);
130130
waitForMlTemplates();
131131
}

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void createComponents() {
7373
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
7474
ClusterService clusterService = new ClusterService(settings, clusterSettings, tp);
7575
OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN);
76-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
76+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings);
7777
jobResultsProvider = new JobResultsProvider(client(), settings, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
7878
jobResultsPersister = new JobResultsPersister(
7979
originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), clusterService));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
635635
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService);
636636
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
637637
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
638-
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
638+
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(
639+
threadPool,
640+
originSettingClient,
641+
clusterService,
642+
settings
643+
);
639644
AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(resultsPersisterService);
640645
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
641646
JobResultsPersister jobResultsPersister =

0 commit comments

Comments
 (0)