14
14
import org .elasticsearch .common .logging .Loggers ;
15
15
import org .elasticsearch .common .unit .ByteSizeUnit ;
16
16
import org .elasticsearch .common .unit .ByteSizeValue ;
17
+ import org .elasticsearch .common .unit .TimeValue ;
18
+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
19
+ import org .elasticsearch .common .util .concurrent .FutureUtils ;
17
20
import org .elasticsearch .xpack .core .ml .MachineLearningField ;
18
21
import org .elasticsearch .xpack .core .ml .action .PutJobAction ;
19
22
import org .elasticsearch .xpack .core .ml .action .UpdateJobAction ;
30
33
import org .elasticsearch .xpack .core .ml .job .results .ForecastRequestStats ;
31
34
import org .elasticsearch .xpack .core .ml .job .results .Influencer ;
32
35
import org .elasticsearch .xpack .core .ml .job .results .ModelPlot ;
36
+ import org .elasticsearch .xpack .ml .MachineLearning ;
33
37
import org .elasticsearch .xpack .ml .job .persistence .JobProvider ;
34
38
import org .elasticsearch .xpack .ml .job .persistence .JobResultsPersister ;
35
39
import org .elasticsearch .xpack .ml .job .process .autodetect .AutodetectProcess ;
43
47
import java .util .List ;
44
48
import java .util .Objects ;
45
49
import java .util .concurrent .CountDownLatch ;
50
+ import java .util .concurrent .Future ;
46
51
import java .util .concurrent .Semaphore ;
47
52
import java .util .concurrent .TimeUnit ;
48
53
import java .util .concurrent .TimeoutException ;
@@ -71,6 +76,13 @@ public class AutoDetectResultProcessor {
71
76
72
77
private static final Logger LOGGER = Loggers .getLogger (AutoDetectResultProcessor .class );
73
78
79
+ /**
80
+ * This is how far behind real-time we'll update the job with the latest established model memory.
81
+ * If more updates are received during the delay period then they'll take precedence.
82
+ * As a result there will be at most one update of established model memory per delay period.
83
+ */
84
+ private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue .timeValueSeconds (5 );
85
+
74
86
private final Client client ;
75
87
private final Auditor auditor ;
76
88
private final String jobId ;
@@ -90,8 +102,10 @@ public class AutoDetectResultProcessor {
90
102
* New model size stats are read as the process is running
91
103
*/
92
104
private volatile ModelSizeStats latestModelSizeStats ;
105
+ private volatile Date latestDateForEstablishedModelMemoryCalc ;
93
106
private volatile long latestEstablishedModelMemory ;
94
107
private volatile boolean haveNewLatestModelSizeStats ;
108
+ private Future <?> scheduledEstablishedModelMemoryUpdate ; // only accessed in synchronized methods
95
109
96
110
public AutoDetectResultProcessor (Client client , Auditor auditor , String jobId , Renormalizer renormalizer , JobResultsPersister persister ,
97
111
JobProvider jobProvider , ModelSizeStats latestModelSizeStats , boolean restoredSnapshot ) {
@@ -148,6 +162,7 @@ public void process(AutodetectProcess process) {
148
162
}
149
163
150
164
LOGGER .info ("[{}] {} buckets parsed from autodetect output" , jobId , bucketCount );
165
+ runEstablishedModelMemoryUpdate (true );
151
166
} catch (Exception e ) {
152
167
failed = true ;
153
168
@@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) {
194
209
// persist after deleting interim results in case the new
195
210
// results are also interim
196
211
context .bulkResultsPersister .persistBucket (bucket ).executeRequest ();
212
+ latestDateForEstablishedModelMemoryCalc = bucket .getTimestamp ();
197
213
++bucketCount ;
198
214
199
215
// if we haven't previously set established model memory, consider trying again after
200
- // a reasonable amount of time has elapsed since the last model size stats update
216
+ // a reasonable number of buckets have elapsed since the last model size stats update
201
217
long minEstablishedTimespanMs = JobProvider .BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket .getBucketSpan () * 1000L ;
202
- if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0
203
- && bucket .getTimestamp ().getTime () > latestModelSizeStats .getTimestamp ().getTime () + minEstablishedTimespanMs ) {
204
- persister .commitResultWrites (context .jobId );
205
- updateEstablishedModelMemoryOnJob (bucket .getTimestamp (), latestModelSizeStats );
218
+ if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc .getTime ()
219
+ > latestModelSizeStats .getTimestamp ().getTime () + minEstablishedTimespanMs ) {
220
+ scheduleEstablishedModelMemoryUpdate (ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY );
206
221
haveNewLatestModelSizeStats = false ;
207
222
}
208
223
}
@@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat
293
308
persister .persistModelSizeStats (modelSizeStats );
294
309
notifyModelMemoryStatusChange (context , modelSizeStats );
295
310
latestModelSizeStats = modelSizeStats ;
311
+ latestDateForEstablishedModelMemoryCalc = modelSizeStats .getTimestamp ();
296
312
haveNewLatestModelSizeStats = true ;
297
313
298
314
// This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
299
315
// because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
300
316
// we'll NEVER consider memory usage to be established during this period
301
317
if (restoredSnapshot || bucketCount >= JobProvider .BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE ) {
302
- // We need to make all results written up to and including these stats available for the established memory calculation
303
- persister .commitResultWrites (context .jobId );
304
- updateEstablishedModelMemoryOnJob (modelSizeStats .getTimestamp (), modelSizeStats );
318
+ scheduleEstablishedModelMemoryUpdate (ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY );
305
319
}
306
320
}
307
321
@@ -348,26 +362,91 @@ public void onFailure(Exception e) {
348
362
});
349
363
}
350
364
351
- private void updateEstablishedModelMemoryOnJob (Date latestBucketTimestamp , ModelSizeStats modelSizeStats ) {
352
- jobProvider .getEstablishedMemoryUsage (jobId , latestBucketTimestamp , modelSizeStats , establishedModelMemory -> {
353
- JobUpdate update = new JobUpdate .Builder (jobId )
354
- .setEstablishedModelMemory (establishedModelMemory ).build ();
355
- UpdateJobAction .Request updateRequest = UpdateJobAction .Request .internal (jobId , update );
356
- updateRequest .setWaitForAck (false );
357
-
358
- executeAsyncWithOrigin (client , ML_ORIGIN , UpdateJobAction .INSTANCE , updateRequest , new ActionListener <PutJobAction .Response >() {
359
- @ Override
360
- public void onResponse (PutJobAction .Response response ) {
361
- latestEstablishedModelMemory = establishedModelMemory ;
362
- LOGGER .debug ("[{}] Updated job with established model memory [{}]" , jobId , establishedModelMemory );
363
- }
365
+ /**
366
+ * The purpose of this method is to avoid saturating the cluster state update thread
367
+ * when a lookback job is churning through buckets very fast and the memory usage of
368
+ * the job is changing regularly. The idea is to only update the established model
369
+ * memory associated with the job a few seconds after the new value has been received.
370
+ * If more updates are received during the delay period then they simply replace the
371
+ * value that originally caused the update to be scheduled. This rate limits cluster
372
+ * state updates due to established model memory changing to one per job per delay period.
373
+ * (In reality updates will only occur this rapidly during lookback. During real-time
374
+ * operation the limit of one model size stats document per bucket will mean there is a
375
+ * maximum of one cluster state update per job per bucket, and usually the bucket span
376
+ * is 5 minutes or more.)
377
+ * @param delay The delay before updating established model memory.
378
+ */
379
+ synchronized void scheduleEstablishedModelMemoryUpdate (TimeValue delay ) {
364
380
365
- @ Override
366
- public void onFailure (Exception e ) {
367
- LOGGER .error ("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]" ,
368
- e );
381
+ if (scheduledEstablishedModelMemoryUpdate == null ) {
382
+ try {
383
+ scheduledEstablishedModelMemoryUpdate = client .threadPool ().schedule (delay , MachineLearning .UTILITY_THREAD_POOL_NAME ,
384
+ () -> runEstablishedModelMemoryUpdate (false ));
385
+ LOGGER .trace ("[{}] Scheduled established model memory update to run in [{}]" , jobId , delay );
386
+ } catch (EsRejectedExecutionException e ) {
387
+ if (e .isExecutorShutdown ()) {
388
+ LOGGER .debug ("failed to schedule established model memory update; shutting down" , e );
389
+ } else {
390
+ throw e ;
369
391
}
370
- });
392
+ }
393
+ }
394
+ }
395
+
396
+ /**
397
+ * This method is called from two places:
398
+ * - From the {@link Future} used for delayed updates
399
+ * - When shutting down this result processor
400
+ * When shutting down the result processor it's only necessary to do anything
401
+ * if an update has been scheduled, but we want to do the update immediately.
402
+ * Despite cancelling the scheduled update in this case, it's possible that
403
+ * it's already started running, in which case this method will get called
404
+ * twice in quick succession. But the second call will do nothing, as
405
+ * <code>scheduledEstablishedModelMemoryUpdate</code> will have been reset
406
+ * to <code>null</code> by the first call.
407
+ */
408
+ private synchronized void runEstablishedModelMemoryUpdate (boolean cancelExisting ) {
409
+
410
+ if (scheduledEstablishedModelMemoryUpdate != null ) {
411
+ if (cancelExisting ) {
412
+ LOGGER .debug ("[{}] Bringing forward previously scheduled established model memory update" , jobId );
413
+ FutureUtils .cancel (scheduledEstablishedModelMemoryUpdate );
414
+ }
415
+ scheduledEstablishedModelMemoryUpdate = null ;
416
+ updateEstablishedModelMemoryOnJob ();
417
+ }
418
+ }
419
+
420
+ private void updateEstablishedModelMemoryOnJob () {
421
+
422
+ // Copy these before committing writes, so the calculation is done based on committed documents
423
+ Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc ;
424
+ ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats ;
425
+
426
+ // We need to make all results written up to and including these stats available for the established memory calculation
427
+ persister .commitResultWrites (jobId );
428
+
429
+ jobProvider .getEstablishedMemoryUsage (jobId , latestBucketTimestamp , modelSizeStatsForCalc , establishedModelMemory -> {
430
+ if (latestEstablishedModelMemory != establishedModelMemory ) {
431
+ JobUpdate update = new JobUpdate .Builder (jobId ).setEstablishedModelMemory (establishedModelMemory ).build ();
432
+ UpdateJobAction .Request updateRequest = UpdateJobAction .Request .internal (jobId , update );
433
+ updateRequest .setWaitForAck (false );
434
+
435
+ executeAsyncWithOrigin (client , ML_ORIGIN , UpdateJobAction .INSTANCE , updateRequest ,
436
+ new ActionListener <PutJobAction .Response >() {
437
+ @ Override
438
+ public void onResponse (PutJobAction .Response response ) {
439
+ latestEstablishedModelMemory = establishedModelMemory ;
440
+ LOGGER .debug ("[{}] Updated job with established model memory [{}]" , jobId , establishedModelMemory );
441
+ }
442
+
443
+ @ Override
444
+ public void onFailure (Exception e ) {
445
+ LOGGER .error ("[" + jobId + "] Failed to update job with new established model memory [" +
446
+ establishedModelMemory + "]" , e );
447
+ }
448
+ });
449
+ }
371
450
}, e -> LOGGER .error ("[" + jobId + "] Failed to calculate established model memory" , e ));
372
451
}
373
452
0 commit comments