Skip to content

Commit 6a24fd3

Browse files
Add Restore Operation to SnapshotResiliencyTests (#40634) (#41546)
* Add Restore Operation to SnapshotResiliencyTests * Expand the successful snapshot test case to also include restoring the snapshop * Add indexing of documents as well to be able to meaningfully verify the restore * This is part of the larger effort to test eventually consistent blob stores in #39504
1 parent 6996739 commit 6a24fd3

File tree

1 file changed

+149
-15
lines changed

1 file changed

+149
-15
lines changed

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+149-15
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,42 @@
3434
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
3535
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
3636
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
37+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
38+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
39+
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
3740
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
3841
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
3942
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
4043
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
4144
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
4245
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
46+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
47+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
48+
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
49+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
50+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
51+
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
4352
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
4453
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
54+
import org.elasticsearch.action.bulk.BulkAction;
55+
import org.elasticsearch.action.bulk.BulkRequest;
56+
import org.elasticsearch.action.bulk.TransportBulkAction;
57+
import org.elasticsearch.action.bulk.TransportShardBulkAction;
58+
import org.elasticsearch.action.index.IndexRequest;
4559
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
60+
import org.elasticsearch.action.search.SearchAction;
61+
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
62+
import org.elasticsearch.action.search.SearchPhaseController;
63+
import org.elasticsearch.action.search.SearchRequest;
64+
import org.elasticsearch.action.search.SearchTransportService;
65+
import org.elasticsearch.action.search.TransportSearchAction;
4666
import org.elasticsearch.action.support.ActionFilters;
4767
import org.elasticsearch.action.support.ActiveShardCount;
68+
import org.elasticsearch.action.support.AutoCreateIndex;
69+
import org.elasticsearch.action.support.DestructiveOperations;
4870
import org.elasticsearch.action.support.TransportAction;
71+
import org.elasticsearch.action.support.WriteRequest;
72+
import org.elasticsearch.action.update.UpdateHelper;
4973
import org.elasticsearch.client.AdminClient;
5074
import org.elasticsearch.client.node.NodeClient;
5175
import org.elasticsearch.cluster.ClusterModule;
@@ -54,6 +78,7 @@
5478
import org.elasticsearch.cluster.ESAllocationTestCase;
5579
import org.elasticsearch.cluster.NodeConnectionsService;
5680
import org.elasticsearch.cluster.SnapshotsInProgress;
81+
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
5782
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
5883
import org.elasticsearch.cluster.action.shard.ShardStateAction;
5984
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
@@ -68,6 +93,8 @@
6893
import org.elasticsearch.cluster.metadata.IndexMetaData;
6994
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
7095
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
96+
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
97+
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
7198
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
7299
import org.elasticsearch.cluster.node.DiscoveryNode;
73100
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -100,7 +127,9 @@
100127
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
101128
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
102129
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
130+
import org.elasticsearch.indices.IndicesModule;
103131
import org.elasticsearch.indices.IndicesService;
132+
import org.elasticsearch.indices.analysis.AnalysisModule;
104133
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
105134
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
106135
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@@ -109,13 +138,16 @@
109138
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
110139
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
111140
import org.elasticsearch.indices.recovery.RecoverySettings;
112-
import org.elasticsearch.plugins.MapperPlugin;
141+
import org.elasticsearch.ingest.IngestService;
142+
import org.elasticsearch.node.ResponseCollectorService;
113143
import org.elasticsearch.plugins.PluginsService;
114144
import org.elasticsearch.repositories.RepositoriesService;
115145
import org.elasticsearch.repositories.Repository;
116146
import org.elasticsearch.repositories.fs.FsRepository;
117147
import org.elasticsearch.script.ScriptService;
118148
import org.elasticsearch.search.SearchService;
149+
import org.elasticsearch.search.builder.SearchSourceBuilder;
150+
import org.elasticsearch.search.fetch.FetchPhase;
119151
import org.elasticsearch.test.ESTestCase;
120152
import org.elasticsearch.test.disruption.DisruptableMockTransport;
121153
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -138,10 +170,12 @@
138170
import java.util.LinkedHashMap;
139171
import java.util.List;
140172
import java.util.Map;
173+
import java.util.Objects;
141174
import java.util.Optional;
142175
import java.util.Set;
143176
import java.util.concurrent.TimeUnit;
144177
import java.util.concurrent.atomic.AtomicBoolean;
178+
import java.util.concurrent.atomic.AtomicInteger;
145179
import java.util.function.Consumer;
146180
import java.util.function.Supplier;
147181
import java.util.stream.Collectors;
@@ -177,18 +211,20 @@ public void stopServices() {
177211
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
178212
}
179213

180-
public void testSuccessfulSnapshot() {
214+
public void testSuccessfulSnapshotAndRestore() {
181215
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
182216

183217
String repoName = "repo";
184218
String snapshotName = "snapshot";
185219
final String index = "test";
186220

187221
final int shards = randomIntBetween(1, 10);
188-
222+
final int documents = randomIntBetween(0, 100);
189223
TestClusterNode masterNode =
190224
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
191225
final AtomicBoolean createdSnapshot = new AtomicBoolean();
226+
final AtomicBoolean snapshotRestored = new AtomicBoolean();
227+
final AtomicBoolean documentCountVerified = new AtomicBoolean();
192228
masterNode.client.admin().cluster().preparePutRepository(repoName)
193229
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
194230
.execute(
@@ -197,12 +233,61 @@ public void testSuccessfulSnapshot() {
197233
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
198234
.settings(defaultIndexSettings(shards)),
199235
assertNoFailureListener(
200-
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
201-
.execute(assertNoFailureListener(() -> createdSnapshot.set(true)))))));
202-
203-
deterministicTaskQueue.runAllRunnableTasks();
204-
236+
() -> {
237+
final Runnable afterIndexing = () ->
238+
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
239+
.setWaitForCompletion(true).execute(assertNoFailureListener(() -> {
240+
createdSnapshot.set(true);
241+
masterNode.client.admin().indices().delete(
242+
new DeleteIndexRequest(index),
243+
assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot(
244+
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true),
245+
assertNoFailureListener(restoreSnapshotResponse -> {
246+
snapshotRestored.set(true);
247+
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
248+
masterNode.client.search(
249+
new SearchRequest(index).source(
250+
new SearchSourceBuilder().size(0).trackTotalHits(true)
251+
),
252+
assertNoFailureListener(r -> {
253+
assertEquals(
254+
(long) documents,
255+
Objects.requireNonNull(r.getHits().getTotalHits()).value
256+
);
257+
documentCountVerified.set(true);
258+
}));
259+
})
260+
)));
261+
}));
262+
final AtomicInteger countdown = new AtomicInteger(documents);
263+
masterNode.client.admin().indices().putMapping(
264+
new PutMappingRequest(index).type("_doc").source("foo", "type=text"),
265+
assertNoFailureListener(r -> {
266+
for (int i = 0; i < documents; ++i) {
267+
masterNode.client.bulk(
268+
new BulkRequest().add(new IndexRequest(index).source(
269+
Collections.singletonMap("foo", "bar" + i)))
270+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
271+
assertNoFailureListener(
272+
bulkResponse -> {
273+
assertFalse(
274+
"Failures in bulkresponse: " + bulkResponse.buildFailureMessage(),
275+
bulkResponse.hasFailures());
276+
if (countdown.decrementAndGet() == 0) {
277+
afterIndexing.run();
278+
}
279+
}));
280+
}
281+
if (documents == 0) {
282+
afterIndexing.run();
283+
}
284+
}
285+
));
286+
}))));
287+
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
205288
assertTrue(createdSnapshot.get());
289+
assertTrue(snapshotRestored.get());
290+
assertTrue(documentCountVerified.get());
206291
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
207292
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
208293
final Repository repository = masterNode.repositoriesService.repository(repoName);
@@ -236,7 +321,6 @@ public void testSnapshotWithNodeDisconnects() {
236321
.execute(
237322
assertNoFailureListener(
238323
() -> masterNode.client.admin().indices().create(
239-
240324
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
241325
.settings(defaultIndexSettings(shards)),
242326
assertNoFailureListener(
@@ -833,6 +917,8 @@ protected void assertSnapshotOrGenericThread() {
833917
allocationService = ESAllocationTestCase.createAllocationService(settings);
834918
final IndexScopedSettings indexScopedSettings =
835919
new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
920+
final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test");
921+
final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
836922
indicesService = new IndicesService(
837923
settings,
838924
mock(PluginsService.class),
@@ -841,12 +927,12 @@ protected void assertSnapshotOrGenericThread() {
841927
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(),
842928
emptyMap(), emptyMap(), emptyMap(), emptyMap()),
843929
indexNameExpressionResolver,
844-
new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER),
930+
mapperRegistry,
845931
namedWriteableRegistry,
846932
threadPool,
847933
indexScopedSettings,
848934
new NoneCircuitBreakerService(),
849-
new BigArrays(new PageCacheRecycler(settings), null, "test"),
935+
bigArrays,
850936
scriptService,
851937
client,
852938
new MetaStateService(nodeEnv, namedXContentRegistry),
@@ -863,14 +949,15 @@ protected void assertSnapshotOrGenericThread() {
863949
new RoutingService(clusterService, allocationService),
864950
threadPool
865951
);
952+
final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
866953
indicesClusterStateService = new IndicesClusterStateService(
867954
settings,
868955
indicesService,
869956
clusterService,
870957
threadPool,
871958
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
872959
shardStateAction,
873-
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
960+
new NodeMappingRefreshAction(transportService, metaDataMappingService),
874961
repositoriesService,
875962
mock(SearchService.class),
876963
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
@@ -915,14 +1002,61 @@ protected void assertSnapshotOrGenericThread() {
9151002
actionFilters,
9161003
indexNameExpressionResolver));
9171004
Map<Action, TransportAction> actions = new HashMap<>();
1005+
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
1006+
indicesService,
1007+
allocationService, new AliasValidator(), environment, indexScopedSettings,
1008+
threadPool, namedXContentRegistry, false);
9181009
actions.put(CreateIndexAction.INSTANCE,
9191010
new TransportCreateIndexAction(
9201011
transportService, clusterService, threadPool,
921-
new MetaDataCreateIndexService(settings, clusterService, indicesService,
922-
allocationService, new AliasValidator(), environment, indexScopedSettings,
923-
threadPool, namedXContentRegistry, false),
1012+
metaDataCreateIndexService,
9241013
actionFilters, indexNameExpressionResolver
9251014
));
1015+
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
1016+
mappingUpdatedAction.setClient(client);
1017+
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
1018+
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
1019+
actionFilters, indexNameExpressionResolver);
1020+
actions.put(BulkAction.INSTANCE,
1021+
new TransportBulkAction(threadPool, transportService, clusterService,
1022+
new IngestService(
1023+
clusterService, threadPool, environment, scriptService,
1024+
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
1025+
Collections.emptyList()),
1026+
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
1027+
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
1028+
));
1029+
final RestoreService restoreService = new RestoreService(
1030+
clusterService, repositoriesService, allocationService,
1031+
metaDataCreateIndexService,
1032+
new MetaDataIndexUpgradeService(
1033+
settings, namedXContentRegistry,
1034+
mapperRegistry,
1035+
indexScopedSettings,
1036+
Collections.emptyList()
1037+
),
1038+
clusterSettings
1039+
);
1040+
actions.put(PutMappingAction.INSTANCE,
1041+
new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService,
1042+
actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList())));
1043+
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
1044+
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
1045+
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
1046+
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
1047+
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
1048+
actions.put(SearchAction.INSTANCE,
1049+
new TransportSearchAction(threadPool, transportService, searchService,
1050+
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
1051+
actionFilters, indexNameExpressionResolver));
1052+
actions.put(RestoreSnapshotAction.INSTANCE,
1053+
new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,
1054+
indexNameExpressionResolver));
1055+
actions.put(DeleteIndexAction.INSTANCE,
1056+
new TransportDeleteIndexAction(
1057+
transportService, clusterService, threadPool,
1058+
new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters,
1059+
indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings)));
9261060
actions.put(PutRepositoryAction.INSTANCE,
9271061
new TransportPutRepositoryAction(
9281062
transportService, clusterService, repositoriesService, threadPool,

0 commit comments

Comments
 (0)