34
34
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
35
35
import org .elasticsearch .cluster .routing .ShardRouting ;
36
36
import org .elasticsearch .cluster .service .ClusterService ;
37
+ import org .elasticsearch .common .Nullable ;
38
+ import org .elasticsearch .common .Strings ;
37
39
import org .elasticsearch .common .UUIDs ;
38
40
import org .elasticsearch .common .component .AbstractComponent ;
39
41
import org .elasticsearch .common .inject .Inject ;
65
67
import java .io .IOException ;
66
68
import java .util .ArrayList ;
67
69
import java .util .Collections ;
70
+ import java .util .HashMap ;
68
71
import java .util .List ;
69
72
import java .util .Map ;
70
73
import java .util .concurrent .ConcurrentMap ;
@@ -216,9 +219,16 @@ public void onResponse(InFlightOpsResponse response) {
216
219
if (inflight != 0 ) {
217
220
actionListener .onResponse (new ShardsSyncedFlushResult (shardId , totalShards , "[" + inflight + "] ongoing operations on primary" ));
218
221
} else {
219
- // 3. now send the sync request to all the shards
220
- String syncId = UUIDs .randomBase64UUID ();
221
- sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
222
+ // 3. now send the sync request to all the shards;
223
+ final String sharedSyncId = sharedExistingSyncId (presyncResponses );
224
+ if (sharedSyncId != null ) {
225
+ assert presyncResponses .values ().stream ().allMatch (r -> r .existingSyncId .equals (sharedSyncId )) :
226
+ "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]" ;
227
+ reportSuccessWithExistingSyncId (shardId , sharedSyncId , activeShards , totalShards , presyncResponses , actionListener );
228
+ }else {
229
+ String syncId = UUIDs .randomBase64UUID ();
230
+ sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
231
+ }
222
232
}
223
233
}
224
234
@@ -244,6 +254,33 @@ public void onFailure(Exception e) {
244
254
}
245
255
}
246
256
257
+ private String sharedExistingSyncId (Map <String , PreSyncedFlushResponse > preSyncedFlushResponses ) {
258
+ String existingSyncId = null ;
259
+ for (PreSyncedFlushResponse resp : preSyncedFlushResponses .values ()) {
260
+ if (Strings .isNullOrEmpty (resp .existingSyncId )) {
261
+ return null ;
262
+ }
263
+ if (existingSyncId == null ) {
264
+ existingSyncId = resp .existingSyncId ;
265
+ }
266
+ if (existingSyncId .equals (resp .existingSyncId ) == false ) {
267
+ return null ;
268
+ }
269
+ }
270
+ return existingSyncId ;
271
+ }
272
+
273
+ private void reportSuccessWithExistingSyncId (ShardId shardId , String existingSyncId , List <ShardRouting > shards , int totalShards ,
274
+ Map <String , PreSyncedFlushResponse > preSyncResponses , ActionListener <ShardsSyncedFlushResult > listener ) {
275
+ final Map <ShardRouting , ShardSyncedFlushResponse > results = new HashMap <>();
276
+ for (final ShardRouting shard : shards ) {
277
+ if (preSyncResponses .containsKey (shard .currentNodeId ())) {
278
+ results .put (shard , new ShardSyncedFlushResponse ());
279
+ }
280
+ }
281
+ listener .onResponse (new ShardsSyncedFlushResult (shardId , existingSyncId , totalShards , results ));
282
+ }
283
+
247
284
final IndexShardRoutingTable getShardRoutingTable (ShardId shardId , ClusterState state ) {
248
285
final IndexRoutingTable indexRoutingTable = state .routingTable ().index (shardId .getIndexName ());
249
286
if (indexRoutingTable == null ) {
@@ -438,7 +475,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
438
475
final CommitStats commitStats = indexShard .commitStats ();
439
476
final Engine .CommitId commitId = commitStats .getRawCommitId ();
440
477
logger .trace ("{} pre sync flush done. commit id {}, num docs {}" , request .shardId (), commitId , commitStats .getNumDocs ());
441
- return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs ());
478
+ return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs (), commitStats . syncId () );
442
479
}
443
480
444
481
private ShardSyncedFlushResponse performSyncedFlush (ShardSyncedFlushRequest request ) {
@@ -512,41 +549,49 @@ static final class PreSyncedFlushResponse extends TransportResponse {
512
549
513
550
Engine .CommitId commitId ;
514
551
int numDocs ;
552
+ @ Nullable String existingSyncId = null ;
515
553
516
554
PreSyncedFlushResponse () {
517
555
}
518
556
519
- PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs ) {
557
+ PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs , String existingSyncId ) {
520
558
this .commitId = commitId ;
521
559
this .numDocs = numDocs ;
560
+ this .existingSyncId = existingSyncId ;
522
561
}
523
562
524
- Engine . CommitId commitId ( ) {
525
- return commitId ;
563
+ boolean includeNumDocs ( Version version ) {
564
+ return version . onOrAfter ( Version . V_5_6_8 ) ;
526
565
}
527
566
528
- int numDocs ( ) {
529
- return numDocs ;
567
+ boolean includeExistingSyncId ( Version version ) {
568
+ return version . onOrAfter ( Version . V_5_6_9 ) ;
530
569
}
531
570
532
571
@ Override
533
572
public void readFrom (StreamInput in ) throws IOException {
534
573
super .readFrom (in );
535
574
commitId = new Engine .CommitId (in );
536
- if (in .getVersion (). onOrAfter ( Version . V_5_6_8 )) {
575
+ if (includeNumDocs ( in .getVersion ())) {
537
576
numDocs = in .readInt ();
538
577
} else {
539
578
numDocs = UNKNOWN_NUM_DOCS ;
540
579
}
580
+ if (includeExistingSyncId (in .getVersion ())) {
581
+ existingSyncId = in .readOptionalString ();
582
+ }
541
583
}
542
584
543
585
@ Override
544
586
public void writeTo (StreamOutput out ) throws IOException {
545
587
super .writeTo (out );
546
588
commitId .writeTo (out );
547
- if (out .getVersion (). onOrAfter ( Version . V_5_6_8 )) {
589
+ if (includeNumDocs ( out .getVersion ())) {
548
590
out .writeInt (numDocs );
549
591
}
592
+ if (includeExistingSyncId (out .getVersion ())) {
593
+ out .writeOptionalString (existingSyncId );
594
+ }
550
595
}
551
596
}
552
597
0 commit comments