Skip to content

Commit cd72774

Browse files
authored
Test concurrent cloning/slicing of IndexInputs (#51032)
We test the behaviour of an `IndexInput` using `randomReadAndSlice()` which uses a wide varity of different access methods to read the data. Lucene sometimes calls `clone()` and `slice()` concurrently, although it does ensure that there are no concurrent readers while these are being called. Today we do not verify that our `IndexInput`s behave correctly under this kind of concurrent access. This commit extracts `randomReadAndSlice()` into a separate test harness for more general consumption and adds support for concurrent cloning and slicing.
1 parent 115997b commit cd72774

File tree

3 files changed

+188
-104
lines changed

3 files changed

+188
-104
lines changed

server/src/test/java/org/elasticsearch/common/lucene/store/ByteArrayIndexInputTests.java

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,12 @@
1919

2020
package org.elasticsearch.common.lucene.store;
2121

22-
import org.apache.lucene.store.IndexInput;
23-
import org.elasticsearch.test.ESTestCase;
24-
2522
import java.io.IOException;
2623
import java.nio.charset.StandardCharsets;
2724

2825
import static org.hamcrest.Matchers.containsString;
2926

30-
public class ByteArrayIndexInputTests extends ESTestCase {
27+
public class ByteArrayIndexInputTests extends ESIndexInputTestCase {
3128
public void testRandomReads() throws IOException {
3229
for (int i = 0; i < 100; i++) {
3330
byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8);
@@ -87,47 +84,5 @@ public void testSeekOverflow() throws IOException {
8784
}
8885
}
8986

90-
private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException {
91-
int readPos = (int) indexInput.getFilePointer();
92-
byte[] output = new byte[length];
93-
while (readPos < length) {
94-
switch (randomIntBetween(0, 3)) {
95-
case 0:
96-
// Read by one byte at a time
97-
output[readPos++] = indexInput.readByte();
98-
break;
99-
case 1:
100-
// Read several bytes into target
101-
int len = randomIntBetween(1, length - readPos);
102-
indexInput.readBytes(output, readPos, len);
103-
readPos += len;
104-
break;
105-
case 2:
106-
// Read several bytes into 0-offset target
107-
len = randomIntBetween(1, length - readPos);
108-
byte[] temp = new byte[len];
109-
indexInput.readBytes(temp, 0, len);
110-
System.arraycopy(temp, 0, output, readPos, len);
111-
readPos += len;
112-
break;
113-
case 3:
114-
// Read using slice
115-
len = randomIntBetween(1, length - readPos);
116-
IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput.toString(), readPos, len);
117-
temp = randomReadAndSlice(slice, len);
118-
// assert that position in the original input didn't change
119-
assertEquals(readPos, indexInput.getFilePointer());
120-
System.arraycopy(temp, 0, output, readPos, len);
121-
readPos += len;
122-
indexInput.seek(readPos);
123-
assertEquals(readPos, indexInput.getFilePointer());
124-
break;
125-
default:
126-
fail();
127-
}
128-
assertEquals(readPos, indexInput.getFilePointer());
129-
}
130-
return output;
131-
}
13287
}
13388

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common.lucene.store;
20+
21+
import org.apache.lucene.store.IndexInput;
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.support.PlainActionFuture;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
26+
import org.elasticsearch.common.util.concurrent.EsExecutors;
27+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
28+
import org.elasticsearch.common.util.concurrent.ThreadContext;
29+
import org.elasticsearch.test.ESTestCase;
30+
import org.junit.AfterClass;
31+
import org.junit.BeforeClass;
32+
33+
import java.io.IOException;
34+
import java.util.concurrent.CountDownLatch;
35+
36+
/**
37+
* Test harness for verifying {@link IndexInput} implementations.
38+
*/
39+
public class ESIndexInputTestCase extends ESTestCase {
40+
41+
private static EsThreadPoolExecutor executor;
42+
43+
@BeforeClass
44+
public static void createExecutor() {
45+
final String name = getTestClass().getSimpleName() + "#randomReadAndSlice";
46+
executor = EsExecutors.newFixed(name, 10, 0, EsExecutors.daemonThreadFactory(name), new ThreadContext(Settings.EMPTY));
47+
}
48+
49+
@AfterClass
50+
public static void destroyExecutor() {
51+
executor.shutdown();
52+
}
53+
54+
/**
55+
* Reads the contents of an {@link IndexInput} from {@code indexInput.getFilePointer()} to {@code length} using a wide variety of
56+
* different access methods. Returns an array of length {@code length} containing the bytes that were read starting at index
57+
* {@code indexInput.getFilePointer()}. The bytes of the returned array with indices below the initial value of
58+
* {@code indexInput.getFilePointer()} may contain anything. The final value of {@code indexInput.getFilePointer()} is {@code length}.
59+
*/
60+
protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException {
61+
int readPos = (int) indexInput.getFilePointer();
62+
byte[] output = new byte[length];
63+
while (readPos < length) {
64+
switch (randomIntBetween(0, 5)) {
65+
case 0:
66+
// Read by one byte at a time
67+
output[readPos++] = indexInput.readByte();
68+
break;
69+
case 1:
70+
// Read several bytes into target
71+
int len = randomIntBetween(1, length - readPos);
72+
indexInput.readBytes(output, readPos, len);
73+
readPos += len;
74+
break;
75+
case 2:
76+
// Read several bytes into 0-offset target
77+
len = randomIntBetween(1, length - readPos);
78+
byte[] temp = new byte[len];
79+
indexInput.readBytes(temp, 0, len);
80+
System.arraycopy(temp, 0, output, readPos, len);
81+
readPos += len;
82+
break;
83+
case 3:
84+
// Read using slice
85+
len = randomIntBetween(1, length - readPos);
86+
IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput, readPos, len);
87+
temp = randomReadAndSlice(slice, len);
88+
// assert that position in the original input didn't change
89+
assertEquals(readPos, indexInput.getFilePointer());
90+
System.arraycopy(temp, 0, output, readPos, len);
91+
readPos += len;
92+
indexInput.seek(readPos);
93+
assertEquals(readPos, indexInput.getFilePointer());
94+
break;
95+
case 4:
96+
// Seek at a random position and read a single byte,
97+
// then seek back to original position
98+
final int lastReadPos = readPos;
99+
readPos = randomIntBetween(0, length - 1);
100+
indexInput.seek(readPos);
101+
assertEquals(readPos, indexInput.getFilePointer());
102+
final int bytesToRead = 1;
103+
temp = randomReadAndSlice(indexInput, readPos + bytesToRead);
104+
System.arraycopy(temp, readPos, output, readPos, bytesToRead);
105+
readPos = lastReadPos;
106+
indexInput.seek(readPos);
107+
assertEquals(readPos, indexInput.getFilePointer());
108+
break;
109+
case 5:
110+
// Read clone or slice concurrently
111+
final int cloneCount = between(1, 3);
112+
final CountDownLatch countDownLatch = new CountDownLatch(1 + cloneCount);
113+
114+
final PlainActionFuture<byte[]> mainThreadResultFuture = new PlainActionFuture<>();
115+
final int mainThreadReadStart = readPos;
116+
final int mainThreadReadEnd = randomIntBetween(readPos + 1, length);
117+
118+
for (int i = 0; i < cloneCount; i++) {
119+
executor.execute(new AbstractRunnable() {
120+
@Override
121+
public void onFailure(Exception e) {
122+
throw new AssertionError(e);
123+
}
124+
125+
@Override
126+
protected void doRun() throws Exception {
127+
final IndexInput clone;
128+
final int readStart = between(0, length);
129+
final int readEnd = between(readStart, length);
130+
if (randomBoolean()) {
131+
clone = indexInput.clone();
132+
} else {
133+
final int sliceEnd = between(readEnd, length);
134+
clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd);
135+
}
136+
countDownLatch.countDown();
137+
countDownLatch.await();
138+
clone.seek(readStart);
139+
final byte[] cloneResult = randomReadAndSlice(clone, readEnd);
140+
if (randomBoolean()) {
141+
clone.close();
142+
}
143+
144+
// the read from the clone should agree with the read from the main input on their overlap
145+
final int maxStart = Math.max(mainThreadReadStart, readStart);
146+
final int minEnd = Math.min(mainThreadReadEnd, readEnd);
147+
if (maxStart < minEnd) {
148+
final byte[] mainThreadResult = mainThreadResultFuture.actionGet();
149+
final int overlapLen = minEnd - maxStart;
150+
final byte[] fromMainThread = new byte[overlapLen];
151+
final byte[] fromClone = new byte[overlapLen];
152+
System.arraycopy(mainThreadResult, maxStart, fromMainThread, 0, overlapLen);
153+
System.arraycopy(cloneResult, maxStart, fromClone, 0, overlapLen);
154+
assertArrayEquals(fromMainThread, fromClone);
155+
}
156+
}
157+
158+
@Override
159+
public void onRejection(Exception e) {
160+
// all threads are busy, and queueing can lead this test to deadlock, so we need take no action
161+
countDownLatch.countDown();
162+
}
163+
});
164+
}
165+
166+
try {
167+
countDownLatch.countDown();
168+
countDownLatch.await();
169+
ActionListener.completeWith(mainThreadResultFuture, () -> randomReadAndSlice(indexInput, mainThreadReadEnd));
170+
System.arraycopy(mainThreadResultFuture.actionGet(), readPos, output, readPos, mainThreadReadEnd - readPos);
171+
readPos = mainThreadReadEnd;
172+
} catch (InterruptedException e) {
173+
throw new AssertionError(e);
174+
}
175+
break;
176+
default:
177+
fail();
178+
}
179+
assertEquals(readPos, indexInput.getFilePointer());
180+
}
181+
assertEquals(length, indexInput.getFilePointer());
182+
return output;
183+
}
184+
185+
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotIndexInputTests.java

Lines changed: 2 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
import org.apache.lucene.store.IndexInput;
99
import org.apache.lucene.util.Version;
1010
import org.elasticsearch.common.blobstore.BlobContainer;
11+
import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
1112
import org.elasticsearch.common.unit.ByteSizeUnit;
1213
import org.elasticsearch.common.unit.ByteSizeValue;
1314
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
14-
import org.elasticsearch.test.ESTestCase;
1515

1616
import java.io.ByteArrayInputStream;
1717
import java.io.EOFException;
@@ -31,7 +31,7 @@
3131
import static org.mockito.Mockito.mock;
3232
import static org.mockito.Mockito.when;
3333

34-
public class SearchableSnapshotIndexInputTests extends ESTestCase {
34+
public class SearchableSnapshotIndexInputTests extends ESIndexInputTestCase {
3535

3636
private SearchableSnapshotIndexInput createIndexInput(final byte[] input) throws IOException {
3737
final long partSize = (long) (randomBoolean() ? input.length : randomIntBetween(1, input.length));
@@ -111,60 +111,4 @@ public void testSeekOverflow() throws IOException {
111111
}
112112
}
113113

114-
private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException {
115-
int readPos = (int) indexInput.getFilePointer();
116-
byte[] output = new byte[length];
117-
while (readPos < length) {
118-
switch (randomIntBetween(0, 4)) {
119-
case 0:
120-
// Read by one byte at a time
121-
output[readPos++] = indexInput.readByte();
122-
break;
123-
case 1:
124-
// Read several bytes into target
125-
int len = randomIntBetween(1, length - readPos);
126-
indexInput.readBytes(output, readPos, len);
127-
readPos += len;
128-
break;
129-
case 2:
130-
// Read several bytes into 0-offset target
131-
len = randomIntBetween(1, length - readPos);
132-
byte[] temp = new byte[len];
133-
indexInput.readBytes(temp, 0, len);
134-
System.arraycopy(temp, 0, output, readPos, len);
135-
readPos += len;
136-
break;
137-
case 3:
138-
// Read using slice
139-
len = randomIntBetween(1, length - readPos);
140-
IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput.toString(), readPos, len);
141-
temp = randomReadAndSlice(slice, len);
142-
// assert that position in the original input didn't change
143-
assertEquals(readPos, indexInput.getFilePointer());
144-
System.arraycopy(temp, 0, output, readPos, len);
145-
readPos += len;
146-
indexInput.seek(readPos);
147-
assertEquals(readPos, indexInput.getFilePointer());
148-
break;
149-
case 4:
150-
// Seek at a random position and read a single byte,
151-
// then seek back to original position
152-
final int lastReadPos = readPos;
153-
readPos = randomIntBetween(0, length - 1);
154-
indexInput.seek(readPos);
155-
assertEquals(readPos, indexInput.getFilePointer());
156-
final int bytesToRead = 1;
157-
temp = randomReadAndSlice(indexInput, readPos + bytesToRead);
158-
System.arraycopy(temp, readPos, output, readPos, bytesToRead);
159-
readPos = lastReadPos;
160-
indexInput.seek(readPos);
161-
assertEquals(readPos, indexInput.getFilePointer());
162-
break;
163-
default:
164-
fail();
165-
}
166-
assertEquals(readPos, indexInput.getFilePointer());
167-
}
168-
return output;
169-
}
170114
}

0 commit comments

Comments
 (0)