21
21
import org .apache .logging .log4j .message .ParameterizedMessage ;
22
22
import org .apache .logging .log4j .util .Supplier ;
23
23
import org .elasticsearch .ElasticsearchException ;
24
+ import org .elasticsearch .Version ;
24
25
import org .elasticsearch .action .ActionListener ;
25
26
import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
26
27
import org .elasticsearch .action .admin .indices .flush .SyncedFlushResponse ;
44
45
import org .elasticsearch .index .Index ;
45
46
import org .elasticsearch .index .IndexNotFoundException ;
46
47
import org .elasticsearch .index .IndexService ;
48
+ import org .elasticsearch .index .engine .CommitStats ;
47
49
import org .elasticsearch .index .engine .Engine ;
48
50
import org .elasticsearch .index .shard .IndexEventListener ;
49
51
import org .elasticsearch .index .shard .IndexShard ;
@@ -199,10 +201,10 @@ private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState s
199
201
return ;
200
202
}
201
203
202
- final ActionListener <Map <String , Engine . CommitId >> commitIdsListener = new ActionListener <Map <String , Engine . CommitId >>() {
204
+ final ActionListener <Map <String , PreSyncedFlushResponse >> presyncListener = new ActionListener <Map <String , PreSyncedFlushResponse >>() {
203
205
@ Override
204
- public void onResponse (final Map <String , Engine . CommitId > commitIds ) {
205
- if (commitIds .isEmpty ()) {
206
+ public void onResponse (final Map <String , PreSyncedFlushResponse > presyncResponses ) {
207
+ if (presyncResponses .isEmpty ()) {
206
208
actionListener .onResponse (new ShardsSyncedFlushResult (shardId , totalShards , "all shards failed to commit on pre-sync" ));
207
209
return ;
208
210
}
@@ -216,7 +218,7 @@ public void onResponse(InFlightOpsResponse response) {
216
218
} else {
217
219
// 3. now send the sync request to all the shards
218
220
String syncId = UUIDs .randomBase64UUID ();
219
- sendSyncRequests (syncId , activeShards , state , commitIds , shardId , totalShards , actionListener );
221
+ sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
220
222
}
221
223
}
222
224
@@ -236,7 +238,7 @@ public void onFailure(Exception e) {
236
238
};
237
239
238
240
// 1. send pre-sync flushes to all replicas
239
- sendPreSyncRequests (activeShards , state , shardId , commitIdsListener );
241
+ sendPreSyncRequests (activeShards , state , shardId , presyncListener );
240
242
} catch (Exception e ) {
241
243
actionListener .onFailure (e );
242
244
}
@@ -299,28 +301,49 @@ public String executor() {
299
301
}
300
302
}
301
303
304
+ private int numDocsOnPrimary (List <ShardRouting > shards , Map <String , PreSyncedFlushResponse > preSyncResponses ) {
305
+ for (ShardRouting shard : shards ) {
306
+ if (shard .primary ()) {
307
+ final PreSyncedFlushResponse resp = preSyncResponses .get (shard .currentNodeId ());
308
+ if (resp != null ) {
309
+ return resp .numDocs ;
310
+ }
311
+ }
312
+ }
313
+ return PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ;
314
+ }
302
315
303
- void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , Engine . CommitId > expectedCommitIds ,
316
+ void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , PreSyncedFlushResponse > preSyncResponses ,
304
317
final ShardId shardId , final int totalShards , final ActionListener <ShardsSyncedFlushResult > listener ) {
305
318
final CountDown countDown = new CountDown (shards .size ());
306
319
final Map <ShardRouting , ShardSyncedFlushResponse > results = ConcurrentCollections .newConcurrentMap ();
320
+ final int numDocsOnPrimary = numDocsOnPrimary (shards , preSyncResponses );
307
321
for (final ShardRouting shard : shards ) {
308
322
final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
309
323
if (node == null ) {
310
324
logger .trace ("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
311
325
results .put (shard , new ShardSyncedFlushResponse ("unknown node" ));
312
- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
326
+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
313
327
continue ;
314
328
}
315
- final Engine . CommitId expectedCommitId = expectedCommitIds .get (shard .currentNodeId ());
316
- if (expectedCommitId == null ) {
329
+ final PreSyncedFlushResponse preSyncedResponse = preSyncResponses .get (shard .currentNodeId ());
330
+ if (preSyncedResponse == null ) {
317
331
logger .trace ("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
318
332
results .put (shard , new ShardSyncedFlushResponse ("no commit id from pre-sync flush" ));
319
- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
333
+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
334
+ continue ;
335
+ }
336
+ if (preSyncedResponse .numDocs != numDocsOnPrimary
337
+ && preSyncedResponse .numDocs != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ) {
338
+ logger .warn ("{} can't to issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]" ,
339
+ shardId , syncId , shard , preSyncedResponse .numDocs , numDocsOnPrimary );
340
+ results .put (shard , new ShardSyncedFlushResponse ("out of sync replica; " +
341
+ "num docs on replica [" + preSyncedResponse .numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]" ));
342
+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
320
343
continue ;
321
344
}
322
345
logger .trace ("{} sending synced flush request to {}. sync id [{}]." , shardId , shard , syncId );
323
- transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , expectedCommitId ),
346
+ transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , preSyncedResponse . commitId ),
324
347
new TransportResponseHandler <ShardSyncedFlushResponse >() {
325
348
@ Override
326
349
public ShardSyncedFlushResponse newInstance () {
@@ -332,14 +355,14 @@ public void handleResponse(ShardSyncedFlushResponse response) {
332
355
ShardSyncedFlushResponse existing = results .put (shard , response );
333
356
assert existing == null : "got two answers for node [" + node + "]" ;
334
357
// count after the assert so we won't decrement twice in handleException
335
- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
358
+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
336
359
}
337
360
338
361
@ Override
339
362
public void handleException (TransportException exp ) {
340
363
logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing synced flush on [{}], skipping" , shardId , shard ), exp );
341
364
results .put (shard , new ShardSyncedFlushResponse (exp .getMessage ()));
342
- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
365
+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
343
366
}
344
367
345
368
@ Override
@@ -351,8 +374,8 @@ public String executor() {
351
374
352
375
}
353
376
354
- private void contDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
355
- ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
377
+ private void countDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
378
+ ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
356
379
if (countDown .countDown ()) {
357
380
assert results .size () == shards .size ();
358
381
listener .onResponse (new ShardsSyncedFlushResult (shardId , syncId , totalShards , results ));
@@ -362,16 +385,16 @@ private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> sha
362
385
/**
363
386
* send presync requests to all started copies of the given shard
364
387
*/
365
- void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , Engine . CommitId >> listener ) {
388
+ void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , PreSyncedFlushResponse >> listener ) {
366
389
final CountDown countDown = new CountDown (shards .size ());
367
- final ConcurrentMap <String , Engine . CommitId > commitIds = ConcurrentCollections .newConcurrentMap ();
390
+ final ConcurrentMap <String , PreSyncedFlushResponse > presyncResponses = ConcurrentCollections .newConcurrentMap ();
368
391
for (final ShardRouting shard : shards ) {
369
392
logger .trace ("{} sending pre-synced flush request to {}" , shardId , shard );
370
393
final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
371
394
if (node == null ) {
372
395
logger .trace ("{} shard routing {} refers to an unknown node. skipping." , shardId , shard );
373
396
if (countDown .countDown ()) {
374
- listener .onResponse (commitIds );
397
+ listener .onResponse (presyncResponses );
375
398
}
376
399
continue ;
377
400
}
@@ -383,19 +406,19 @@ public PreSyncedFlushResponse newInstance() {
383
406
384
407
@ Override
385
408
public void handleResponse (PreSyncedFlushResponse response ) {
386
- Engine . CommitId existing = commitIds .putIfAbsent (node .getId (), response . commitId () );
409
+ PreSyncedFlushResponse existing = presyncResponses .putIfAbsent (node .getId (), response );
387
410
assert existing == null : "got two answers for node [" + node + "]" ;
388
411
// count after the assert so we won't decrement twice in handleException
389
412
if (countDown .countDown ()) {
390
- listener .onResponse (commitIds );
413
+ listener .onResponse (presyncResponses );
391
414
}
392
415
}
393
416
394
417
@ Override
395
418
public void handleException (TransportException exp ) {
396
419
logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing pre synced flush on [{}], skipping" , shardId , shard ), exp );
397
420
if (countDown .countDown ()) {
398
- listener .onResponse (commitIds );
421
+ listener .onResponse (presyncResponses );
399
422
}
400
423
}
401
424
@@ -411,9 +434,11 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
411
434
IndexShard indexShard = indicesService .indexServiceSafe (request .shardId ().getIndex ()).getShard (request .shardId ().id ());
412
435
FlushRequest flushRequest = new FlushRequest ().force (false ).waitIfOngoing (true );
413
436
logger .trace ("{} performing pre sync flush" , request .shardId ());
414
- Engine .CommitId commitId = indexShard .flush (flushRequest );
415
- logger .trace ("{} pre sync flush done. commit id {}" , request .shardId (), commitId );
416
- return new PreSyncedFlushResponse (commitId );
437
+ indexShard .flush (flushRequest );
438
+ final CommitStats commitStats = indexShard .commitStats ();
439
+ final Engine .CommitId commitId = commitStats .getRawCommitId ();
440
+ logger .trace ("{} pre sync flush done. commit id {}, num docs {}" , request .shardId (), commitId , commitStats .getNumDocs ());
441
+ return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs ());
417
442
}
418
443
419
444
private ShardSyncedFlushResponse performSyncedFlush (ShardSyncedFlushRequest request ) {
@@ -483,30 +508,45 @@ public ShardId shardId() {
483
508
* Response for first step of synced flush (flush) for one shard copy
484
509
*/
485
510
static final class PreSyncedFlushResponse extends TransportResponse {
511
+ static final int UNKNOWN_NUM_DOCS = -1 ;
486
512
487
513
Engine .CommitId commitId ;
514
+ int numDocs ;
488
515
489
516
PreSyncedFlushResponse () {
490
517
}
491
518
492
- PreSyncedFlushResponse (Engine .CommitId commitId ) {
519
+ PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs ) {
493
520
this .commitId = commitId ;
521
+ this .numDocs = numDocs ;
494
522
}
495
523
496
- public Engine .CommitId commitId () {
524
+ Engine .CommitId commitId () {
497
525
return commitId ;
498
526
}
499
527
528
+ int numDocs () {
529
+ return numDocs ;
530
+ }
531
+
500
532
@ Override
501
533
public void readFrom (StreamInput in ) throws IOException {
502
534
super .readFrom (in );
503
535
commitId = new Engine .CommitId (in );
536
+ if (in .getVersion ().onOrAfter (Version .V_5_6_8 )) {
537
+ numDocs = in .readInt ();
538
+ } else {
539
+ numDocs = UNKNOWN_NUM_DOCS ;
540
+ }
504
541
}
505
542
506
543
@ Override
507
544
public void writeTo (StreamOutput out ) throws IOException {
508
545
super .writeTo (out );
509
546
commitId .writeTo (out );
547
+ if (out .getVersion ().onOrAfter (Version .V_5_6_8 )) {
548
+ out .writeInt (numDocs );
549
+ }
510
550
}
511
551
}
512
552
0 commit comments