Skip to content

Commit 738ab4e

Browse files
committed
test recovery cancelled during phase1
1 parent 3aaa986 commit 738ab4e

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

server/src/main/java/org/elasticsearch/action/StepListener.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,20 @@
5050
* }</pre>
5151
*/
5252

53-
public final class StepListener<Response> implements ActionListener<Response> {
53+
public final class StepListener<Response> extends NotifyOnceListener<Response> {
5454
private final ListenableFuture<Response> delegate;
5555

5656
public StepListener() {
5757
this.delegate = new ListenableFuture<>();
5858
}
5959

6060
@Override
61-
public void onResponse(Response response) {
61+
protected void innerOnResponse(Response response) {
6262
delegate.onResponse(response);
6363
}
6464

6565
@Override
66-
public void onFailure(Exception e) {
66+
protected void innerOnFailure(Exception e) {
6767
delegate.onFailure(e);
6868
}
6969

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

+66
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.Version;
3838
import org.elasticsearch.action.ActionListener;
3939
import org.elasticsearch.action.LatchedActionListener;
40+
import org.elasticsearch.action.StepListener;
4041
import org.elasticsearch.action.support.PlainActionFuture;
4142
import org.elasticsearch.cluster.metadata.IndexMetaData;
4243
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -615,6 +616,71 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
615616
store.close();
616617
}
617618

619+
public void testCancelRecoveryDuringPhase1() throws Exception {
620+
Store store = newStore(createTempDir("source"), false);
621+
IndexShard shard = mock(IndexShard.class);
622+
when(shard.store()).thenReturn(store);
623+
Directory dir = store.directory();
624+
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
625+
int numDocs = randomIntBetween(10, 100);
626+
for (int i = 0; i < numDocs; i++) {
627+
Document document = new Document();
628+
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
629+
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
630+
writer.addDocument(document);
631+
}
632+
writer.commit();
633+
writer.close();
634+
AtomicBoolean wasCancelled = new AtomicBoolean();
635+
SetOnce<Runnable> cancelRecovery = new SetOnce<>();
636+
final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
637+
@Override
638+
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
639+
List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
640+
recoveryExecutor.execute(() -> listener.onResponse(null));
641+
if (randomBoolean()) {
642+
wasCancelled.set(true);
643+
cancelRecovery.get().run();
644+
}
645+
}
646+
647+
@Override
648+
public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content,
649+
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
650+
recoveryExecutor.execute(() -> listener.onResponse(null));
651+
if (rarely()) {
652+
wasCancelled.set(true);
653+
cancelRecovery.get().run();
654+
}
655+
}
656+
657+
@Override
658+
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
659+
ActionListener<Void> listener) {
660+
recoveryExecutor.execute(() -> listener.onResponse(null));
661+
if (randomBoolean()) {
662+
wasCancelled.set(true);
663+
cancelRecovery.get().run();
664+
}
665+
}
666+
};
667+
final RecoverySourceHandler handler = new RecoverySourceHandler(
668+
shard, recoveryTarget, threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4));
669+
cancelRecovery.set(() -> handler.cancel("test"));
670+
final StepListener<RecoverySourceHandler.SendFileResult> phase1Listener = new StepListener<>();
671+
try {
672+
final CountDownLatch latch = new CountDownLatch(1);
673+
handler.phase1(DirectoryReader.listCommits(dir).get(0), randomNonNegativeLong(), () -> 0,
674+
new LatchedActionListener<>(phase1Listener, latch));
675+
latch.await();
676+
phase1Listener.result();
677+
} catch (Exception e) {
678+
assertTrue(wasCancelled.get());
679+
assertNotNull(ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class));
680+
}
681+
store.close();
682+
}
683+
618684
public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
619685
IndexShard shard = mock(IndexShard.class);
620686
when(shard.state()).thenReturn(IndexShardState.STARTED);

0 commit comments

Comments
 (0)