|
150 | 150 | import java.util.concurrent.CountDownLatch;
|
151 | 151 | import java.util.concurrent.CyclicBarrier;
|
152 | 152 | import java.util.concurrent.ExecutionException;
|
| 153 | +import java.util.concurrent.Phaser; |
153 | 154 | import java.util.concurrent.Semaphore;
|
154 | 155 | import java.util.concurrent.TimeUnit;
|
155 | 156 | import java.util.concurrent.atomic.AtomicBoolean;
|
@@ -3925,6 +3926,78 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
|
3925 | 3926 | closeShard(shard, false);
|
3926 | 3927 | }
|
3927 | 3928 |
|
| 3929 | + /** |
| 3930 | + * Verifies that after closing shard is returned, we should have released the engine, and won't open a new engine. |
| 3931 | + */ |
| 3932 | + public void testCloseShardWhileOpeningEngineDuringRecovery() throws Exception { |
| 3933 | + CountDownLatch readyToCloseLatch = new CountDownLatch(1); |
| 3934 | + CountDownLatch closeDoneLatch = new CountDownLatch(1); |
| 3935 | + IndexShard shard = newShard(false, Settings.EMPTY, config -> { |
| 3936 | + InternalEngine engine = new InternalEngine(config); |
| 3937 | + readyToCloseLatch.countDown(); |
| 3938 | + try { |
| 3939 | + closeDoneLatch.await(); |
| 3940 | + } catch (InterruptedException e) { |
| 3941 | + throw new AssertionError(e); |
| 3942 | + } |
| 3943 | + return engine; |
| 3944 | + }); |
| 3945 | + |
| 3946 | + Thread closeShardThread = new Thread(() -> { |
| 3947 | + try { |
| 3948 | + readyToCloseLatch.await(); |
| 3949 | + shard.close("testing", false); |
| 3950 | + // in integration tests, this is done as a listener on IndexService. |
| 3951 | + MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); |
| 3952 | + } catch (InterruptedException | IOException e) { |
| 3953 | + throw new AssertionError(e); |
| 3954 | + } finally { |
| 3955 | + closeDoneLatch.countDown(); |
| 3956 | + } |
| 3957 | + }); |
| 3958 | + closeShardThread.start(); |
| 3959 | + recoveryEmptyReplica(shard, true); |
| 3960 | + closeShardThread.join(); |
| 3961 | + closeShard(shard, false); |
| 3962 | + } |
| 3963 | + |
| 3964 | + /** |
| 3965 | + * Similar to {@link #testCloseShardWhileOpeningEngineDuringRecovery()} but verifies a scenario where a shard is being reset. |
| 3966 | + */ |
| 3967 | + public void testCloseShardWhileOpeningEngineDuringReset() throws Exception { |
| 3968 | + CountDownLatch readyToClose = new CountDownLatch(2); // close shard after we have created two engines in recovery and reset. |
| 3969 | + Phaser readyToReturnEngine = new Phaser(1); |
| 3970 | + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> { |
| 3971 | + InternalEngine engine = new InternalEngine(config); |
| 3972 | + readyToClose.countDown(); |
| 3973 | + readyToReturnEngine.arriveAndAwaitAdvance(); |
| 3974 | + return engine; |
| 3975 | + }); |
| 3976 | + |
| 3977 | + readyToReturnEngine.register(); // for close thread |
| 3978 | + Thread closeShardThread = new Thread(() -> { |
| 3979 | + try { |
| 3980 | + readyToClose.await(); |
| 3981 | + shard.close("testing", false); |
| 3982 | + // in integration tests, this is done as a listener on IndexService. |
| 3983 | + MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); |
| 3984 | + } catch (InterruptedException | IOException e) { |
| 3985 | + throw new AssertionError(e); |
| 3986 | + } finally { |
| 3987 | + readyToReturnEngine.arrive(); |
| 3988 | + } |
| 3989 | + }); |
| 3990 | + closeShardThread.start(); |
| 3991 | + shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, |
| 3992 | + ActionListener.wrap(r -> { |
| 3993 | + try (r) { |
| 3994 | + shard.resetEngineToGlobalCheckpoint(); |
| 3995 | + } |
| 3996 | + }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L)); |
| 3997 | + closeShardThread.join(); |
| 3998 | + closeShard(shard, false); |
| 3999 | + } |
| 4000 | + |
3928 | 4001 | public void testResetEngineWithBrokenTranslog() throws Exception {
|
3929 | 4002 | IndexShard shard = newStartedShard(false);
|
3930 | 4003 | updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData())
|
|
0 commit comments