Skip to content

Commit c29cebb

Browse files
authored
Inline TransportReplAction#registerRequestHandlers (#40762)
It is important that resync actions are not rejected on the primary even if its `write` threadpool is overloaded. Today we do this by exposing `registerRequestHandlers` to subclasses and overriding it in `TransportResyncReplicationAction`. This isn't ideal because it obscures the difference between this action and other replication actions, and also might allow subclasses to try and use some state before they are properly initialised. This change replaces this override with a constructor parameter to solve these issues. Relates #40706
1 parent ffc0b9b commit c29cebb

File tree

7 files changed

+23
-39
lines changed

7 files changed

+23
-39
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
9292
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
9393
IndexNameExpressionResolver indexNameExpressionResolver) {
9494
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
95-
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
95+
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
9696
this.threadPool = threadPool;
9797
this.updateHelper = updateHelper;
9898
this.mappingUpdatedAction = mappingUpdatedAction;

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

+2-17
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.transport.TransportService;
4848

4949
import java.io.IOException;
50-
import java.util.function.Supplier;
5150

5251
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
5352
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
@@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
6059
ShardStateAction shardStateAction, ActionFilters actionFilters,
6160
IndexNameExpressionResolver indexNameExpressionResolver) {
6261
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
63-
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
64-
}
65-
66-
@Override
67-
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
68-
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
69-
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
70-
// we should never reject resync because of thread pool capacity on primary
71-
transportService.registerRequestHandler(transportPrimaryAction,
72-
() -> new ConcreteShardRequest<>(request),
73-
executor, true, true,
74-
this::handlePrimaryRequest);
75-
transportService.registerRequestHandler(transportReplicaAction,
76-
() -> new ConcreteReplicaRequest<>(replicaRequest),
77-
executor, true, true,
78-
this::handleReplicaRequest);
62+
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
63+
true /* we should never reject resync because of thread pool capacity on primary */);
7964
}
8065

8166
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+11-13
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
122122
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
123123
Supplier<ReplicaRequest> replicaRequest, String executor) {
124124
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
125-
indexNameExpressionResolver, request, replicaRequest, executor, false);
125+
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
126126
}
127127

128128

@@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
132132
ActionFilters actionFilters,
133133
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
134134
Supplier<ReplicaRequest> replicaRequest, String executor,
135-
boolean syncGlobalCheckpointAfterOperation) {
135+
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
136136
super(actionName, actionFilters, transportService.getTaskManager());
137137
this.threadPool = threadPool;
138138
this.transportService = transportService;
@@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
144144

145145
this.transportPrimaryAction = actionName + "[p]";
146146
this.transportReplicaAction = actionName + "[r]";
147-
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
148147

149-
this.transportOptions = transportOptions(settings);
148+
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
150149

151-
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
152-
}
150+
transportService.registerRequestHandler(transportPrimaryAction,
151+
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);
153152

154-
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
155-
Supplier<ReplicaRequest> replicaRequest, String executor) {
156-
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
157-
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
158-
this::handlePrimaryRequest);
159153
// we must never reject on because of thread pool capacity on replicas
160-
transportService.registerRequestHandler(
161-
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
154+
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
155+
executor, true, true, this::handleReplicaRequest);
156+
157+
this.transportOptions = transportOptions(settings);
158+
159+
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
162160
}
163161

164162
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
6060
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
6161

6262
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
63-
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
64-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
65-
Supplier<ReplicaRequest> replicaRequest, String executor) {
63+
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
64+
ShardStateAction shardStateAction, ActionFilters actionFilters,
65+
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
66+
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
6667
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
67-
indexNameExpressionResolver, request, replicaRequest, executor, true);
68+
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
6869
}
6970

7071
/** Syncs operation result to the translog or throws a shard not available failure */

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public RetentionLeaseSyncAction(
8888
indexNameExpressionResolver,
8989
RetentionLeaseSyncAction.Request::new,
9090
RetentionLeaseSyncAction.Request::new,
91-
ThreadPool.Names.MANAGEMENT);
91+
ThreadPool.Names.MANAGEMENT, false);
9292
}
9393

9494
/**

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
402402
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
403403
x -> null, null, Collections.emptySet()), null, null, null, null,
404404
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
405-
TestRequest::new, ThreadPool.Names.SAME);
405+
TestRequest::new, ThreadPool.Names.SAME, false);
406406
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
407407
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
408408
}
@@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
412412
super(settings, actionName, transportService, clusterService,
413413
mockIndicesService(clusterService), threadPool, shardStateAction,
414414
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
415-
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
415+
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
416416
this.withDocumentFailureOnPrimary = false;
417417
this.withDocumentFailureOnReplica = false;
418418
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction(
5757
indexNameExpressionResolver,
5858
BulkShardOperationsRequest::new,
5959
BulkShardOperationsRequest::new,
60-
ThreadPool.Names.WRITE);
60+
ThreadPool.Names.WRITE, false);
6161
}
6262

6363
@Override

0 commit comments

Comments
 (0)