Skip to content

Commit d1f1e79

Browse files
CR: Test, fix delete case, dry up ensure open
1 parent 3c94924 commit d1f1e79

File tree

3 files changed

+154
-35
lines changed

3 files changed

+154
-35
lines changed

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ protected void assertSnapshotOrGenericThread() {
10951095
} else {
10961096
return metaData -> {
10971097
final Repository repository = new MockEventuallyConsistentRepository(
1098-
metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext);
1098+
metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
10991099
repository.start();
11001100
return repository;
11011101
};

server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java

+27-34
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.snapshots.mockstore;
2121

22-
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
2322
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2423
import org.elasticsearch.common.Nullable;
2524
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -31,6 +30,7 @@
3130
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3231
import org.elasticsearch.env.Environment;
3332
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
33+
import org.elasticsearch.threadpool.ThreadPool;
3434

3535
import java.io.ByteArrayInputStream;
3636
import java.io.IOException;
@@ -55,8 +55,8 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
5555
private final Context context;
5656

5757
public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment,
58-
NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) {
59-
super(metadata, environment.settings(), namedXContentRegistry, deterministicTaskQueue.getThreadPool(), BlobPath.cleanPath());
58+
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool, Context context) {
59+
super(metadata, environment.settings(), namedXContentRegistry, threadPool, BlobPath.cleanPath());
6060
this.context = context;
6161
}
6262

@@ -141,6 +141,12 @@ public void close() {
141141
closed.set(true);
142142
}
143143

144+
private void ensureNotClosed() {
145+
if (closed.get()) {
146+
throw new AssertionError("Blobstore is closed already");
147+
}
148+
}
149+
144150
private class MockBlobContainer implements BlobContainer {
145151

146152
private final BlobPath path;
@@ -156,9 +162,7 @@ public BlobPath path() {
156162

157163
@Override
158164
public boolean blobExists(String blobName) {
159-
if (closed.get()) {
160-
throw new AssertionError("Blobstore is closed already");
161-
}
165+
ensureNotClosed();
162166
try {
163167
readBlob(blobName);
164168
return true;
@@ -169,14 +173,19 @@ public boolean blobExists(String blobName) {
169173

170174
@Override
171175
public InputStream readBlob(String name) throws NoSuchFileException {
172-
if (closed.get()) {
173-
throw new AssertionError("Blobstore is closed already");
174-
}
176+
ensureNotClosed();
175177
final String blobPath = path.buildAsString() + name;
176178
synchronized (context.actions) {
179+
final List<BlobStoreAction> relevantActions = new ArrayList<>(
180+
context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList()));
177181
context.actions.add(new BlobStoreAction(Operation.GET, blobPath));
178-
final List<BlobStoreAction> relevantActions =
179-
context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList());
182+
for (int i = relevantActions.size() - 1; i > 0; i--) {
183+
if (relevantActions.get(i).operation == Operation.GET) {
184+
relevantActions.remove(i);
185+
} else {
186+
break;
187+
}
188+
}
180189
final List<byte[]> writes = new ArrayList<>();
181190
boolean readBeforeWrite = false;
182191
for (BlobStoreAction relevantAction : relevantActions) {
@@ -190,35 +199,25 @@ public InputStream readBlob(String name) throws NoSuchFileException {
190199
if (writes.isEmpty()) {
191200
throw new NoSuchFileException(blobPath);
192201
}
193-
if (readBeforeWrite == false && writes.size() == 1) {
202+
if (readBeforeWrite == false && relevantActions.size() == 1) {
194203
// Consistent read after write
195204
return new ByteArrayInputStream(writes.get(0));
196205
}
197-
if ("incompatible-snapshots".equals(blobPath) == false && "index.latest".equals(blobPath) == false) {
198-
throw new AssertionError("Inconsistent read on [" + blobPath + ']');
199-
}
200-
return consistentView(relevantActions).stream()
201-
.filter(action -> action.path.equals(blobPath) && action.operation == Operation.PUT)
202-
.findAny().map(
203-
action -> new ByteArrayInputStream(action.data)).orElseThrow(() -> new NoSuchFileException(blobPath));
206+
throw new AssertionError("Inconsistent read on [" + blobPath + ']');
204207
}
205208
}
206209

207210
@Override
208211
public void deleteBlob(String blobName) {
209-
if (closed.get()) {
210-
throw new AssertionError("Blobstore is closed already");
211-
}
212+
ensureNotClosed();
212213
synchronized (context.actions) {
213214
context.actions.add(new BlobStoreAction(Operation.DELETE, path.buildAsString() + blobName));
214215
}
215216
}
216217

217218
@Override
218219
public void delete() {
219-
if (closed.get()) {
220-
throw new AssertionError("Blobstore is closed already");
221-
}
220+
ensureNotClosed();
222221
final String thisPath = path.buildAsString();
223222
synchronized (context.actions) {
224223
consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath))
@@ -228,9 +227,7 @@ public void delete() {
228227

229228
@Override
230229
public Map<String, BlobMetaData> listBlobs() {
231-
if (closed.get()) {
232-
throw new AssertionError("Blobstore is closed already");
233-
}
230+
ensureNotClosed();
234231
final String thisPath = path.buildAsString();
235232
synchronized (context.actions) {
236233
return consistentView(context.actions).stream()
@@ -246,9 +243,7 @@ public Map<String, BlobMetaData> listBlobs() {
246243

247244
@Override
248245
public Map<String, BlobContainer> children() {
249-
if (closed.get()) {
250-
throw new AssertionError("Blobstore is closed already");
251-
}
246+
ensureNotClosed();
252247
final String thisPath = path.buildAsString();
253248
synchronized (context.actions) {
254249
return consistentView(context.actions).stream()
@@ -271,9 +266,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
271266
@Override
272267
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
273268
throws IOException {
274-
if (closed.get()) {
275-
throw new AssertionError("Blobstore is closed already");
276-
}
269+
ensureNotClosed();
277270
// TODO: Throw if we try to overwrite any blob other than incompatible_snapshots or index.latest with different content
278271
// than it already contains.
279272
assert blobSize < Integer.MAX_VALUE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.snapshots.mockstore;
20+
21+
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
22+
import org.elasticsearch.common.blobstore.BlobContainer;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.env.Environment;
25+
import org.elasticsearch.env.TestEnvironment;
26+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
27+
import org.elasticsearch.test.ESTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
30+
import java.io.ByteArrayInputStream;
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.nio.file.NoSuchFileException;
34+
import java.nio.file.Path;
35+
import java.util.Arrays;
36+
37+
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
38+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.mockito.Mockito.mock;
41+
42+
public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
43+
44+
private Environment environment;
45+
46+
@Override
47+
public void setUp() throws Exception {
48+
super.setUp();
49+
final Path tempDir = createTempDir();
50+
final String nodeName = "testNode";
51+
environment = TestEnvironment.newEnvironment(Settings.builder()
52+
.put(NODE_NAME_SETTING.getKey(), nodeName)
53+
.put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath())
54+
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
55+
.build());
56+
}
57+
58+
public void testReadAfterWriteConsistently() throws IOException {
59+
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
60+
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
61+
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
62+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
63+
repository.start();
64+
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
65+
final String blobName = randomAlphaOfLength(10);
66+
final int lengthWritten = randomIntBetween(1, 100);
67+
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
68+
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
69+
try (InputStream in = blobContainer.readBlob(blobName)) {
70+
final byte[] readBytes = new byte[lengthWritten + 1];
71+
final int lengthSeen = in.read(readBytes);
72+
assertThat(lengthSeen, equalTo(lengthWritten));
73+
assertArrayEquals(blobData, Arrays.copyOf(readBytes, lengthWritten));
74+
}
75+
}
76+
}
77+
78+
public void testReadAfterWriteAfterReadThrows() throws IOException {
79+
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
80+
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
81+
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
82+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
83+
repository.start();
84+
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
85+
final String blobName = randomAlphaOfLength(10);
86+
final int lengthWritten = randomIntBetween(1, 100);
87+
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
88+
assertMissing(blobContainer, blobName);
89+
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
90+
assertThrowsOnInconsistentRead(blobContainer, blobName);
91+
}
92+
}
93+
94+
public void testReadAfterDeleteAfterWriteThrows() throws IOException {
95+
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
96+
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
97+
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
98+
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
99+
repository.start();
100+
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
101+
final String blobName = randomAlphaOfLength(10);
102+
final int lengthWritten = randomIntBetween(1, 100);
103+
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
104+
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
105+
blobContainer.deleteBlob(blobName);
106+
assertThrowsOnInconsistentRead(blobContainer, blobName);
107+
blobStoreContext.forceConsistent();
108+
assertMissing(blobContainer, blobName);
109+
}
110+
}
111+
112+
private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) throws IOException {
113+
try (InputStream in = blobContainer.readBlob(blobName)) {
114+
fail("Inconsistent read should throw");
115+
} catch (AssertionError assertionError) {
116+
assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']'));
117+
}
118+
}
119+
120+
private static void assertMissing(BlobContainer container, String blobName) throws IOException {
121+
try (InputStream in = container.readBlob(blobName)) {
122+
fail("Reading a non-existent blob should throw");
123+
} catch (NoSuchFileException expected) {
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)