7
7
8
8
import org .apache .http .entity .ContentType ;
9
9
import org .apache .http .entity .StringEntity ;
10
- import org .apache .lucene .util .LuceneTestCase ;
11
10
import org .elasticsearch .Version ;
12
11
import org .elasticsearch .client .Request ;
13
12
import org .elasticsearch .client .Response ;
13
+ import org .elasticsearch .client .core .IndexerState ;
14
14
import org .elasticsearch .client .dataframe .GetDataFrameTransformStatsResponse ;
15
15
import org .elasticsearch .client .dataframe .transforms .DataFrameTransformConfig ;
16
16
import org .elasticsearch .client .dataframe .transforms .DataFrameTransformStats ;
28
28
import org .elasticsearch .common .xcontent .XContentBuilder ;
29
29
import org .elasticsearch .common .xcontent .XContentParser ;
30
30
import org .elasticsearch .common .xcontent .XContentType ;
31
+ import org .elasticsearch .common .xcontent .support .XContentMapValues ;
31
32
import org .elasticsearch .search .aggregations .AggregationBuilders ;
32
33
import org .elasticsearch .search .aggregations .AggregatorFactories ;
33
34
import org .elasticsearch .xpack .test .rest .XPackRestTestConstants ;
37
38
import java .util .ArrayList ;
38
39
import java .util .Collection ;
39
40
import java .util .List ;
41
+ import java .util .Map ;
40
42
import java .util .concurrent .TimeUnit ;
43
+ import java .util .function .Consumer ;
41
44
import java .util .stream .Collectors ;
42
45
import java .util .stream .Stream ;
43
46
48
51
import static org .hamcrest .Matchers .hasSize ;
49
52
import static org .hamcrest .Matchers .oneOf ;
50
53
51
- @ LuceneTestCase .AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/43662" )
52
54
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
53
55
54
56
private static final Version UPGRADE_FROM_VERSION = Version .fromString (System .getProperty ("tests.upgrade_from_version" ));
@@ -80,11 +82,18 @@ protected static void waitForPendingDataFrameTasks() throws Exception {
80
82
*/
81
83
public void testDataFramesRollingUpgrade () throws Exception {
82
84
assumeTrue ("Continuous data frames not supported until 7.3" , UPGRADE_FROM_VERSION .onOrAfter (Version .V_7_3_0 ));
85
+ Request adjustLoggingLevels = new Request ("PUT" , "/_cluster/settings" );
86
+ adjustLoggingLevels .setJsonEntity (
87
+ "{\" transient\" : {" +
88
+ "\" logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\" : \" trace\" ," +
89
+ "\" logger.org.elasticsearch.xpack.dataframe\" : \" trace\" }}" );
90
+ client ().performRequest (adjustLoggingLevels );
83
91
Request waitForYellow = new Request ("GET" , "/_cluster/health" );
84
92
waitForYellow .addParameter ("wait_for_nodes" , "3" );
85
93
waitForYellow .addParameter ("wait_for_status" , "yellow" );
86
94
switch (CLUSTER_TYPE ) {
87
95
case OLD :
96
+ client ().performRequest (waitForYellow );
88
97
createAndStartContinuousDataFrame ();
89
98
break ;
90
99
case MIXED :
@@ -113,35 +122,44 @@ private void cleanUpTransforms() throws Exception {
113
122
114
123
private void createAndStartContinuousDataFrame () throws Exception {
115
124
createIndex (CONTINUOUS_DATA_FRAME_SOURCE );
116
- long totalDocsWritten = 0 ;
125
+ long totalDocsWrittenSum = 0 ;
117
126
for (TimeValue bucket : BUCKETS ) {
118
127
int docs = randomIntBetween (1 , 25 );
119
128
putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , bucket , ENTITIES );
120
- totalDocsWritten += docs * ENTITIES .size ();
129
+ totalDocsWrittenSum += docs * ENTITIES .size ();
121
130
}
122
-
131
+ long totalDocsWritten = totalDocsWrittenSum ;
123
132
DataFrameTransformConfig config = DataFrameTransformConfig .builder ()
124
- .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (30 )))
133
+ .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (1 )))
125
134
.setPivotConfig (PivotConfig .builder ()
126
135
.setAggregations (new AggregatorFactories .Builder ().addAggregator (AggregationBuilders .avg ("stars" ).field ("stars" )))
127
136
.setGroups (GroupConfig .builder ().groupBy ("user_id" , TermsGroupSource .builder ().setField ("user_id" ).build ()).build ())
128
137
.build ())
129
138
.setDest (DestConfig .builder ().setIndex (CONTINUOUS_DATA_FRAME_ID + "_idx" ).build ())
130
139
.setSource (SourceConfig .builder ().setIndex (CONTINUOUS_DATA_FRAME_SOURCE ).build ())
131
140
.setId (CONTINUOUS_DATA_FRAME_ID )
141
+ .setFrequency (TimeValue .timeValueSeconds (1 ))
132
142
.build ();
133
143
putTransform (CONTINUOUS_DATA_FRAME_ID , config );
134
144
135
145
startTransform (CONTINUOUS_DATA_FRAME_ID );
136
146
waitUntilAfterCheckpoint (CONTINUOUS_DATA_FRAME_ID , 0L );
137
147
138
- DataFrameTransformStats stateAndStats = getTransformStats (CONTINUOUS_DATA_FRAME_ID );
148
+ assertBusy (() -> {
149
+ DataFrameTransformStats stateAndStats = getTransformStats (CONTINUOUS_DATA_FRAME_ID );
150
+ assertThat (stateAndStats .getIndexerStats ().getOutputDocuments (), equalTo ((long )ENTITIES .size ()));
151
+ assertThat (stateAndStats .getIndexerStats ().getNumDocuments (), equalTo (totalDocsWritten ));
152
+ // Even if we get back to started, we may periodically get set back to `indexing` when triggered.
153
+ // Though short lived due to no changes on the source indices, it could result in flaky test behavior
154
+ assertThat (stateAndStats .getState (), oneOf (DataFrameTransformStats .State .STARTED , DataFrameTransformStats .State .INDEXING ));
155
+ }, 120 , TimeUnit .SECONDS );
139
156
140
- assertThat ( stateAndStats . getIndexerStats (). getOutputDocuments (), equalTo (( long ) ENTITIES . size ()));
141
- assertThat ( stateAndStats . getIndexerStats (). getNumDocuments (), equalTo ( totalDocsWritten ));
142
- assertThat ( stateAndStats . getState (), oneOf ( DataFrameTransformStats . State . STARTED , DataFrameTransformStats . State . INDEXING ));
157
+
158
+ // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
159
+ awaitWrittenIndexerState ( CONTINUOUS_DATA_FRAME_ID , IndexerState . STARTED . value ( ));
143
160
}
144
161
162
+ @ SuppressWarnings ("unchecked" )
145
163
private void verifyContinuousDataFrameHandlesData (long expectedLastCheckpoint ) throws Exception {
146
164
147
165
// A continuous data frame should automatically become started when it gets assigned to a node
@@ -161,9 +179,9 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
161
179
List <String > entities = new ArrayList <>(1 );
162
180
entities .add ("user_" + ENTITIES .size () + expectedLastCheckpoint );
163
181
int docs = 5 ;
164
- // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin
165
- // wait later.
166
- putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , TimeValue .timeValueSeconds (1 ), entities );
182
+ // Index the data
183
+ // The frequency and delay should see the data once its indexed
184
+ putData (CONTINUOUS_DATA_FRAME_SOURCE , docs , TimeValue .timeValueSeconds (0 ), entities );
167
185
168
186
waitUntilAfterCheckpoint (CONTINUOUS_DATA_FRAME_ID , expectedLastCheckpoint );
169
187
@@ -176,10 +194,55 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
176
194
177
195
assertThat (stateAndStats .getState (),
178
196
oneOf (DataFrameTransformStats .State .STARTED , DataFrameTransformStats .State .INDEXING ));
179
- assertThat (stateAndStats .getIndexerStats ().getOutputDocuments (),
180
- greaterThan (previousStateAndStats .getIndexerStats ().getOutputDocuments ()));
181
- assertThat (stateAndStats .getIndexerStats ().getNumDocuments (),
182
- greaterThanOrEqualTo (docs + previousStateAndStats .getIndexerStats ().getNumDocuments ()));
197
+ awaitWrittenIndexerState (CONTINUOUS_DATA_FRAME_ID , (responseBody ) -> {
198
+ Map <String , Object > indexerStats = (Map <String ,Object >)((List <?>)XContentMapValues .extractValue ("hits.hits._source.stats" ,
199
+ responseBody ))
200
+ .get (0 );
201
+ assertThat ((Integer )indexerStats .get ("documents_indexed" ),
202
+ greaterThan (Long .valueOf (previousStateAndStats .getIndexerStats ().getOutputDocuments ()).intValue ()));
203
+ assertThat ((Integer )indexerStats .get ("documents_processed" ),
204
+ greaterThan (Long .valueOf (previousStateAndStats .getIndexerStats ().getNumDocuments ()).intValue ()));
205
+ });
206
+ }
207
+
208
+ private void awaitWrittenIndexerState (String id , Consumer <Map <?, ?>> responseAssertion ) throws Exception {
209
+ Request getStatsDocsRequest = new Request ("GET" , ".data-frame-internal-*/_search" );
210
+ getStatsDocsRequest .setJsonEntity ("{\n " +
211
+ " \" query\" : {\n " +
212
+ " \" bool\" : {\n " +
213
+ " \" filter\" : \n " +
214
+ " {\" term\" : {\n " +
215
+ " \" _id\" : \" data_frame_transform_state_and_stats-" + id + "\" \n " +
216
+ " }}\n " +
217
+ " }\n " +
218
+ " },\n " +
219
+ " \" sort\" : [\n " +
220
+ " {\n " +
221
+ " \" _index\" : {\n " +
222
+ " \" order\" : \" desc\" \n " +
223
+ " }\n " +
224
+ " }\n " +
225
+ " ],\n " +
226
+ " \" size\" : 1\n " +
227
+ "}" );
228
+ assertBusy (() -> {
229
+ // Want to make sure we get the latest docs
230
+ client ().performRequest (new Request ("POST" , ".data-frame-internal-*/_refresh" ));
231
+ Response response = client ().performRequest (getStatsDocsRequest );
232
+ assertEquals (200 , response .getStatusLine ().getStatusCode ());
233
+ Map <String , Object > responseBody = entityAsMap (response );
234
+ assertEquals (1 , XContentMapValues .extractValue ("hits.total.value" , responseBody ));
235
+ responseAssertion .accept (responseBody );
236
+ }, 60 , TimeUnit .SECONDS );
237
+ }
238
+
239
+ private void awaitWrittenIndexerState (String id , String indexerState ) throws Exception {
240
+ awaitWrittenIndexerState (id , (responseBody ) -> {
241
+ String storedState = ((List <?>)XContentMapValues .extractValue ("hits.hits._source.state.indexer_state" , responseBody ))
242
+ .get (0 )
243
+ .toString ();
244
+ assertThat (storedState , equalTo (indexerState ));
245
+ });
183
246
}
184
247
185
248
private void putTransform (String id , DataFrameTransformConfig config ) throws IOException {
@@ -222,7 +285,7 @@ private DataFrameTransformStats getTransformStats(String id) throws IOException
222
285
}
223
286
224
287
private void waitUntilAfterCheckpoint (String id , long currentCheckpoint ) throws Exception {
225
- assertBusy (() -> assertThat (getTransformStats (id ).getCheckpointingInfo ().getNext ().getCheckpoint (), greaterThan (currentCheckpoint )),
288
+ assertBusy (() -> assertThat (getTransformStats (id ).getCheckpointingInfo ().getLast ().getCheckpoint (), greaterThan (currentCheckpoint )),
226
289
60 , TimeUnit .SECONDS );
227
290
}
228
291
@@ -249,7 +312,7 @@ private void createIndex(String indexName) throws IOException {
249
312
final StringEntity entity = new StringEntity (Strings .toString (builder ), ContentType .APPLICATION_JSON );
250
313
Request req = new Request ("PUT" , indexName );
251
314
req .setEntity (entity );
252
- client ().performRequest (req );
315
+ assertThat ( client ().performRequest (req ). getStatusLine (). getStatusCode (), equalTo ( 200 ) );
253
316
}
254
317
}
255
318
0 commit comments