|
165 | 165 | import java.util.concurrent.BrokenBarrierException;
|
166 | 166 | import java.util.concurrent.CountDownLatch;
|
167 | 167 | import java.util.concurrent.CyclicBarrier;
|
| 168 | +import java.util.concurrent.Phaser; |
168 | 169 | import java.util.concurrent.Semaphore;
|
169 | 170 | import java.util.concurrent.TimeUnit;
|
170 | 171 | import java.util.concurrent.atomic.AtomicBoolean;
|
@@ -6094,4 +6095,58 @@ private void runTestDeleteFailure(
|
6094 | 6095 | }
|
6095 | 6096 | }
|
6096 | 6097 |
|
| 6098 | + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { |
| 6099 | + final AtomicInteger refreshCount = new AtomicInteger(); |
| 6100 | + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { |
| 6101 | + @Override |
| 6102 | + public void beforeRefresh() { |
| 6103 | + |
| 6104 | + } |
| 6105 | + |
| 6106 | + @Override |
| 6107 | + public void afterRefresh(boolean didRefresh) { |
| 6108 | + if (didRefresh) { |
| 6109 | + refreshCount.incrementAndGet(); |
| 6110 | + } |
| 6111 | + } |
| 6112 | + }; |
| 6113 | + try (Store store = createStore()) { |
| 6114 | + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, |
| 6115 | + refreshListener, null, null, engine.config().getCircuitBreakerService()); |
| 6116 | + try (InternalEngine engine = createEngine(config)) { |
| 6117 | + int numDocs = randomIntBetween(10, 100); |
| 6118 | + Set<String> ids = new HashSet<>(); |
| 6119 | + for (int i = 0; i < numDocs; i++) { |
| 6120 | + String id = Integer.toString(i); |
| 6121 | + engine.index(indexForDoc(createParsedDoc(id, null))); |
| 6122 | + ids.add(id); |
| 6123 | + } |
| 6124 | + final int refreshCountBeforeGet = refreshCount.get(); |
| 6125 | + Thread[] getters = new Thread[randomIntBetween(1, 4)]; |
| 6126 | + Phaser phaser = new Phaser(getters.length + 1); |
| 6127 | + for (int t = 0; t < getters.length; t++) { |
| 6128 | + getters[t] = new Thread(() -> { |
| 6129 | + phaser.arriveAndAwaitAdvance(); |
| 6130 | + int iters = randomIntBetween(1, 10); |
| 6131 | + for (int i = 0; i < iters; i++) { |
| 6132 | + ParsedDocument doc = createParsedDoc(randomFrom(ids), null); |
| 6133 | + try (Engine.GetResult getResult = engine.get(newGet(true, doc), engine::acquireSearcher)) { |
| 6134 | + assertThat(getResult.exists(), equalTo(true)); |
| 6135 | + assertThat(getResult.docIdAndVersion(), notNullValue()); |
| 6136 | + } |
| 6137 | + } |
| 6138 | + }); |
| 6139 | + getters[t].start(); |
| 6140 | + } |
| 6141 | + phaser.arriveAndAwaitAdvance(); |
| 6142 | + for (int i = 0; i < numDocs; i++) { |
| 6143 | + engine.index(indexForDoc(createParsedDoc("more-" + i, null))); |
| 6144 | + } |
| 6145 | + for (Thread getter : getters) { |
| 6146 | + getter.join(); |
| 6147 | + } |
| 6148 | + assertThat(refreshCount.get(), lessThanOrEqualTo(refreshCountBeforeGet + 1)); |
| 6149 | + } |
| 6150 | + } |
| 6151 | + } |
6097 | 6152 | }
|
0 commit comments