@@ -144,21 +144,21 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
144
144
145
145
this .transportPrimaryAction = actionName + "[p]" ;
146
146
this .transportReplicaAction = actionName + "[r]" ;
147
- registerRequestHandlers (actionName , transportService , request , replicaRequest , executor );
147
+
148
+ transportService .registerRequestHandler (actionName , request , ThreadPool .Names .SAME , this ::handleOperationRequest );
149
+ transportService .registerRequestHandler (transportPrimaryAction , () -> new ConcreteShardRequest <>(request ), executor , true ,
150
+ forcePrimaryActionExecution (), this ::handlePrimaryRequest );
151
+ // we must never reject on because of thread pool capacity on replicas
152
+ transportService .registerRequestHandler (
153
+ transportReplicaAction , () -> new ConcreteReplicaRequest <>(replicaRequest ), executor , true , true , this ::handleReplicaRequest );
148
154
149
155
this .transportOptions = transportOptions (settings );
150
156
151
157
this .syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation ;
152
158
}
153
159
154
- protected void registerRequestHandlers (String actionName , TransportService transportService , Supplier <Request > request ,
155
- Supplier <ReplicaRequest > replicaRequest , String executor ) {
156
- transportService .registerRequestHandler (actionName , request , ThreadPool .Names .SAME , this ::handleOperationRequest );
157
- transportService .registerRequestHandler (transportPrimaryAction , () -> new ConcreteShardRequest <>(request ), executor ,
158
- this ::handlePrimaryRequest );
159
- // we must never reject on because of thread pool capacity on replicas
160
- transportService .registerRequestHandler (
161
- transportReplicaAction , () -> new ConcreteReplicaRequest <>(replicaRequest ), executor , true , true , this ::handleReplicaRequest );
160
+ protected boolean forcePrimaryActionExecution () {
161
+ return false ;
162
162
}
163
163
164
164
@ Override
0 commit comments