7
7
8
8
import org .elasticsearch .common .xcontent .XContentElasticsearchExtension ;
9
9
import org .elasticsearch .core .internal .io .IOUtils ;
10
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
10
11
import org .elasticsearch .ElasticsearchStatusException ;
11
12
import org .elasticsearch .action .ActionListener ;
12
13
import org .elasticsearch .client .Client ;
15
16
import org .elasticsearch .common .component .AbstractComponent ;
16
17
import org .elasticsearch .common .settings .Setting ;
17
18
import org .elasticsearch .common .settings .Settings ;
19
+ import org .elasticsearch .common .unit .ByteSizeUnit ;
20
+ import org .elasticsearch .common .unit .ByteSizeValue ;
18
21
import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
19
22
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
20
23
import org .elasticsearch .common .util .concurrent .ThreadContext ;
21
24
import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
22
- import org .elasticsearch .common .xcontent .XContentBuilder ;
23
25
import org .elasticsearch .common .xcontent .XContentType ;
24
26
import org .elasticsearch .env .Environment ;
25
27
import org .elasticsearch .index .analysis .AnalysisRegistry ;
47
49
import org .elasticsearch .xpack .ml .job .persistence .JobResultsPersister ;
48
50
import org .elasticsearch .xpack .ml .job .persistence .StateStreamer ;
49
51
import org .elasticsearch .xpack .ml .job .process .DataCountsReporter ;
52
+ import org .elasticsearch .xpack .ml .job .process .NativeStorageProvider ;
50
53
import org .elasticsearch .xpack .ml .job .process .autodetect .output .AutoDetectResultProcessor ;
51
54
import org .elasticsearch .xpack .ml .job .process .autodetect .params .DataLoadParams ;
52
55
import org .elasticsearch .xpack .ml .job .process .autodetect .params .FlushJobParams ;
59
62
60
63
import java .io .IOException ;
61
64
import java .io .InputStream ;
65
+ import java .nio .file .Path ;
62
66
import java .time .Duration ;
63
67
import java .time .ZonedDateTime ;
64
68
import java .util .Date ;
@@ -96,6 +100,10 @@ public class AutodetectProcessManager extends AbstractComponent {
96
100
public static final Setting <Integer > MAX_OPEN_JOBS_PER_NODE =
97
101
Setting .intSetting ("xpack.ml.max_open_jobs" , MAX_RUNNING_JOBS_PER_NODE , 1 , Property .NodeScope );
98
102
103
+ // Undocumented setting for integration test purposes
104
+ public static final Setting <ByteSizeValue > MIN_DISK_SPACE_OFF_HEAP =
105
+ Setting .byteSizeSetting ("xpack.ml.min_disk_space_off_heap" , new ByteSizeValue (5 , ByteSizeUnit .GB ), Property .NodeScope );
106
+
99
107
private final Client client ;
100
108
private final Environment environment ;
101
109
private final ThreadPool threadPool ;
@@ -107,8 +115,12 @@ public class AutodetectProcessManager extends AbstractComponent {
107
115
private final JobResultsPersister jobResultsPersister ;
108
116
private final JobDataCountsPersister jobDataCountsPersister ;
109
117
118
+ private NativeStorageProvider nativeStorageProvider ;
110
119
private final ConcurrentMap <Long , ProcessContext > processByAllocation = new ConcurrentHashMap <>();
111
120
121
+ // a map that manages the allocation of temporary space to jobs
122
+ private final ConcurrentMap <String , Path > nativeTmpStorage = new ConcurrentHashMap <>();
123
+
112
124
private final int maxAllowedRunningJobs ;
113
125
114
126
private final NamedXContentRegistry xContentRegistry ;
@@ -133,6 +145,15 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
133
145
this .jobResultsPersister = jobResultsPersister ;
134
146
this .jobDataCountsPersister = jobDataCountsPersister ;
135
147
this .auditor = auditor ;
148
+ this .nativeStorageProvider = new NativeStorageProvider (environment , MIN_DISK_SPACE_OFF_HEAP .get (settings ));
149
+ }
150
+
151
+ public void onNodeStartup () {
152
+ try {
153
+ nativeStorageProvider .cleanupLocalTmpStorageInCaseOfUncleanShutdown ();
154
+ } catch (Exception e ) {
155
+ logger .warn ("Failed to cleanup native storage from previous invocation" , e );
156
+ }
136
157
}
137
158
138
159
public synchronized void closeAllJobsOnThisNode (String reason ) throws IOException {
@@ -251,17 +272,40 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<Flus
251
272
});
252
273
}
253
274
275
+ /**
276
+ * Request temporary storage to be used for the job
277
+ *
278
+ * @param jobTask The job task
279
+ * @param requestedSize requested size
280
+ * @return a Path to local storage or null if storage is not available
281
+ */
282
+ public Path tryGetTmpStorage (JobTask jobTask , ByteSizeValue requestedSize ) {
283
+ String jobId = jobTask .getJobId ();
284
+ Path path = nativeTmpStorage .get (jobId );
285
+ if (path == null ) {
286
+ path = nativeStorageProvider .tryGetLocalTmpStorage (jobId , requestedSize );
287
+ if (path != null ) {
288
+ nativeTmpStorage .put (jobId , path );
289
+ }
290
+ } else if (!nativeStorageProvider .localTmpStorageHasEnoughSpace (path , requestedSize )) {
291
+ // the previous tmp location ran out of disk space, do not allow further usage
292
+ return null ;
293
+ }
294
+ return path ;
295
+ }
296
+
254
297
/**
255
298
* Do a forecast for the running job.
256
299
*
257
300
* @param jobTask The job task
258
301
* @param params Forecast parameters
259
302
*/
260
303
public void forecastJob (JobTask jobTask , ForecastParams params , Consumer <Exception > handler ) {
261
- logger .debug ("Forecasting job {}" , jobTask .getJobId ());
304
+ String jobId = jobTask .getJobId ();
305
+ logger .debug ("Forecasting job {}" , jobId );
262
306
AutodetectCommunicator communicator = getOpenAutodetectCommunicator (jobTask );
263
307
if (communicator == null ) {
264
- String message = String .format (Locale .ROOT , "Cannot forecast because job [%s] is not open" , jobTask . getJobId () );
308
+ String message = String .format (Locale .ROOT , "Cannot forecast because job [%s] is not open" , jobId );
265
309
logger .debug (message );
266
310
handler .accept (ExceptionsHelper .conflictStatusException (message ));
267
311
return ;
@@ -271,7 +315,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
271
315
if (e == null ) {
272
316
handler .accept (null );
273
317
} else {
274
- String msg = String .format (Locale .ROOT , "[%s] exception while forecasting job" , jobTask . getJobId () );
318
+ String msg = String .format (Locale .ROOT , "[%s] exception while forecasting job" , jobId );
275
319
logger .error (msg , e );
276
320
handler .accept (ExceptionsHelper .serverError (msg , e ));
277
321
}
@@ -477,6 +521,11 @@ private Runnable onProcessCrash(JobTask jobTask) {
477
521
}
478
522
}
479
523
setJobState (jobTask , JobState .FAILED );
524
+ try {
525
+ removeTmpStorage (jobTask .getJobId ());
526
+ } catch (IOException e ) {
527
+ logger .error (new ParameterizedMessage ("[{}] Failed to delete temporary files" , jobTask .getJobId ()), e );
528
+ }
480
529
};
481
530
}
482
531
@@ -535,6 +584,12 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
535
584
// thread that gets into this method blocks until the first thread has finished closing the job
536
585
processContext .unlock ();
537
586
}
587
+ // delete any tmp storage
588
+ try {
589
+ removeTmpStorage (jobId );
590
+ } catch (IOException e ) {
591
+ logger .error (new ParameterizedMessage ("[{}]Failed to delete temporary files" , jobId ), e );
592
+ }
538
593
}
539
594
540
595
int numberOfOpenJobs () {
@@ -613,6 +668,13 @@ public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask
613
668
return Optional .of (new Tuple <>(communicator .getDataCounts (), communicator .getModelSizeStats ()));
614
669
}
615
670
671
+ private void removeTmpStorage (String jobId ) throws IOException {
672
+ Path path = nativeTmpStorage .get (jobId );
673
+ if (path != null ) {
674
+ nativeStorageProvider .cleanupLocalTmpStorage (path );
675
+ }
676
+ }
677
+
616
678
ExecutorService createAutodetectExecutorService (ExecutorService executorService ) {
617
679
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService (threadPool .getThreadContext ());
618
680
executorService .submit (autoDetectWorkerExecutor ::start );
0 commit comments