@@ -71,7 +71,10 @@ public class ReplicationOperation<
71
71
private final Primary <Request , ReplicaRequest , PrimaryResultT > primary ;
72
72
private final Replicas <ReplicaRequest > replicasProxy ;
73
73
private final AtomicBoolean finished = new AtomicBoolean ();
74
- protected final ActionListener <PrimaryResultT > resultListener ;
74
+ private final long primaryTerm ;
75
+
76
+ // exposed for tests
77
+ final ActionListener <PrimaryResultT > resultListener ;
75
78
76
79
private volatile PrimaryResultT primaryResult = null ;
77
80
@@ -80,13 +83,14 @@ public class ReplicationOperation<
80
83
public ReplicationOperation (Request request , Primary <Request , ReplicaRequest , PrimaryResultT > primary ,
81
84
ActionListener <PrimaryResultT > listener ,
82
85
Replicas <ReplicaRequest > replicas ,
83
- Logger logger , String opType ) {
86
+ Logger logger , String opType , long primaryTerm ) {
84
87
this .replicasProxy = replicas ;
85
88
this .primary = primary ;
86
89
this .resultListener = listener ;
87
90
this .logger = logger ;
88
91
this .request = request ;
89
92
this .opType = opType ;
93
+ this .primaryTerm = primaryTerm ;
90
94
}
91
95
92
96
public void execute () throws Exception {
@@ -137,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
137
141
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
138
142
for (String allocationId : replicationGroup .getUnavailableInSyncShards ()) {
139
143
pendingActions .incrementAndGet ();
140
- replicasProxy .markShardCopyAsStaleIfNeeded (replicaRequest .shardId (), allocationId ,
144
+ replicasProxy .markShardCopyAsStaleIfNeeded (replicaRequest .shardId (), allocationId , primaryTerm ,
141
145
ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
142
146
}
143
147
}
@@ -165,44 +169,45 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
165
169
166
170
totalShards .incrementAndGet ();
167
171
pendingActions .incrementAndGet ();
168
- replicasProxy .performOn (shard , replicaRequest , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes , new ActionListener <ReplicaResponse >() {
169
- @ Override
170
- public void onResponse (ReplicaResponse response ) {
171
- successfulShards .incrementAndGet ();
172
- try {
173
- primary .updateLocalCheckpointForShard (shard .allocationId ().getId (), response .localCheckpoint ());
174
- primary .updateGlobalCheckpointForShard (shard .allocationId ().getId (), response .globalCheckpoint ());
175
- } catch (final AlreadyClosedException e ) {
176
- // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
177
- } catch (final Exception e ) {
178
- // fail the primary but fall through and let the rest of operation processing complete
179
- final String message = String .format (Locale .ROOT , "primary failed updating local checkpoint for replica %s" , shard );
180
- primary .failShard (message , e );
172
+ replicasProxy .performOn (shard , replicaRequest , primaryTerm , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes ,
173
+ new ActionListener <>() {
174
+ @ Override
175
+ public void onResponse (ReplicaResponse response ) {
176
+ successfulShards .incrementAndGet ();
177
+ try {
178
+ primary .updateLocalCheckpointForShard (shard .allocationId ().getId (), response .localCheckpoint ());
179
+ primary .updateGlobalCheckpointForShard (shard .allocationId ().getId (), response .globalCheckpoint ());
180
+ } catch (final AlreadyClosedException e ) {
181
+ // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
182
+ } catch (final Exception e ) {
183
+ // fail the primary but fall through and let the rest of operation processing complete
184
+ final String message = String .format (Locale .ROOT , "primary failed updating local checkpoint for replica %s" , shard );
185
+ primary .failShard (message , e );
186
+ }
187
+ decPendingAndFinishIfNeeded ();
181
188
}
182
- decPendingAndFinishIfNeeded ();
183
- }
184
189
185
- @ Override
186
- public void onFailure (Exception replicaException ) {
187
- logger .trace (() -> new ParameterizedMessage (
188
- "[{}] failure while performing [{}] on replica {}, request [{}]" ,
189
- shard .shardId (), opType , shard , replicaRequest ), replicaException );
190
- // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
191
- if (TransportActions .isShardNotAvailableException (replicaException ) == false ) {
192
- RestStatus restStatus = ExceptionsHelper .status (replicaException );
193
- shardReplicaFailures .add (new ReplicationResponse .ShardInfo .Failure (
194
- shard .shardId (), shard .currentNodeId (), replicaException , restStatus , false ));
190
+ @ Override
191
+ public void onFailure (Exception replicaException ) {
192
+ logger .trace (() -> new ParameterizedMessage (
193
+ "[{}] failure while performing [{}] on replica {}, request [{}]" ,
194
+ shard .shardId (), opType , shard , replicaRequest ), replicaException );
195
+ // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
196
+ if (TransportActions .isShardNotAvailableException (replicaException ) == false ) {
197
+ RestStatus restStatus = ExceptionsHelper .status (replicaException );
198
+ shardReplicaFailures .add (new ReplicationResponse .ShardInfo .Failure (
199
+ shard .shardId (), shard .currentNodeId (), replicaException , restStatus , false ));
200
+ }
201
+ String message = String .format (Locale .ROOT , "failed to perform %s on replica %s" , opType , shard );
202
+ replicasProxy .failShardIfNeeded (shard , primaryTerm , message , replicaException ,
203
+ ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
195
204
}
196
- String message = String .format (Locale .ROOT , "failed to perform %s on replica %s" , opType , shard );
197
- replicasProxy .failShardIfNeeded (shard , message , replicaException ,
198
- ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary ));
199
- }
200
205
201
- @ Override
202
- public String toString () {
203
- return "[" + replicaRequest + "][" + shard + "]" ;
204
- }
205
- });
206
+ @ Override
207
+ public String toString () {
208
+ return "[" + replicaRequest + "][" + shard + "]" ;
209
+ }
210
+ });
206
211
}
207
212
208
213
private void onNoLongerPrimary (Exception failure ) {
@@ -373,25 +378,27 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
373
378
*
374
379
* @param replica the shard this request should be executed on
375
380
* @param replicaRequest the operation to perform
381
+ * @param primaryTerm the primary term
376
382
* @param globalCheckpoint the global checkpoint on the primary
377
383
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
378
384
* after this replication was executed on it.
379
385
* @param listener callback for handling the response or failure
380
386
*/
381
- void performOn (ShardRouting replica , RequestT replicaRequest , long globalCheckpoint ,
382
- long maxSeqNoOfUpdatesOrDeletes , ActionListener <ReplicaResponse > listener );
387
+ void performOn (ShardRouting replica , RequestT replicaRequest ,
388
+ long primaryTerm , long globalCheckpoint , long maxSeqNoOfUpdatesOrDeletes , ActionListener <ReplicaResponse > listener );
383
389
384
390
/**
385
391
* Fail the specified shard if needed, removing it from the current set
386
392
* of active shards. Whether a failure is needed is left up to the
387
393
* implementation.
388
394
*
389
- * @param replica shard to fail
390
- * @param message a (short) description of the reason
391
- * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
392
- * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
395
+ * @param replica shard to fail
396
+ * @param primaryTerm the primary term
397
+ * @param message a (short) description of the reason
398
+ * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
399
+ * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
393
400
*/
394
- void failShardIfNeeded (ShardRouting replica , String message , Exception exception , ActionListener <Void > listener );
401
+ void failShardIfNeeded (ShardRouting replica , long primaryTerm , String message , Exception exception , ActionListener <Void > listener );
395
402
396
403
/**
397
404
* Marks shard copy as stale if needed, removing its allocation id from
@@ -400,9 +407,10 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
400
407
*
401
408
* @param shardId shard id
402
409
* @param allocationId allocation id to remove from the set of in-sync allocation ids
410
+ * @param primaryTerm the primary term
403
411
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
404
412
*/
405
- void markShardCopyAsStaleIfNeeded (ShardId shardId , String allocationId , ActionListener <Void > listener );
413
+ void markShardCopyAsStaleIfNeeded (ShardId shardId , String allocationId , long primaryTerm , ActionListener <Void > listener );
406
414
}
407
415
408
416
/**
@@ -427,11 +435,11 @@ public interface ReplicaResponse {
427
435
}
428
436
429
437
public static class RetryOnPrimaryException extends ElasticsearchException {
430
- public RetryOnPrimaryException (ShardId shardId , String msg ) {
438
+ RetryOnPrimaryException (ShardId shardId , String msg ) {
431
439
this (shardId , msg , null );
432
440
}
433
441
434
- public RetryOnPrimaryException (ShardId shardId , String msg , Throwable cause ) {
442
+ RetryOnPrimaryException (ShardId shardId , String msg , Throwable cause ) {
435
443
super (msg , cause );
436
444
setShard (shardId );
437
445
}
0 commit comments