26
26
import org .elasticsearch .action .admin .indices .stats .ShardStats ;
27
27
import org .elasticsearch .action .bulk .BulkProcessor ;
28
28
import org .elasticsearch .action .bulk .BulkRequest ;
29
+ import org .elasticsearch .action .bulk .BulkRequestBuilder ;
29
30
import org .elasticsearch .action .bulk .BulkResponse ;
30
31
import org .elasticsearch .action .get .GetResponse ;
31
32
import org .elasticsearch .action .index .IndexRequest ;
@@ -101,9 +102,30 @@ public void testFollowIndex() throws Exception {
101
102
assertAcked (leaderClient ().admin ().indices ().prepareCreate ("index1" ).setSource (leaderIndexSettings , XContentType .JSON ));
102
103
ensureLeaderYellow ("index1" );
103
104
104
- final int firstBatchNumDocs = randomIntBetween (2 , 64 );
105
+ final int firstBatchNumDocs ;
106
+ // Sometimes we want to index a lot of documents to ensure that the recovery works with larger files
107
+ if (rarely ()) {
108
+ firstBatchNumDocs = randomIntBetween (1800 , 2000 );
109
+ } else {
110
+ firstBatchNumDocs = randomIntBetween (10 , 64 );
111
+ }
112
+ final int flushPoint = (int ) (firstBatchNumDocs * 0.75 );
113
+
105
114
logger .info ("Indexing [{}] docs as first batch" , firstBatchNumDocs );
106
- for (int i = 0 ; i < firstBatchNumDocs ; i ++) {
115
+ BulkRequestBuilder bulkRequestBuilder = leaderClient ().prepareBulk ();
116
+ for (int i = 0 ; i < flushPoint ; i ++) {
117
+ final String source = String .format (Locale .ROOT , "{\" f\" :%d}" , i );
118
+ IndexRequest indexRequest = new IndexRequest ("index1" , "doc" , Integer .toString (i ))
119
+ .source (source , XContentType .JSON )
120
+ .timeout (TimeValue .timeValueSeconds (1 ));
121
+ bulkRequestBuilder .add (indexRequest );
122
+ }
123
+ bulkRequestBuilder .get ();
124
+
125
+ leaderClient ().admin ().indices ().prepareFlush ("index1" ).setWaitIfOngoing (true ).get ();
126
+
127
+ // Index some docs after the flush that might be recovered in the normal index following operations
128
+ for (int i = flushPoint ; i < firstBatchNumDocs ; i ++) {
107
129
final String source = String .format (Locale .ROOT , "{\" f\" :%d}" , i );
108
130
leaderClient ().prepareIndex ("index1" , "doc" , Integer .toString (i )).setSource (source , XContentType .JSON ).get ();
109
131
}
@@ -147,7 +169,7 @@ public void testFollowIndex() throws Exception {
147
169
for (int i = 0 ; i < firstBatchNumDocs ; i ++) {
148
170
assertBusy (assertExpectedDocumentRunnable (i ));
149
171
}
150
- assertTotalNumberOfOptimizedIndexing ( resolveFollowerIndex ( "index2" ), numberOfPrimaryShards , firstBatchNumDocs );
172
+
151
173
pauseFollow ("index2" );
152
174
followerClient ().execute (ResumeFollowAction .INSTANCE , followRequest .getFollowRequest ()).get ();
153
175
final int secondBatchNumDocs = randomIntBetween (2 , 64 );
@@ -172,8 +194,6 @@ public void testFollowIndex() throws Exception {
172
194
for (int i = firstBatchNumDocs ; i < firstBatchNumDocs + secondBatchNumDocs ; i ++) {
173
195
assertBusy (assertExpectedDocumentRunnable (i ));
174
196
}
175
- assertTotalNumberOfOptimizedIndexing (resolveFollowerIndex ("index2" ), numberOfPrimaryShards ,
176
- firstBatchNumDocs + secondBatchNumDocs );
177
197
pauseFollow ("index2" );
178
198
assertMaxSeqNoOfUpdatesIsTransferred (resolveLeaderIndex ("index1" ), resolveFollowerIndex ("index2" ), numberOfPrimaryShards );
179
199
}
@@ -287,7 +307,6 @@ public void testFollowIndexWithoutWaitForComplete() throws Exception {
287
307
for (int i = 0 ; i < firstBatchNumDocs ; i ++) {
288
308
assertBusy (assertExpectedDocumentRunnable (i ));
289
309
}
290
- assertTotalNumberOfOptimizedIndexing (resolveFollowerIndex ("index2" ), numberOfPrimaryShards , firstBatchNumDocs );
291
310
pauseFollow ("index2" );
292
311
}
293
312
@@ -432,8 +451,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
432
451
assertIndexFullyReplicatedToFollower ("index1" , "index2" );
433
452
pauseFollow ("index2" );
434
453
leaderClient ().admin ().indices ().prepareRefresh ("index1" ).get ();
435
- assertTotalNumberOfOptimizedIndexing (resolveFollowerIndex ("index2" ), numberOfShards ,
436
- leaderClient ().prepareSearch ("index1" ).get ().getHits ().getTotalHits ().value );
437
454
assertMaxSeqNoOfUpdatesIsTransferred (resolveLeaderIndex ("index1" ), resolveFollowerIndex ("index2" ), numberOfShards );
438
455
}
439
456
@@ -475,7 +492,6 @@ public void testFollowIndexWithNestedField() throws Exception {
475
492
}
476
493
pauseFollow ("index2" );
477
494
assertMaxSeqNoOfUpdatesIsTransferred (resolveLeaderIndex ("index1" ), resolveFollowerIndex ("index2" ), 1 );
478
- assertTotalNumberOfOptimizedIndexing (resolveFollowerIndex ("index2" ), 1 , numDocs );
479
495
}
480
496
481
497
public void testUnfollowNonExistingIndex () {
@@ -538,7 +554,6 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
538
554
}
539
555
pauseFollow ("index2" );
540
556
assertMaxSeqNoOfUpdatesIsTransferred (resolveLeaderIndex ("index1" ), resolveFollowerIndex ("index2" ), 1 );
541
- assertTotalNumberOfOptimizedIndexing (resolveFollowerIndex ("index2" ), 1 , numDocs );
542
557
}
543
558
544
559
public void testAttemptToChangeCcrFollowingIndexSetting () throws Exception {
0 commit comments