Skip to content

Commit 8bf1856

Browse files
committed
TEST: write ops should execute under shard permit (#28966)
Currently ESIndexLevelReplicationTestCase executes write operations without acquiring index shard permit. This may prevent the primary term on replica from being updated or cause a race between resync and indexing on primary. This commit ensures that write operations are always executed under shard permit like the production code.
1 parent 1b8ecf8 commit 8bf1856

File tree

3 files changed

+26
-13
lines changed

3 files changed

+26
-13
lines changed

server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

+21-8
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,10 @@ public void onFailure(Exception e) {
456456
}
457457
}
458458

459+
IndexShard getPrimaryShard() {
460+
return replicationGroup.primary;
461+
}
462+
459463
protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
460464

461465
protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;
@@ -592,7 +596,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest re
592596

593597
@Override
594598
protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
595-
executeShardBulkOnReplica(replica, request);
599+
executeShardBulkOnReplica(request, replica, getPrimaryShard().getPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
596600
}
597601
}
598602

@@ -602,15 +606,24 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardRespo
602606
((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName());
603607
}
604608
}
605-
final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
606-
TransportShardBulkAction.performOnPrimary(request, primary, null,
607-
System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer());
609+
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
610+
primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request);
611+
final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result;
612+
try (Releasable ignored = permitAcquiredFuture.actionGet()) {
613+
result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis,
614+
new TransportShardBulkActionTests.NoopMappingUpdatePerformer());
615+
}
608616
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
609617
return result;
610618
}
611619

612-
private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception {
613-
final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica);
620+
private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception {
621+
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
622+
replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request);
623+
final Translog.Location location;
624+
try (Releasable ignored = permitAcquiredFuture.actionGet()) {
625+
location = TransportShardBulkAction.performOnReplica(request, replica);
626+
}
614627
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
615628
}
616629

@@ -630,8 +643,8 @@ BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws
630643
/**
631644
* indexes the given requests on the supplied replica shard
632645
*/
633-
void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
634-
executeShardBulkOnReplica(replica, request);
646+
void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
647+
executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
635648
}
636649

637650
class GlobalCheckpointSync extends ReplicationAction<

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public void testConflictingOpsOnReplica() throws Exception {
209209
logger.info("--> isolated replica " + replica1.routingEntry());
210210
BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary());
211211
for (int i = 1; i < replicas.size(); i++) {
212-
indexOnReplica(replicationRequest, replicas.get(i));
212+
indexOnReplica(replicationRequest, shards, replicas.get(i));
213213
}
214214

215215
logger.info("--> promoting replica to primary " + replica1.routingEntry());
@@ -318,7 +318,7 @@ public void testSeqNoCollision() throws Exception {
318318
logger.info("--> Isolate replica1");
319319
IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
320320
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
321-
indexOnReplica(replicationRequest, replica2);
321+
indexOnReplica(replicationRequest, shards, replica2);
322322

323323
final Translog.Operation op1;
324324
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
236236
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i)
237237
.source("{}", XContentType.JSON);
238238
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
239-
indexOnReplica(bulkShardRequest, replica);
239+
indexOnReplica(bulkShardRequest, shards, replica);
240240
}
241241
if (randomBoolean()) {
242242
oldPrimary.flush(new FlushRequest(index.getName()));
@@ -326,7 +326,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception {
326326
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i)
327327
.source("{}", XContentType.JSON);
328328
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
329-
indexOnReplica(bulkShardRequest, replica);
329+
indexOnReplica(bulkShardRequest, shards, replica);
330330
}
331331
shards.flush();
332332
shards.promoteReplicaToPrimary(newPrimary).get();
@@ -374,7 +374,7 @@ public void testResyncAfterPrimaryPromotion() throws Exception {
374374
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
375375
.source("{}", XContentType.JSON);
376376
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
377-
indexOnReplica(bulkShardRequest, newPrimary);
377+
indexOnReplica(bulkShardRequest, shards, newPrimary);
378378
}
379379
logger.info("--> resyncing replicas");
380380
PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();

0 commit comments

Comments
 (0)