diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index f74ffdc4b4dc4..ad046dddc0c27 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -456,6 +456,10 @@ public void onFailure(Exception e) { } } + IndexShard getPrimaryShard() { + return replicationGroup.primary; + } + protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; @@ -592,7 +596,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest re @Override protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { - executeShardBulkOnReplica(replica, request); + executeShardBulkOnReplica(request, replica, getPrimaryShard().getPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint()); } } @@ -602,15 +606,24 @@ private TransportWriteAction.WritePrimaryResult result = - TransportShardBulkAction.performOnPrimary(request, primary, null, - System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); + final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); + primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); + final TransportWriteAction.WritePrimaryResult result; + try (Releasable ignored = permitAcquiredFuture.actionGet()) { + result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, + new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); + } TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); return result; } - private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception { - final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica); + private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception { + final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); + replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request); + final Translog.Location location; + try (Releasable ignored = permitAcquiredFuture.actionGet()) { + location = TransportShardBulkAction.performOnReplica(request, replica); + } TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); } @@ -630,8 +643,8 @@ BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws /** * indexes the given requests on the supplied replica shard */ - void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { - executeShardBulkOnReplica(replica, request); + void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception { + executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint()); } class GlobalCheckpointSync extends ReplicationAction< diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 8c15a2a84ddb8..86436d8d88ac9 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -209,7 +209,7 @@ public void testConflictingOpsOnReplica() throws Exception { logger.info("--> isolated replica " + replica1.routingEntry()); BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary()); for (int i = 1; i < replicas.size(); i++) { - indexOnReplica(replicationRequest, replicas.get(i)); + indexOnReplica(replicationRequest, shards, replicas.get(i)); } logger.info("--> promoting replica to primary " + replica1.routingEntry()); @@ -318,7 +318,7 @@ public void testSeqNoCollision() throws Exception { logger.info("--> Isolate replica1"); IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); - indexOnReplica(replicationRequest, replica2); + indexOnReplica(replicationRequest, shards, replica2); final Translog.Operation op1; final List initOperations = new ArrayList<>(initDocs); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index dcfa2cb34a2db..66e2a09750a2d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -236,7 +236,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i) .source("{}", XContentType.JSON); final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(bulkShardRequest, replica); + indexOnReplica(bulkShardRequest, shards, replica); } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); @@ -326,7 +326,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i) .source("{}", XContentType.JSON); final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(bulkShardRequest, replica); + indexOnReplica(bulkShardRequest, shards, replica); } shards.flush(); shards.promoteReplicaToPrimary(newPrimary).get(); @@ -374,7 +374,7 @@ public void testResyncAfterPrimaryPromotion() throws Exception { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i) .source("{}", XContentType.JSON); final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(bulkShardRequest, newPrimary); + indexOnReplica(bulkShardRequest, shards, newPrimary); } logger.info("--> resyncing replicas"); PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();