10
10
import org .elasticsearch .action .admin .cluster .node .hotthreads .NodeHotThreads ;
11
11
import org .elasticsearch .action .admin .cluster .node .hotthreads .NodesHotThreadsResponse ;
12
12
import org .elasticsearch .action .support .master .AcknowledgedResponse ;
13
+ import org .elasticsearch .common .CheckedConsumer ;
14
+ import org .elasticsearch .common .CheckedRunnable ;
13
15
import org .elasticsearch .common .unit .TimeValue ;
14
16
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
15
17
import org .elasticsearch .common .util .concurrent .ConcurrentMapLong ;
@@ -106,15 +108,15 @@ public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
106
108
Instant now = Instant .now ();
107
109
indexDocs (logger , "data" , numDocs , now .minus (Duration .ofDays (14 )).toEpochMilli (), now .toEpochMilli ());
108
110
109
- Job .Builder job = createScheduledJob ("lookback-job" );
111
+ Job .Builder job = createScheduledJob ("lookback-job-datafeed-recreated " );
110
112
111
- String datafeedId = "lookback-datafeed" ;
113
+ String datafeedId = "lookback-datafeed-datafeed-recreated " ;
112
114
DatafeedConfig datafeedConfig = createDatafeed (datafeedId , job .getId (), Arrays .asList ("data" ));
113
115
114
116
registerJob (job );
115
117
putJob (job );
116
118
117
- for ( int i = 0 ; i < 2 ; ++ i ) {
119
+ CheckedRunnable < Exception > openAndRunJob = () -> {
118
120
openJob (job .getId ());
119
121
assertBusy (() -> assertEquals (getJobStats (job .getId ()).get (0 ).getState (), JobState .OPENED ));
120
122
registerDatafeed (datafeedConfig );
@@ -129,7 +131,10 @@ public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
129
131
}, 60 , TimeUnit .SECONDS );
130
132
deleteDatafeed (datafeedId );
131
133
waitUntilJobIsClosed (job .getId ());
132
- }
134
+ };
135
+
136
+ openAndRunJob .run ();
137
+ openAndRunJob .run ();
133
138
}
134
139
135
140
public void testDatafeedTimingStats_DatafeedJobIdUpdated () throws Exception {
@@ -140,8 +145,8 @@ public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
140
145
Instant now = Instant .now ();
141
146
indexDocs (logger , "data" , numDocs , now .minus (Duration .ofDays (14 )).toEpochMilli (), now .toEpochMilli ());
142
147
143
- Job .Builder jobA = createScheduledJob ("lookback-job" );
144
- Job .Builder jobB = createScheduledJob ("other-lookback-job" );
148
+ Job .Builder jobA = createScheduledJob ("lookback-job-jobid-updated " );
149
+ Job .Builder jobB = createScheduledJob ("other-lookback-job-jobid-updated " );
145
150
for (Job .Builder job : Arrays .asList (jobA , jobB )) {
146
151
registerJob (job );
147
152
putJob (job );
@@ -152,11 +157,10 @@ public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
152
157
registerDatafeed (datafeedConfig );
153
158
putDatafeed (datafeedConfig );
154
159
155
- for ( Job .Builder job : Arrays . asList ( jobA , jobB , jobA )) {
160
+ CheckedConsumer < Job .Builder , Exception > openAndRunJob = job -> {
156
161
openJob (job .getId ());
157
162
assertBusy (() -> assertEquals (getJobStats (job .getId ()).get (0 ).getState (), JobState .OPENED ));
158
163
// Bind datafeedId to the current job on the list, timing stats are wiped out.
159
- updateDatafeed (new DatafeedUpdate .Builder (datafeedId ).setJobId (job .getId ()).build ());
160
164
// Datafeed did not do anything yet, hence search_count is equal to 0.
161
165
assertDatafeedStats (datafeedId , DatafeedState .STOPPED , job .getId (), equalTo (0L ));
162
166
startDatafeed (datafeedId , 0L , now .toEpochMilli ());
@@ -166,7 +170,13 @@ public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
166
170
assertDatafeedStats (datafeedId , DatafeedState .STOPPED , job .getId (), greaterThan (0L ));
167
171
}, 60 , TimeUnit .SECONDS );
168
172
waitUntilJobIsClosed (job .getId ());
169
- }
173
+ };
174
+
175
+ openAndRunJob .accept (jobA );
176
+ updateDatafeed (new DatafeedUpdate .Builder (datafeedId ).setJobId (jobB .getId ()).build ()); // wipes out timing stats
177
+ openAndRunJob .accept (jobB );
178
+ updateDatafeed (new DatafeedUpdate .Builder (datafeedId ).setJobId (jobA .getId ()).build ()); // wipes out timing stats
179
+ openAndRunJob .accept (jobA );
170
180
}
171
181
172
182
public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset () throws Exception {
@@ -177,11 +187,11 @@ public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() thro
177
187
Instant now = Instant .now ();
178
188
indexDocs (logger , "data" , numDocs , now .minus (Duration .ofDays (14 )).toEpochMilli (), now .toEpochMilli ());
179
189
180
- Job .Builder job = createScheduledJob ("lookback-job" );
190
+ Job .Builder job = createScheduledJob ("lookback-job-query-delay-updated " );
181
191
registerJob (job );
182
192
putJob (job );
183
193
184
- String datafeedId = "lookback-datafeed" ;
194
+ String datafeedId = "lookback-datafeed-query-delay-updated " ;
185
195
DatafeedConfig datafeedConfig = createDatafeed (datafeedId , job .getId (), Arrays .asList ("data" ));
186
196
registerDatafeed (datafeedConfig );
187
197
putDatafeed (datafeedConfig );
0 commit comments