@@ -94,7 +94,7 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
94
94
private static final int MAX_DIM_VALUES = 5 ;
95
95
private static final long MAX_NUM_BUCKETS = 10 ;
96
96
97
- private String sourceIndex , sourceIndexClone , rollupIndex ;
97
+ private String sourceIndex , rollupIndex ;
98
98
private long startTime ;
99
99
private int docCount , numOfShards , numOfReplicas ;
100
100
private List <String > dimensionValues ;
@@ -112,13 +112,12 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
112
112
113
113
@ Before
114
114
public void setup () {
115
- sourceIndex = randomAlphaOfLength (5 ).toLowerCase (Locale .ROOT );
116
- sourceIndexClone = sourceIndex + "-clone" ;
117
- rollupIndex = randomAlphaOfLength (6 ).toLowerCase (Locale .ROOT );
115
+ sourceIndex = randomAlphaOfLength (12 ).toLowerCase (Locale .ROOT );
116
+ rollupIndex = "rollup-" + sourceIndex + "-" + randomAlphaOfLength (4 ).toLowerCase (Locale .ROOT );
118
117
startTime = randomLongBetween (946769284000L , 1607470084000L ); // random date between 2000-2020
119
118
docCount = randomIntBetween (10 , 9000 );
120
119
numOfShards = randomIntBetween (1 , 4 );
121
- numOfReplicas = randomIntBetween ( 0 , 3 );
120
+ numOfReplicas = 0 ; // Since this is a single node, we cannot have replicas
122
121
123
122
// Values for keyword dimensions
124
123
dimensionValues = new ArrayList <>(MAX_DIM_VALUES );
@@ -169,6 +168,8 @@ public void testRollupIndex() throws IOException {
169
168
};
170
169
bulkIndex (sourceSupplier );
171
170
prepareSourceIndex (sourceIndex );
171
+ // Clone the source index before rollup deletes it
172
+ String sourceIndexClone = cloneSourceIndex (sourceIndex );
172
173
rollup (sourceIndex , rollupIndex , config );
173
174
assertRollupIndex (config , sourceIndex , sourceIndexClone , rollupIndex );
174
175
}
@@ -217,6 +218,8 @@ public void testRollupSparseMetrics() throws IOException {
217
218
};
218
219
bulkIndex (sourceSupplier );
219
220
prepareSourceIndex (sourceIndex );
221
+ // Clone the source index before rollup deletes it
222
+ String sourceIndexClone = cloneSourceIndex (sourceIndex );
220
223
rollup (sourceIndex , rollupIndex , config );
221
224
assertRollupIndex (config , sourceIndex , sourceIndexClone , rollupIndex );
222
225
}
@@ -238,14 +241,15 @@ public void testRollupEmptyIndex() {
238
241
RollupActionConfig config = new RollupActionConfig (randomInterval ());
239
242
// Source index has been created in the setup() method
240
243
prepareSourceIndex (sourceIndex );
244
+ // Clone the source index before rollup deletes it
245
+ String sourceIndexClone = cloneSourceIndex (sourceIndex );
241
246
rollup (sourceIndex , rollupIndex , config );
242
247
assertRollupIndex (config , sourceIndex , sourceIndexClone , rollupIndex );
243
248
}
244
249
245
250
public void testCannotRollupIndexWithNoMetrics () {
246
251
// Create a source index that contains no metric fields in its mapping
247
- sourceIndex = randomAlphaOfLength (5 ).toLowerCase (Locale .ROOT );
248
- sourceIndexClone = sourceIndex + "-clone" ;
252
+ String sourceIndex = "no-metrics-idx-" + randomAlphaOfLength (5 ).toLowerCase (Locale .ROOT );
249
253
client ().admin ()
250
254
.indices ()
251
255
.prepareCreate (sourceIndex )
@@ -270,7 +274,6 @@ public void testCannotRollupIndexWithNoMetrics() {
270
274
.get ();
271
275
272
276
RollupActionConfig config = new RollupActionConfig (randomInterval ());
273
- // Source index has been created in the setup() method
274
277
prepareSourceIndex (sourceIndex );
275
278
Exception exception = expectThrows (RollupActionRequestValidationException .class , () -> rollup (sourceIndex , rollupIndex , config ));
276
279
assertThat (exception .getMessage (), containsString ("does not contain any metric fields" ));
@@ -325,10 +328,11 @@ public void testRollupDatastream() throws Exception {
325
328
};
326
329
bulkIndex (dataStreamName , sourceSupplier );
327
330
328
- this .sourceIndex = rollover (dataStreamName ).getOldIndex ();
329
- this .sourceIndexClone = sourceIndex + "-clone" ;
330
- this .rollupIndex = ".rollup-" + sourceIndex ;
331
+ String sourceIndex = rollover (dataStreamName ).getOldIndex ();
331
332
prepareSourceIndex (sourceIndex );
333
+ // Clone the source index before rollup deletes it
334
+ String sourceIndexClone = cloneSourceIndex (sourceIndex );
335
+ String rollupIndex = "rollup-" + sourceIndex + "-" + randomAlphaOfLength (4 ).toLowerCase (Locale .ROOT );
332
336
rollup (sourceIndex , rollupIndex , config );
333
337
assertRollupIndex (config , sourceIndex , sourceIndexClone , rollupIndex );
334
338
@@ -354,7 +358,15 @@ private String randomDateForRange(long start, long end) {
354
358
return DATE_FORMATTER .formatMillis (randomLongBetween (start , end ));
355
359
}
356
360
357
- private void cloneSourceIndex (String sourceIndex , String sourceIndexClone ) {
361
+ /**
362
+ * The source index is deleted at the end of the rollup process.
363
+ * We clone the source index, so that we validate rollup results against the
364
+ * source index clone.
365
+ *
366
+ * @return the name of the source index clone
367
+ */
368
+ private String cloneSourceIndex (String sourceIndex ) {
369
+ String sourceIndexClone = "clone-" + sourceIndex ;
358
370
ResizeResponse r = client ().admin ()
359
371
.indices ()
360
372
.prepareResizeIndex (sourceIndex , sourceIndexClone )
@@ -364,6 +376,7 @@ private void cloneSourceIndex(String sourceIndex, String sourceIndexClone) {
364
376
)
365
377
.get ();
366
378
assertTrue (r .isAcknowledged ());
379
+ return sourceIndexClone ;
367
380
}
368
381
369
382
private void bulkIndex (SourceSupplier sourceSupplier ) throws IOException {
@@ -386,15 +399,15 @@ private void bulkIndex(String indexName, SourceSupplier sourceSupplier) throws I
386
399
if (response .getFailure ().getCause () instanceof VersionConflictEngineException ) {
387
400
// A duplicate event was created by random generator. We should not fail for this
388
401
// reason.
389
- logger .info ("We tried to insert a duplicate: " + response .getFailureMessage ());
402
+ logger .debug ("We tried to insert a duplicate: [{}]" , response .getFailureMessage ());
390
403
duplicates ++;
391
404
} else {
392
405
fail ("Failed to index data: " + bulkResponse .buildFailureMessage ());
393
406
}
394
407
}
395
408
}
396
409
int docsIndexed = docCount - duplicates ;
397
- logger .info ("Indexed [" + docsIndexed + "] documents" );
410
+ logger .info ("Indexed [{}] documents. Dropped [{}] duplicates." , docsIndexed , duplicates );
398
411
assertHitCount (client ().prepareSearch (indexName ).setSize (0 ).get (), docsIndexed );
399
412
}
400
413
@@ -406,11 +419,6 @@ private void prepareSourceIndex(String sourceIndex) {
406
419
.setSettings (Settings .builder ().put (IndexMetadata .INDEX_BLOCKS_WRITE_SETTING .getKey (), true ).build ())
407
420
.get ();
408
421
assertTrue (r .isAcknowledged ());
409
-
410
- // The source index is deleted at the end of the rollup process.
411
- // We clone the source index, so that we validate rollup results against the
412
- // source index clone.
413
- cloneSourceIndex (sourceIndex , sourceIndexClone );
414
422
}
415
423
416
424
private void rollup (String sourceIndex , String rollupIndex , RollupActionConfig config ) {
0 commit comments