29
29
import org .elasticsearch .action .bulk .BulkItemResponse ;
30
30
import org .elasticsearch .action .bulk .BulkShardRequest ;
31
31
import org .elasticsearch .action .bulk .BulkShardResponse ;
32
+ import org .elasticsearch .action .bulk .MappingUpdatePerformer ;
32
33
import org .elasticsearch .action .bulk .TransportShardBulkAction ;
33
- import org .elasticsearch .action .bulk .TransportShardBulkActionTests ;
34
34
import org .elasticsearch .action .delete .DeleteRequest ;
35
35
import org .elasticsearch .action .index .IndexRequest ;
36
36
import org .elasticsearch .action .resync .ResyncReplicationRequest ;
@@ -595,7 +595,8 @@ class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardReques
595
595
596
596
@ Override
597
597
protected PrimaryResult performOnPrimary (IndexShard primary , BulkShardRequest request ) throws Exception {
598
- final TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > result = executeShardBulkOnPrimary (primary , request );
598
+ final TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse >
599
+ result = executeShardBulkOnPrimary (primary , request );
599
600
return new PrimaryResult (result .replicaRequest (), result .finalResponseIfSuccessful );
600
601
}
601
602
@@ -605,7 +606,8 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) th
605
606
}
606
607
}
607
608
608
- private TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > executeShardBulkOnPrimary (IndexShard primary , BulkShardRequest request ) throws Exception {
609
+ private TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > executeShardBulkOnPrimary (
610
+ IndexShard primary , BulkShardRequest request ) throws Exception {
609
611
for (BulkItemRequest itemRequest : request .items ()) {
610
612
if (itemRequest .request () instanceof IndexRequest ) {
611
613
((IndexRequest ) itemRequest .request ()).process (Version .CURRENT , null , index .getName ());
@@ -615,8 +617,8 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardRespo
615
617
primary .acquirePrimaryOperationPermit (permitAcquiredFuture , ThreadPool .Names .SAME , request );
616
618
final TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > result ;
617
619
try (Releasable ignored = permitAcquiredFuture .actionGet ()) {
618
- result = TransportShardBulkAction . performOnPrimary ( request , primary , null , System :: currentTimeMillis ,
619
- new TransportShardBulkActionTests . NoopMappingUpdatePerformer () );
620
+ MappingUpdatePerformer noopMappingUpdater = ( update , shardId , type ) -> { };
621
+ result = TransportShardBulkAction . performOnPrimary ( request , primary , null , System :: currentTimeMillis , noopMappingUpdater );
620
622
}
621
623
TransportWriteActionTestHelper .performPostWriteActions (primary , request , result .location , logger );
622
624
return result ;
@@ -629,9 +631,11 @@ BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request
629
631
return executeShardBulkOnPrimary (primary , bulkShardRequest ).replicaRequest ();
630
632
}
631
633
632
- private void executeShardBulkOnReplica (BulkShardRequest request , IndexShard replica , long operationPrimaryTerm , long globalCheckpointOnPrimary ) throws Exception {
634
+ private void executeShardBulkOnReplica (BulkShardRequest request , IndexShard replica , long operationPrimaryTerm ,
635
+ long globalCheckpointOnPrimary ) throws Exception {
633
636
final PlainActionFuture <Releasable > permitAcquiredFuture = new PlainActionFuture <>();
634
- replica .acquireReplicaOperationPermit (operationPrimaryTerm , globalCheckpointOnPrimary , permitAcquiredFuture , ThreadPool .Names .SAME , request );
637
+ replica .acquireReplicaOperationPermit (
638
+ operationPrimaryTerm , globalCheckpointOnPrimary , permitAcquiredFuture , ThreadPool .Names .SAME , request );
635
639
final Translog .Location location ;
636
640
try (Releasable ignored = permitAcquiredFuture .actionGet ()) {
637
641
location = TransportShardBulkAction .performOnReplica (request , replica );
@@ -695,8 +699,8 @@ protected void performOnReplica(final GlobalCheckpointSyncAction.Request request
695
699
696
700
class ResyncAction extends ReplicationAction <ResyncReplicationRequest , ResyncReplicationRequest , ResyncReplicationResponse > {
697
701
698
- ResyncAction (ResyncReplicationRequest request , ActionListener <ResyncReplicationResponse > listener , ReplicationGroup replicationGroup ) {
699
- super (request , listener , replicationGroup , "resync" );
702
+ ResyncAction (ResyncReplicationRequest request , ActionListener <ResyncReplicationResponse > listener , ReplicationGroup group ) {
703
+ super (request , listener , group , "resync" );
700
704
}
701
705
702
706
@ Override
0 commit comments