|
158 | 158 | import java.util.concurrent.BrokenBarrierException;
|
159 | 159 | import java.util.concurrent.CountDownLatch;
|
160 | 160 | import java.util.concurrent.CyclicBarrier;
|
| 161 | +import java.util.concurrent.Phaser; |
161 | 162 | import java.util.concurrent.Semaphore;
|
162 | 163 | import java.util.concurrent.TimeUnit;
|
163 | 164 | import java.util.concurrent.atomic.AtomicBoolean;
|
@@ -5896,4 +5897,58 @@ private void runTestDeleteFailure(
|
5896 | 5897 | }
|
5897 | 5898 | }
|
5898 | 5899 |
|
| 5900 | + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { |
| 5901 | + final AtomicInteger refreshCount = new AtomicInteger(); |
| 5902 | + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { |
| 5903 | + @Override |
| 5904 | + public void beforeRefresh() { |
| 5905 | + |
| 5906 | + } |
| 5907 | + |
| 5908 | + @Override |
| 5909 | + public void afterRefresh(boolean didRefresh) { |
| 5910 | + if (didRefresh) { |
| 5911 | + refreshCount.incrementAndGet(); |
| 5912 | + } |
| 5913 | + } |
| 5914 | + }; |
| 5915 | + try (Store store = createStore()) { |
| 5916 | + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, |
| 5917 | + refreshListener, null, null, engine.config().getCircuitBreakerService()); |
| 5918 | + try (InternalEngine engine = createEngine(config)) { |
| 5919 | + int numDocs = randomIntBetween(10, 100); |
| 5920 | + Set<String> ids = new HashSet<>(); |
| 5921 | + for (int i = 0; i < numDocs; i++) { |
| 5922 | + String id = Integer.toString(i); |
| 5923 | + engine.index(indexForDoc(createParsedDoc(id, null))); |
| 5924 | + ids.add(id); |
| 5925 | + } |
| 5926 | + final int refreshCountBeforeGet = refreshCount.get(); |
| 5927 | + Thread[] getters = new Thread[randomIntBetween(1, 4)]; |
| 5928 | + Phaser phaser = new Phaser(getters.length + 1); |
| 5929 | + for (int t = 0; t < getters.length; t++) { |
| 5930 | + getters[t] = new Thread(() -> { |
| 5931 | + phaser.arriveAndAwaitAdvance(); |
| 5932 | + int iters = randomIntBetween(1, 10); |
| 5933 | + for (int i = 0; i < iters; i++) { |
| 5934 | + ParsedDocument doc = createParsedDoc(randomFrom(ids), null); |
| 5935 | + try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) { |
| 5936 | + assertThat(getResult.exists(), equalTo(true)); |
| 5937 | + assertThat(getResult.docIdAndVersion(), notNullValue()); |
| 5938 | + } |
| 5939 | + } |
| 5940 | + }); |
| 5941 | + getters[t].start(); |
| 5942 | + } |
| 5943 | + phaser.arriveAndAwaitAdvance(); |
| 5944 | + for (int i = 0; i < numDocs; i++) { |
| 5945 | + engine.index(indexForDoc(createParsedDoc("more-" + i, null))); |
| 5946 | + } |
| 5947 | + for (Thread getter : getters) { |
| 5948 | + getter.join(); |
| 5949 | + } |
| 5950 | + assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1)); |
| 5951 | + } |
| 5952 | + } |
| 5953 | + } |
5899 | 5954 | }
|
0 commit comments