5
5
*/
6
6
package org .elasticsearch .xpack .ml .integration ;
7
7
8
+ import org .elasticsearch .action .search .SearchResponse ;
9
+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
8
10
import org .elasticsearch .common .unit .TimeValue ;
11
+ import org .elasticsearch .search .SearchHit ;
12
+ import org .elasticsearch .xpack .core .ml .action .FlushJobAction ;
9
13
import org .elasticsearch .xpack .core .ml .action .PersistJobAction ;
10
14
import org .elasticsearch .xpack .core .ml .job .config .AnalysisConfig ;
11
15
import org .elasticsearch .xpack .core .ml .job .config .DataDescription ;
12
16
import org .elasticsearch .xpack .core .ml .job .config .Detector ;
13
17
import org .elasticsearch .xpack .core .ml .job .config .Job ;
18
+ import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
14
19
import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSnapshot ;
15
20
import org .junit .After ;
16
21
17
22
import java .util .Arrays ;
18
23
import java .util .List ;
19
24
import java .util .stream .Collectors ;
20
25
26
+ import static org .hamcrest .Matchers .equalTo ;
27
+ import static org .hamcrest .Matchers .is ;
28
+
21
29
public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
22
30
31
+ private static final long BUCKET_SPAN_SECONDS = 300 ;
32
+ private static final TimeValue BUCKET_SPAN = TimeValue .timeValueSeconds (BUCKET_SPAN_SECONDS );
33
+
23
34
@ After
24
35
public void cleanUpJobs () {
25
36
cleanUp ();
@@ -39,19 +50,153 @@ public void testPersistJob() throws Exception {
39
50
});
40
51
}
41
52
42
- private void runJob (String jobId ) throws Exception {
43
- TimeValue bucketSpan = TimeValue .timeValueMinutes (5 );
53
+ // check that state is persisted after time has been advanced even if no new data is seen in the interim
54
+ public void testPersistJobOnGracefulShutdown_givenTimeAdvancedAfterNoNewData () throws Exception {
55
+ String jobId = "time-advanced-after-no-new-data-test" ;
56
+
57
+ // open and run a job with a small data set
58
+ runJob (jobId );
59
+ FlushJobAction .Response flushResponse = flushJob (jobId , true );
60
+
61
+ closeJob (jobId );
62
+
63
+ // Check that state has been persisted
64
+ SearchResponse stateDocsResponse1 = client ().prepareSearch (AnomalyDetectorsIndex .jobStateIndexPattern ())
65
+ .setFetchSource (false )
66
+ .setTrackTotalHits (true )
67
+ .setSize (10000 )
68
+ .get ();
69
+
70
+ int numQuantileRecords = 0 ;
71
+ int numStateRecords = 0 ;
72
+ for (SearchHit hit : stateDocsResponse1 .getHits ().getHits ()) {
73
+ logger .info (hit .getId ());
74
+ if (hit .getId ().contains ("quantiles" )) {
75
+ ++numQuantileRecords ;
76
+ } else if (hit .getId ().contains ("model_state" )) {
77
+ ++numStateRecords ;
78
+ }
79
+ }
80
+ assertThat (stateDocsResponse1 .getHits ().getTotalHits ().value , equalTo (2L ));
81
+ assertThat (numQuantileRecords , equalTo (1 ));
82
+ assertThat (numStateRecords , equalTo (1 ));
83
+
84
+ // re-open the job
85
+ openJob (jobId );
86
+
87
+ // advance time
88
+ long lastFinalizedBucketEnd = flushResponse .getLastFinalizedBucketEnd ().getTime ();
89
+ FlushJobAction .Request advanceTimeRequest = new FlushJobAction .Request (jobId );
90
+ advanceTimeRequest .setAdvanceTime (String .valueOf (lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000 ));
91
+ advanceTimeRequest .setCalcInterim (false );
92
+ assertThat (client ().execute (FlushJobAction .INSTANCE , advanceTimeRequest ).actionGet ().isFlushed (), is (true ));
93
+
94
+ closeJob (jobId );
95
+
96
+ // Check that a new state record exists.
97
+ SearchResponse stateDocsResponse2 = client ().prepareSearch (AnomalyDetectorsIndex .jobStateIndexPattern ())
98
+ .setFetchSource (false )
99
+ .setTrackTotalHits (true )
100
+ .setSize (10000 )
101
+ .get ();
102
+
103
+ numQuantileRecords = 0 ;
104
+ numStateRecords = 0 ;
105
+ for (SearchHit hit : stateDocsResponse2 .getHits ().getHits ()) {
106
+ logger .info (hit .getId ());
107
+ if (hit .getId ().contains ("quantiles" )) {
108
+ ++numQuantileRecords ;
109
+ } else if (hit .getId ().contains ("model_state" )) {
110
+ ++numStateRecords ;
111
+ }
112
+ }
113
+
114
+ assertThat (stateDocsResponse2 .getHits ().getTotalHits ().value , equalTo (3L ));
115
+ assertThat (numQuantileRecords , equalTo (1 ));
116
+ assertThat (numStateRecords , equalTo (2 ));
117
+
118
+ deleteJob (jobId );
119
+ }
120
+
121
+ // Check an edge case where time is manually advanced before any valid data is seen
122
+ public void testPersistJobOnGracefulShutdown_givenNoDataAndTimeAdvanced () throws Exception {
123
+ String jobId = "no-data-and-time-advanced-test" ;
124
+
125
+ createAndOpenJob (jobId );
126
+
127
+ // Manually advance time.
128
+ FlushJobAction .Request advanceTimeRequest = new FlushJobAction .Request (jobId );
129
+ advanceTimeRequest .setAdvanceTime (String .valueOf (BUCKET_SPAN_SECONDS * 1000 ));
130
+ advanceTimeRequest .setCalcInterim (false );
131
+ assertThat (client ().execute (FlushJobAction .INSTANCE , advanceTimeRequest ).actionGet ().isFlushed (), is (true ));
132
+
133
+ closeJob (jobId );
134
+
135
+ // Check that state has been persisted
136
+ SearchResponse stateDocsResponse = client ().prepareSearch (AnomalyDetectorsIndex .jobStateIndexPattern ())
137
+ .setFetchSource (false )
138
+ .setTrackTotalHits (true )
139
+ .setSize (10000 )
140
+ .get ();
141
+
142
+ int numQuantileRecords = 0 ;
143
+ int numStateRecords = 0 ;
144
+ for (SearchHit hit : stateDocsResponse .getHits ().getHits ()) {
145
+ logger .info (hit .getId ());
146
+ if (hit .getId ().contains ("quantiles" )) {
147
+ ++numQuantileRecords ;
148
+ } else if (hit .getId ().contains ("model_state" )) {
149
+ ++numStateRecords ;
150
+ }
151
+ }
152
+ assertThat (stateDocsResponse .getHits ().getTotalHits ().value , equalTo (2L ));
153
+ assertThat (numQuantileRecords , equalTo (1 ));
154
+ assertThat (numStateRecords , equalTo (1 ));
155
+
156
+ // now check that the job can be happily restored - even though no data has been seen
157
+ AcknowledgedResponse ack = openJob (jobId );
158
+ assertTrue (ack .isAcknowledged ());
159
+
160
+ closeJob (jobId );
161
+ deleteJob (jobId );
162
+ }
163
+
164
+ // Check an edge case where a job is opened and then immediately closed
165
+ public void testPersistJobOnGracefulShutdown_givenNoDataAndNoTimeAdvance () throws Exception {
166
+ String jobId = "no-data-and-no-time-advance-test" ;
167
+
168
+ createAndOpenJob (jobId );
169
+
170
+ closeJob (jobId );
171
+
172
+ // Check that state has not been persisted
173
+ SearchResponse stateDocsResponse = client ().prepareSearch (AnomalyDetectorsIndex .jobStateIndexPattern ())
174
+ .setFetchSource (false )
175
+ .setTrackTotalHits (true )
176
+ .setSize (10000 )
177
+ .get ();
178
+
179
+ assertThat (stateDocsResponse .getHits ().getTotalHits ().value , equalTo (0L ));
180
+
181
+ deleteJob (jobId );
182
+ }
183
+
184
+ private void createAndOpenJob (String jobId ) throws Exception {
44
185
Detector .Builder detector = new Detector .Builder ("count" , null );
45
186
AnalysisConfig .Builder analysisConfig = new AnalysisConfig .Builder (Arrays .asList (detector .build ()));
46
- analysisConfig .setBucketSpan (bucketSpan );
187
+ analysisConfig .setBucketSpan (BUCKET_SPAN );
47
188
Job .Builder job = new Job .Builder (jobId );
48
189
job .setAnalysisConfig (analysisConfig );
49
190
job .setDataDescription (new DataDescription .Builder ());
50
191
registerJob (job );
51
192
putJob (job );
52
193
53
194
openJob (job .getId ());
54
- List <String > data = generateData (System .currentTimeMillis (), bucketSpan , 10 , bucketIndex -> randomIntBetween (10 , 20 ));
55
- postData (job .getId (), data .stream ().collect (Collectors .joining ()));
195
+ }
196
+
197
+ private void runJob (String jobId ) throws Exception {
198
+ createAndOpenJob (jobId );
199
+ List <String > data = generateData (System .currentTimeMillis (), BUCKET_SPAN , 10 , bucketIndex -> randomIntBetween (10 , 20 ));
200
+ postData (jobId , data .stream ().collect (Collectors .joining ()));
56
201
}
57
202
}
0 commit comments