Skip to content

Commit 042b528

Browse files
DaveCTurnerGurkan Kaymak
authored and
Gurkan Kaymak
committed
Inline TransportReplAct#createReplicatedOperation (elastic#41197)
`TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation` exists so it can be overridden in tests. This commit re-works these tests to use a real `ReplicationOperation` and inlines the now-unnecessary method. Relates elastic#40706.
1 parent 671c4c4 commit 042b528

File tree

3 files changed

+93
-132
lines changed

3 files changed

+93
-132
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public class ReplicationOperation<
7474
private final long primaryTerm;
7575

7676
// exposed for tests
77-
final ActionListener<PrimaryResultT> resultListener;
77+
private final ActionListener<PrimaryResultT> resultListener;
7878

7979
private volatile PrimaryResultT primaryResult = null;
8080

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+27-35
Original file line numberDiff line numberDiff line change
@@ -357,37 +357,35 @@ public void handleException(TransportException exp) {
357357
});
358358
} else {
359359
setPhase(replicationTask, "primary");
360-
createReplicatedOperation(primaryRequest.getRequest(),
361-
ActionListener.wrap(result -> result.respond(
362-
new ActionListener<>() {
363-
@Override
364-
public void onResponse(Response response) {
365-
if (syncGlobalCheckpointAfterOperation) {
366-
final IndexShard shard = primaryShardReference.indexShard;
367-
try {
368-
shard.maybeSyncGlobalCheckpoint("post-operation");
369-
} catch (final Exception e) {
370-
// only log non-closed exceptions
371-
if (ExceptionsHelper.unwrap(
372-
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
373-
// intentionally swallow, a missed global checkpoint sync should not fail this operation
374-
logger.info(
375-
new ParameterizedMessage(
376-
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
377-
}
378-
}
379-
}
380-
primaryShardReference.close(); // release shard operation lock before responding to caller
381-
setPhase(replicationTask, "finished");
382-
onCompletionListener.onResponse(response);
383-
}
384360

385-
@Override
386-
public void onFailure(Exception e) {
387-
handleException(primaryShardReference, e);
361+
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
362+
primaryShardReference.close(); // release shard operation lock before responding to caller
363+
setPhase(replicationTask, "finished");
364+
onCompletionListener.onResponse(response);
365+
}, e -> handleException(primaryShardReference, e));
366+
367+
final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
368+
if (syncGlobalCheckpointAfterOperation) {
369+
final IndexShard shard = primaryShardReference.indexShard;
370+
try {
371+
shard.maybeSyncGlobalCheckpoint("post-operation");
372+
} catch (final Exception e) {
373+
// only log non-closed exceptions
374+
if (ExceptionsHelper.unwrap(
375+
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
376+
// intentionally swallow, a missed global checkpoint sync should not fail this operation
377+
logger.info(
378+
new ParameterizedMessage(
379+
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
388380
}
389-
}), e -> handleException(primaryShardReference, e)
390-
), primaryShardReference).execute();
381+
}
382+
}
383+
referenceClosingListener.onResponse(response);
384+
}, referenceClosingListener::onFailure);
385+
386+
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
387+
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
388+
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
391389
}
392390
} catch (Exception e) {
393391
handleException(primaryShardReference, e);
@@ -405,12 +403,6 @@ public void onFailure(Exception e) {
405403
onCompletionListener.onFailure(e);
406404
}
407405

408-
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
409-
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
410-
PrimaryShardReference primaryShardReference) {
411-
return new ReplicationOperation<>(request, primaryShardReference, listener,
412-
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm());
413-
}
414406
}
415407

416408
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,

0 commit comments

Comments
 (0)