Skip to content

Commit ffc5a82

Browse files
authored
Wait for concurrent reads in ESIndexInputTestCase (#51425)
Today `ESIndexInputTestCase#randomReadAndSlice` performs some reads on background threads, but does not necessarily wait for these reads to finish. It's possible that the underlying `IndexInput` may be closed before these reads complete, causing them to fail.
1 parent ec4b5bf commit ffc5a82

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO
109109
case 5:
110110
// Read clone or slice concurrently
111111
final int cloneCount = between(1, 3);
112-
final CountDownLatch countDownLatch = new CountDownLatch(1 + cloneCount);
112+
final CountDownLatch startLatch = new CountDownLatch(1 + cloneCount);
113+
final CountDownLatch finishLatch = new CountDownLatch(cloneCount);
113114

114115
final PlainActionFuture<byte[]> mainThreadResultFuture = new PlainActionFuture<>();
115116
final int mainThreadReadStart = readPos;
@@ -133,8 +134,8 @@ protected void doRun() throws Exception {
133134
final int sliceEnd = between(readEnd, length);
134135
clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd);
135136
}
136-
countDownLatch.countDown();
137-
countDownLatch.await();
137+
startLatch.countDown();
138+
startLatch.await();
138139
clone.seek(readStart);
139140
final byte[] cloneResult = randomReadAndSlice(clone, readEnd);
140141
if (randomBoolean()) {
@@ -155,20 +156,26 @@ protected void doRun() throws Exception {
155156
}
156157
}
157158

159+
@Override
160+
public void onAfter() {
161+
finishLatch.countDown();
162+
}
163+
158164
@Override
159165
public void onRejection(Exception e) {
160166
// all threads are busy, and queueing can lead this test to deadlock, so we need take no action
161-
countDownLatch.countDown();
167+
startLatch.countDown();
162168
}
163169
});
164170
}
165171

166172
try {
167-
countDownLatch.countDown();
168-
countDownLatch.await();
173+
startLatch.countDown();
174+
startLatch.await();
169175
ActionListener.completeWith(mainThreadResultFuture, () -> randomReadAndSlice(indexInput, mainThreadReadEnd));
170176
System.arraycopy(mainThreadResultFuture.actionGet(), readPos, output, readPos, mainThreadReadEnd - readPos);
171177
readPos = mainThreadReadEnd;
178+
finishLatch.await();
172179
} catch (InterruptedException e) {
173180
throw new AssertionError(e);
174181
}

0 commit comments

Comments
 (0)