Skip to content

Commit 0990058

Browse files
committed
Cause some recoveries to fail
1 parent e337bf7 commit 0990058

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ public ReplicationTracker(
638638
final LongSupplier currentTimeMillisSupplier,
639639
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
640640
super(shardId, indexSettings);
641+
logger.info("--> creating new ReplicationTracker");
641642
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
642643
this.shardAllocationId = allocationId;
643644
this.primaryMode = false;

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import org.elasticsearch.ElasticsearchException;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -31,9 +32,12 @@
3132
import org.elasticsearch.index.shard.IndexShard;
3233
import org.elasticsearch.index.shard.ShardId;
3334
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
3436
import org.elasticsearch.plugins.Plugin;
3537
import org.elasticsearch.test.ESIntegTestCase;
38+
import org.elasticsearch.test.transport.MockTransportService;
3639
import org.elasticsearch.threadpool.ThreadPool;
40+
import org.elasticsearch.transport.TransportService;
3741

3842
import java.io.Closeable;
3943
import java.util.ArrayList;
@@ -43,6 +47,7 @@
4347
import java.util.List;
4448
import java.util.Map;
4549
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.Semaphore;
4651
import java.util.concurrent.TimeUnit;
4752
import java.util.concurrent.atomic.AtomicBoolean;
4853
import java.util.concurrent.atomic.AtomicReference;
@@ -73,7 +78,7 @@ public List<Setting<?>> getSettings() {
7378
protected Collection<Class<? extends Plugin>> nodePlugins() {
7479
return Stream.concat(
7580
super.nodePlugins().stream(),
76-
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
81+
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class, MockTransportService.TestPlugin.class))
7782
.collect(Collectors.toList());
7883
}
7984

@@ -355,6 +360,17 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
355360
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
356361
}
357362

363+
// Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
364+
final Semaphore recoveriesToDisrupt = new Semaphore(randomIntBetween(0, 4));
365+
final MockTransportService transportService
366+
= (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
367+
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
368+
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
369+
throw new ElasticsearchException("failing recovery for test");
370+
}
371+
connection.sendRequest(requestId, action, request, options);
372+
});
373+
358374
// now allow the replicas to be allocated and wait for recovery to finalize
359375
allowNodes("index", 1 + numberOfReplicas);
360376
ensureGreen("index");

0 commit comments

Comments
 (0)