Skip to content

Commit 9aeebca

Browse files
elastic#38941 start reproducer
1 parent ed20d7c commit 9aeebca

File tree

3 files changed

+266
-13
lines changed

3 files changed

+266
-13
lines changed

server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java

+17
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collections;
2424
import java.util.Iterator;
2525
import java.util.List;
26+
import java.util.Objects;
2627

2728
/**
2829
* The list of paths where a blob can reside. The contents of the paths are dependent upon the implementation of {@link BlobContainer}.
@@ -76,4 +77,20 @@ public String toString() {
7677
}
7778
return sb.toString();
7879
}
80+
81+
@Override
82+
public boolean equals(final Object o) {
83+
if (this == o) {
84+
return true;
85+
}
86+
if (o == null || getClass() != o.getClass()) {
87+
return false;
88+
}
89+
return paths.equals(((BlobPath) o).paths);
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return paths.hashCode();
95+
}
7996
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+36-13
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.elasticsearch.repositories.fs.FsRepository;
117117
import org.elasticsearch.script.ScriptService;
118118
import org.elasticsearch.search.SearchService;
119+
import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
119120
import org.elasticsearch.test.ESTestCase;
120121
import org.elasticsearch.test.disruption.DisruptableMockTransport;
121122
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -165,9 +166,20 @@ public class SnapshotResiliencyTests extends ESTestCase {
165166

166167
private Path tempDir;
167168

169+
/**
170+
* Whether to use eventually consistent blob store in tests.
171+
*/
172+
private boolean eventuallyConsistent;
173+
174+
private MockEventuallyConsistentRepository.Context blobStoreContext;
175+
168176
@Before
169177
public void createServices() {
170178
tempDir = createTempDir();
179+
eventuallyConsistent = randomBoolean();
180+
if (eventuallyConsistent) {
181+
blobStoreContext = new MockEventuallyConsistentRepository.Context();
182+
}
171183
deterministicTaskQueue =
172184
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
173185
}
@@ -803,19 +815,7 @@ public void onFailure(final Exception e) {
803815
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
804816
repositoriesService = new RepositoriesService(
805817
settings, clusterService, transportService,
806-
Collections.singletonMap(FsRepository.TYPE, metaData -> {
807-
final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) {
808-
@Override
809-
protected void assertSnapshotOrGenericThread() {
810-
// eliminate thread name check as we create repo in the test thread
811-
}
812-
};
813-
repository.start();
814-
return repository;
815-
}
816-
),
817-
emptyMap(),
818-
threadPool
818+
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
819819
);
820820
snapshotsService =
821821
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
@@ -946,6 +946,29 @@ allocationService, new AliasValidator(), environment, indexScopedSettings,
946946
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
947947
}
948948

949+
private Repository.Factory getRepoFactory(final Environment environment) {
950+
// Run half the tests with the eventually consistent repository
951+
if (eventuallyConsistent) {
952+
return metaData -> {
953+
final Repository repository = new MockEventuallyConsistentRepository(
954+
metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext);
955+
repository.start();
956+
return repository;
957+
};
958+
} else {
959+
return metaData -> {
960+
final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) {
961+
@Override
962+
protected void assertSnapshotOrGenericThread() {
963+
// eliminate thread name check as we create repo in the test thread
964+
}
965+
};
966+
repository.start();
967+
return repository;
968+
};
969+
}
970+
}
971+
949972
public void restart() {
950973
testClusterNodes.disconnectNode(this);
951974
final ClusterState oldState = this.clusterService.state();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.snapshots.mockstore;
21+
22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
24+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
25+
import org.elasticsearch.cluster.metadata.MetaData;
26+
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
27+
import org.elasticsearch.common.blobstore.BlobContainer;
28+
import org.elasticsearch.common.blobstore.BlobMetaData;
29+
import org.elasticsearch.common.blobstore.BlobPath;
30+
import org.elasticsearch.common.blobstore.BlobStore;
31+
import org.elasticsearch.common.collect.Tuple;
32+
import org.elasticsearch.common.io.PathUtils;
33+
import org.elasticsearch.common.io.Streams;
34+
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
36+
import org.elasticsearch.env.Environment;
37+
import org.elasticsearch.repositories.IndexId;
38+
import org.elasticsearch.repositories.fs.FsRepository;
39+
import org.elasticsearch.snapshots.SnapshotId;
40+
41+
import java.io.ByteArrayInputStream;
42+
import java.io.ByteArrayOutputStream;
43+
import java.io.FileNotFoundException;
44+
import java.io.IOException;
45+
import java.io.InputStream;
46+
import java.nio.file.FileAlreadyExistsException;
47+
import java.nio.file.Files;
48+
import java.nio.file.NoSuchFileException;
49+
import java.nio.file.Path;
50+
import java.util.HashMap;
51+
import java.util.HashSet;
52+
import java.util.List;
53+
import java.util.Map;
54+
import java.util.Set;
55+
56+
/**
57+
* Mock Repository that simulates the mechanics of an eventually consistent blobstore.
58+
*/
59+
public class MockEventuallyConsistentRepository extends FsRepository {
60+
private static final Logger logger = LogManager.getLogger(MockEventuallyConsistentRepository.class);
61+
62+
private final DeterministicTaskQueue deterministicTaskQueue;
63+
64+
private final Context context;
65+
66+
public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment,
67+
NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) {
68+
super(overrideSettings(metadata, environment), environment, namedXContentRegistry);
69+
this.deterministicTaskQueue = deterministicTaskQueue;
70+
this.context = context;
71+
}
72+
73+
@Override
74+
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
75+
super.initializeSnapshot(snapshotId, indices, clusterMetadata);
76+
}
77+
78+
@Override
79+
protected void assertSnapshotOrGenericThread() {
80+
// eliminate thread name check as we create repo in the test thread
81+
}
82+
83+
private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
84+
// TODO: use another method of testing not being able to read the test file written by the master...
85+
// this is super duper hacky
86+
if (metadata.settings().getAsBoolean("localize_location", false)) {
87+
Path location = PathUtils.get(metadata.settings().get("location"));
88+
location = location.resolve(Integer.toString(environment.hashCode()));
89+
return new RepositoryMetaData(metadata.name(), metadata.type(),
90+
Settings.builder().put(metadata.settings()).put("location", location.toAbsolutePath()).build());
91+
} else {
92+
return metadata;
93+
}
94+
}
95+
96+
@Override
97+
protected void doStop() {
98+
super.doStop();
99+
}
100+
101+
@Override
102+
protected BlobStore createBlobStore() throws Exception {
103+
return new MockBlobStore(super.createBlobStore());
104+
}
105+
106+
public static final class Context {
107+
108+
private final Map<BlobPath, Tuple<Set<String>, Map<String, Runnable>>> state = new HashMap<>();
109+
110+
public Tuple<Set<String>, Map<String, Runnable>> getState(BlobPath path) {
111+
return state.computeIfAbsent(path, p -> new Tuple<>(new HashSet<>(), new HashMap<>()));
112+
}
113+
114+
}
115+
116+
private class MockBlobStore extends BlobStoreWrapper {
117+
118+
MockBlobStore(BlobStore delegate) {
119+
super(delegate);
120+
}
121+
122+
@Override
123+
public BlobContainer blobContainer(BlobPath path) {
124+
return new MockBlobContainer(super.blobContainer(path), context.getState(path));
125+
}
126+
127+
private class MockBlobContainer extends BlobContainerWrapper {
128+
129+
private final Set<String> cachedMisses;
130+
131+
private final Map<String, Runnable> pendingWrites;
132+
133+
MockBlobContainer(BlobContainer delegate, Tuple<Set<String>, Map<String, Runnable>> state) {
134+
super(delegate);
135+
cachedMisses = state.v1();
136+
pendingWrites = state.v2();
137+
}
138+
139+
@Override
140+
public boolean blobExists(String blobName) {
141+
ensureReadAfterWrite(blobName);
142+
final boolean result = super.blobExists(blobName);
143+
if (result == false) {
144+
cachedMisses.add(blobName);
145+
}
146+
return result;
147+
}
148+
149+
@Override
150+
public InputStream readBlob(String name) throws IOException {
151+
ensureReadAfterWrite(name);
152+
return super.readBlob(name);
153+
}
154+
155+
private void ensureReadAfterWrite(String blobName) {
156+
if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) {
157+
pendingWrites.remove(blobName).run();
158+
}
159+
}
160+
161+
@Override
162+
public void deleteBlob(String blobName) throws IOException {
163+
super.deleteBlob(blobName);
164+
}
165+
166+
@Override
167+
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
168+
super.deleteBlobIgnoringIfNotExists(blobName);
169+
}
170+
171+
@Override
172+
public Map<String, BlobMetaData> listBlobs() throws IOException {
173+
return super.listBlobs();
174+
}
175+
176+
@Override
177+
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
178+
return super.listBlobsByPrefix(blobNamePrefix);
179+
}
180+
181+
@Override
182+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
183+
throws IOException {
184+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185+
Streams.copy(inputStream, baos);
186+
pendingWrites.put(blobName, () -> {
187+
try {
188+
super.writeBlob(blobName, new ByteArrayInputStream(baos.toByteArray()), blobSize, failIfAlreadyExists);
189+
if (cachedMisses.contains(blobName)) {
190+
deterministicTaskQueue.scheduleNow(() -> cachedMisses.remove(blobName));
191+
}
192+
} catch (NoSuchFileException | FileAlreadyExistsException e) {
193+
// Ignoring, assuming a previous concurrent delete removed the parent path and that overwrites are not
194+
// detectable with this kind of store
195+
} catch (IOException e) {
196+
throw new AssertionError(e);
197+
}
198+
});
199+
deterministicTaskQueue.scheduleNow(() -> {
200+
if (pendingWrites.containsKey(blobName)) {
201+
pendingWrites.remove(blobName).run();
202+
}
203+
});
204+
}
205+
206+
@Override
207+
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize,
208+
final boolean failIfAlreadyExists) throws IOException {
209+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
210+
}
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)