import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
+ import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
...
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.JobSnapshotUpgraderResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
+ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTask;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;

import java.io.IOException;
+ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
...
import static org.elasticsearch.xpack.ml.MachineLearning.UTILITY_THREAD_POOL_NAME;

public final class JobModelSnapshotUpgrader {
-
+     private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);

    private static final Logger logger = LogManager.getLogger(JobModelSnapshotUpgrader.class);

    private final SnapshotUpgradeTask task;
@@ -97,7 +100,9 @@ void start() {
            params,
            autodetectExecutorService,
            (reason) -> {
-                 setTaskToFailed(reason, ActionListener.wrap(t -> {}, f -> {}));
+                 setTaskToFailed(reason, ActionListener.wrap(t -> {
+                 }, f -> {
+                 }));
                try {
                    nativeStorageProvider.cleanupLocalTmpStorage(task.getDescription());
                } catch (IOException e) {
@@ -200,6 +205,24 @@ void writeHeader() throws IOException {
            process.writeRecord(record);
        }

+     FlushAcknowledgement waitFlushToCompletion(String flushId) throws Exception {
+         logger.debug(() -> new ParameterizedMessage("[{}] [{}] waiting for flush [{}]", jobId, snapshotId, flushId));
+
+         FlushAcknowledgement flushAcknowledgement;
+         try {
+             flushAcknowledgement = processor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+             while (flushAcknowledgement == null) {
+                 checkProcessIsAlive();
+                 checkResultsProcessorIsAlive();
+                 flushAcknowledgement = processor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+             }
+         } finally {
+             processor.clearAwaitingFlush(flushId);
+         }
+         logger.debug(() -> new ParameterizedMessage("[{}] [{}] flush completed [{}]", jobId, snapshotId, flushId));
+         return flushAcknowledgement;
+     }
+
    void restoreState() {
        try {
            process.restoreState(stateStreamer, params.modelSnapshot());
@@ -209,6 +232,31 @@ void restoreState() {
                ActionListener.wrap(t -> shutdown(e), f -> shutdown(e)));
            return;
        }
+         submitOperation(() -> {
+             String flushId = process.flushJob(FlushJobParams.builder().waitForNormalization(false).build());
+             return waitFlushToCompletion(flushId);
+         }, (aVoid, e) -> {
+             Runnable nextStep;
+             if (e != null) {
+                 logger.error(
+                     () -> new ParameterizedMessage(
+                         "[{}] [{}] failed to flush after writing old state",
+                         jobId,
+                         snapshotId
+                     ),
+                     e
+                 );
+                 nextStep = () -> setTaskToFailed(
+                     "Failed to flush after writing old state due to: " + e.getMessage(),
+                     ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                 );
+             } else {
+                 nextStep = this::requestStateWrite;
+             }
+             threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(nextStep);
+         });
+     }
+
+     private void requestStateWrite() {
        task.updatePersistentTaskState(
            new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, task.getAllocationId(), ""),
            ActionListener.wrap(
@@ -282,6 +330,13 @@ private void checkProcessIsAlive() {
            }
        }

+     private void checkResultsProcessorIsAlive() {
+         if (processor.isFailed()) {
+             // Don't log here - it just causes double logging when the exception gets logged
+             throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
+         }
+     }
+
    void shutdown(Exception e) {
        // No point in sending an action to the executor if the process has died
        if (process.isProcessAlive() == false) {
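
The new waitFlushToCompletion method above is a poll-until-acknowledged loop: it asks the result processor for the flush acknowledgement at a fixed one-second interval and, between polls, verifies that both the native process and the result processor are still alive, always clearing the pending flush in a finally block so a failure does not leak a waiter. The standalone sketch below shows the same pattern outside Elasticsearch; FlushTracker, acknowledgement, processAlive, and processorFailed are hypothetical stand-ins for illustration, not the actual Elasticsearch API.

import java.time.Duration;
import java.util.function.BooleanSupplier;

// Standalone sketch of the flush-wait pattern; all names here are hypothetical stand-ins.
public class FlushWaitSketch {

    static final Duration CHECK_FREQUENCY = Duration.ofSeconds(1);

    // Hypothetical tracker: returns the acknowledgement if it arrived within the timeout,
    // otherwise null; clearAwaitingFlush deregisters the waiter for the given flush id.
    interface FlushTracker {
        String acknowledgement(String flushId, Duration timeout) throws InterruptedException;
        void clearAwaitingFlush(String flushId);
    }

    static String waitForFlush(FlushTracker tracker, String flushId,
                               BooleanSupplier processAlive, BooleanSupplier processorFailed)
            throws InterruptedException {
        try {
            // Poll with a short timeout instead of blocking indefinitely, so that a dead
            // process or a failed result processor is noticed between polls.
            String ack = tracker.acknowledgement(flushId, CHECK_FREQUENCY);
            while (ack == null) {
                if (processAlive.getAsBoolean() == false) {
                    throw new IllegalStateException("native process died while waiting for flush " + flushId);
                }
                if (processorFailed.getAsBoolean()) {
                    throw new IllegalStateException("result processor failed while waiting for flush " + flushId);
                }
                ack = tracker.acknowledgement(flushId, CHECK_FREQUENCY);
            }
            return ack;
        } finally {
            // Always clear the pending flush so an abandoned wait leaves no state behind.
            tracker.clearAwaitingFlush(flushId);
        }
    }
}

In the actual change, the liveness checks are checkProcessIsAlive and the new checkResultsProcessorIsAlive (which throws ElasticsearchException), and the polling call is JobSnapshotUpgraderResultProcessor.waitForFlushAcknowledgement.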