15
15
import org .elasticsearch .action .ActionListener ;
16
16
import org .elasticsearch .action .UnavailableShardsException ;
17
17
import org .elasticsearch .action .support .ActiveShardCount ;
18
+ import org .elasticsearch .action .support .RefCountingListener ;
18
19
import org .elasticsearch .action .support .RetryableAction ;
19
20
import org .elasticsearch .action .support .TransportActions ;
20
21
import org .elasticsearch .cluster .action .shard .ShardStateAction ;
@@ -56,16 +57,16 @@ public class ReplicationOperation<
56
57
private final String opType ;
57
58
private final AtomicInteger totalShards = new AtomicInteger ();
58
59
/**
59
- * The number of pending sub-operations in this operation. This is incremented when the following operations start and decremented when
60
- * they complete:
60
+ * A {@link RefCountingListener} that encapsulates the pending sub-operations in this operation. The references are incremented when
61
+ * the following operations start and decremented when they complete:
61
62
* <ul>
62
63
* <li>The operation on the primary</li>
63
64
* <li>The operation on each replica</li>
64
65
* <li>Coordination of the operation as a whole. This prevents the operation from terminating early if we haven't started any replica
65
66
* operations and the primary finishes.</li>
66
67
* </ul>
67
68
*/
68
- private final AtomicInteger pendingActions = new AtomicInteger () ;
69
+ private final RefCountingListener pendingActionsListener ;
69
70
private final AtomicInteger successfulShards = new AtomicInteger ();
70
71
private final Primary <Request , ReplicaRequest , PrimaryResultT > primary ;
71
72
private final Replicas <ReplicaRequest > replicasProxy ;
@@ -74,9 +75,6 @@ public class ReplicationOperation<
74
75
private final TimeValue retryTimeout ;
75
76
private final long primaryTerm ;
76
77
77
- // exposed for tests
78
- private final ActionListener <PrimaryResultT > resultListener ;
79
-
80
78
private volatile PrimaryResultT primaryResult = null ;
81
79
82
80
private final List <ReplicationResponse .ShardInfo .Failure > shardReplicaFailures = Collections .synchronizedList (new ArrayList <>());
@@ -95,7 +93,16 @@ public ReplicationOperation(
95
93
) {
96
94
this .replicasProxy = replicas ;
97
95
this .primary = primary ;
98
- this .resultListener = listener ;
96
+ this .pendingActionsListener = new RefCountingListener (ActionListener .wrap ((ignored ) -> {
97
+ primaryResult .setShardInfo (
98
+ ReplicationResponse .ShardInfo .of (
99
+ totalShards .get (),
100
+ successfulShards .get (),
101
+ shardReplicaFailures .toArray (ReplicationResponse .NO_FAILURES )
102
+ )
103
+ );
104
+ listener .onResponse (primaryResult );
105
+ }, listener ::onFailure ));
99
106
this .logger = logger ;
100
107
this .threadPool = threadPool ;
101
108
this .request = request ;
@@ -106,28 +113,34 @@ public ReplicationOperation(
106
113
}
107
114
108
115
// Entry point of the replication operation (shown here as a diff: '-' lines are the previous implementation built on the
// pendingActions counter and finishAsFailed, '+' lines are the replacement built on pendingActionsListener).
// In the '+' version: if checkActiveShardCount() returns a non-null message, the operation is failed with an
// UnavailableShardsException and nothing is sent to the primary; otherwise the primary is counted in totalShards and
// primary.perform is invoked with a listener that hands the result to handlePrimaryResult.
public void execute () throws Exception {
109
- final String activeShardCountFailure = checkActiveShardCount ();
110
- final ShardRouting primaryRouting = primary .routingEntry ();
111
- final ShardId primaryId = primaryRouting .shardId ();
112
- if (activeShardCountFailure != null ) {
113
- finishAsFailed (
114
- new UnavailableShardsException (
115
- primaryId ,
116
- "{} Timeout: [{}], request: [{}]" ,
117
- activeShardCountFailure ,
118
- request .timeout (),
119
- request
120
- )
121
- );
122
- return ;
123
- }
116
// NOTE(review): the try-with-resources closes pendingActionsListener on every exit from this block, including the early
// return below; presumably that releases the initial "coordination of the operation as a whole" reference so the wrapped
// listener can complete once all acquired references are done -- confirm against RefCountingListener.
+ try (pendingActionsListener ) {
117
+ final String activeShardCountFailure = checkActiveShardCount ();
118
+ final ShardRouting primaryRouting = primary .routingEntry ();
119
+ final ShardId primaryId = primaryRouting .shardId ();
120
+ if (activeShardCountFailure != null ) {
121
// Failure is reported by acquiring a reference and immediately failing it, replacing the old finishAsFailed call.
+ pendingActionsListener .acquire ()
122
+ .onFailure (
123
+ new UnavailableShardsException (
124
+ primaryId ,
125
+ "{} Timeout: [{}], request: [{}]" ,
126
+ activeShardCountFailure ,
127
+ request .timeout (),
128
+ request
129
+ )
130
+ );
131
+ return ;
132
+ }
124
133
125
- totalShards .incrementAndGet ();
126
- pendingActions .incrementAndGet (); // increase by 1 until we finish all primary coordination
127
- primary .perform (request , ActionListener .wrap (this ::handlePrimaryResult , this ::finishAsFailed ));
134
// The reference acquired here is passed to handlePrimaryResult on success and receives onFailure directly if
// primary.perform fails.
+ totalShards .incrementAndGet ();
135
+ var primaryCoordinationPendingActionListener = pendingActionsListener .acquire (); // triggered when we finish all primary
136
+ // coordination
137
+ primary .perform (request , ActionListener .wrap (primaryResult -> {
138
+ handlePrimaryResult (primaryResult , primaryCoordinationPendingActionListener );
139
+ }, primaryCoordinationPendingActionListener ::onFailure ));
140
+ }
128
141
}
129
142
130
- private void handlePrimaryResult (final PrimaryResultT primaryResult ) {
143
+ private void handlePrimaryResult (final PrimaryResultT primaryResult , ActionListener < Void > primaryCoordinationPendingActionListener ) {
131
144
this .primaryResult = primaryResult ;
132
145
final ReplicaRequest replicaRequest = primaryResult .replicaRequest ();
133
146
if (replicaRequest != null ) {
@@ -136,11 +149,11 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
136
149
}
137
150
final ReplicationGroup replicationGroup = primary .getReplicationGroup ();
138
151
139
- pendingActions . incrementAndGet ();
152
+ var primaryOperationPendingActionListener = pendingActionsListener . acquire ();
140
153
replicasProxy .onPrimaryOperationComplete (
141
154
replicaRequest ,
142
155
replicationGroup .getRoutingTable (),
143
- ActionListener .wrap (ignored -> decPendingAndFinishIfNeeded ( ), exception -> {
156
+ ActionListener .wrap (ignored -> primaryOperationPendingActionListener . onResponse ( null ), exception -> {
144
157
totalShards .incrementAndGet ();
145
158
shardReplicaFailures .add (
146
159
new ReplicationResponse .ShardInfo .Failure (
@@ -151,7 +164,7 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
151
164
false
152
165
)
153
166
);
154
- decPendingAndFinishIfNeeded ( );
167
+ primaryOperationPendingActionListener . onResponse ( null );
155
168
})
156
169
);
157
170
@@ -181,7 +194,7 @@ public void onResponse(Void aVoid) {
181
194
primary .routingEntry (),
182
195
primary ::localCheckpoint ,
183
196
primary ::globalCheckpoint ,
184
- () -> decPendingAndFinishIfNeeded ( )
197
+ () -> primaryCoordinationPendingActionListener . onResponse ( null )
185
198
);
186
199
}
187
200
@@ -193,20 +206,28 @@ public void onFailure(Exception e) {
193
206
// We update the checkpoints since a refresh might fail but the operations could be safely persisted, in the case that the
194
207
// fsync failed the local checkpoint won't advance and the engine will be marked as failed when the next indexing operation
195
208
// is appended into the translog.
196
- updateCheckPoints (primary .routingEntry (), primary ::localCheckpoint , primary ::globalCheckpoint , () -> finishAsFailed (e ));
209
+ updateCheckPoints (
210
+ primary .routingEntry (),
211
+ primary ::localCheckpoint ,
212
+ primary ::globalCheckpoint ,
213
+ () -> primaryCoordinationPendingActionListener .onFailure (e )
214
+ );
197
215
}
198
216
});
199
217
}
200
218
201
219
// For every allocation id returned by replicationGroup.getUnavailableInSyncShards(), asks replicasProxy to mark that shard
// copy as stale (if needed) under the current primaryTerm. Shown as a diff: '-' lines used the pendingActions counter,
// '+' lines acquire one reference from pendingActionsListener per allocation id.
private void markUnavailableShardsAsStale (ReplicaRequest replicaRequest , ReplicationGroup replicationGroup ) {
202
220
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
203
221
for (String allocationId : replicationGroup .getUnavailableInSyncShards ()) {
204
- pendingActions . incrementAndGet ();
222
// One reference per stale-marking request: completed with onResponse(null) on success, or handed to
// onNoLongerPrimary together with the exception on failure.
+ var staleCopyPendingActionListener = pendingActionsListener . acquire ();
205
223
replicasProxy .markShardCopyAsStaleIfNeeded (
206
224
replicaRequest .shardId (),
207
225
allocationId ,
208
226
primaryTerm ,
209
- ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary )
227
+ ActionListener .wrap (
228
+ r -> staleCopyPendingActionListener .onResponse (null ),
229
+ e -> onNoLongerPrimary (e , staleCopyPendingActionListener )
230
+ )
210
231
);
211
232
}
212
233
}
@@ -243,12 +264,17 @@ private void performOnReplica(
243
264
logger .trace ("[{}] sending op [{}] to replica {} for request [{}]" , shard .shardId (), opType , shard , replicaRequest );
244
265
}
245
266
totalShards .incrementAndGet ();
246
- pendingActions . incrementAndGet ();
267
+ var replicationPendingActionListener = pendingActionsListener . acquire ();
247
268
final ActionListener <ReplicaResponse > replicationListener = new ActionListener <>() {
248
269
@ Override
249
270
public void onResponse (ReplicaResponse response ) {
250
271
successfulShards .incrementAndGet ();
251
- updateCheckPoints (shard , response ::localCheckpoint , response ::globalCheckpoint , () -> decPendingAndFinishIfNeeded ());
272
+ updateCheckPoints (
273
+ shard ,
274
+ response ::localCheckpoint ,
275
+ response ::globalCheckpoint ,
276
+ () -> replicationPendingActionListener .onResponse (null )
277
+ );
252
278
}
253
279
254
280
@ Override
@@ -282,7 +308,10 @@ public void onFailure(Exception replicaException) {
282
308
primaryTerm ,
283
309
message ,
284
310
replicaException ,
285
- ActionListener .wrap (r -> decPendingAndFinishIfNeeded (), ReplicationOperation .this ::onNoLongerPrimary )
311
+ ActionListener .wrap (
312
+ r -> replicationPendingActionListener .onResponse (null ),
313
+ e -> onNoLongerPrimary (e , replicationPendingActionListener )
314
+ )
286
315
);
287
316
}
288
317
@@ -369,13 +398,13 @@ public void onAfter() {
369
398
}
370
399
}
371
400
372
- private void onNoLongerPrimary (Exception failure ) {
401
+ private void onNoLongerPrimary (Exception failure , ActionListener < Void > pendingActionlistener ) {
373
402
final Throwable cause = ExceptionsHelper .unwrapCause (failure );
374
403
final boolean nodeIsClosing = cause instanceof NodeClosedException ;
375
404
if (nodeIsClosing ) {
376
405
// We prefer not to fail the primary to avoid unnecessary warning log
377
406
// when the node with the primary shard is gracefully shutting down.
378
- finishAsFailed (
407
+ pendingActionlistener . onFailure (
379
408
new RetryOnPrimaryException (
380
409
primary .routingEntry ().shardId (),
381
410
String .format (
@@ -398,7 +427,7 @@ protected void doRun() {
398
427
primary .routingEntry ()
399
428
);
400
429
primary .failShard (message , failure );
401
- finishAsFailed (new RetryOnPrimaryException (primary .routingEntry ().shardId (), message , failure ));
430
+ pendingActionlistener . onFailure (new RetryOnPrimaryException (primary .routingEntry ().shardId (), message , failure ));
402
431
}
403
432
404
433
@ Override
@@ -411,7 +440,7 @@ public void onFailure(Exception e) {
411
440
e .addSuppressed (failure );
412
441
assert false : e ;
413
442
logger .error (() -> "unexpected failure while failing primary [" + primary .routingEntry () + "]" , e );
414
- finishAsFailed (
443
+ pendingActionlistener . onFailure (
415
444
new RetryOnPrimaryException (
416
445
primary .routingEntry ().shardId (),
417
446
String .format (Locale .ROOT , "unexpected failure while failing primary [%s]" , primary .routingEntry ()),
@@ -461,32 +490,6 @@ protected String checkActiveShardCount() {
461
490
}
462
491
}
463
492
464
// Deleted by this change ('-' lines): decremented the pendingActions counter and called finish() when it reached zero,
// asserting beforehand that the counter was still positive.
- private void decPendingAndFinishIfNeeded () {
465
- assert pendingActions .get () > 0 : "pending action count goes below 0 for request [" + request + "]" ;
466
- if (pendingActions .decrementAndGet () == 0 ) {
467
- finish ();
468
- }
469
- }
470
-
471
// Deleted by this change ('-' lines): on the first call only (guarded by finished.compareAndSet), stored the shard totals,
// success count and replica failures on primaryResult and passed it to resultListener.onResponse.
- private void finish () {
472
- if (finished .compareAndSet (false , true )) {
473
- primaryResult .setShardInfo (
474
- ReplicationResponse .ShardInfo .of (
475
- totalShards .get (),
476
- successfulShards .get (),
477
- shardReplicaFailures .toArray (ReplicationResponse .NO_FAILURES )
478
- )
479
- );
480
- resultListener .onResponse (primaryResult );
481
- }
482
- }
483
-
484
// Deleted by this change ('-' lines): on the first call only (guarded by finished.compareAndSet), forwarded the exception
// to resultListener.onFailure.
- private void finishAsFailed (Exception exception ) {
485
- if (finished .compareAndSet (false , true )) {
486
- resultListener .onFailure (exception );
487
- }
488
- }
489
-
490
493
/**
491
494
* An encapsulation of an operation that is to be performed on the primary shard
492
495
*/
@@ -693,7 +696,7 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
693
696
694
697
/**
695
698
* Run actions to be triggered post replication
696
- * @param listener calllback that is invoked after post replication actions have completed
699
+ * @param listener callback that is invoked after post replication actions have completed
697
700
* */
698
701
void runPostReplicationActions (ActionListener <Void > listener );
699
702
}
0 commit comments