Skip to content

Commit 7a3a45a

Browse files
authored
Reroute when new repository is registered (#73761)
Today we fail to allocate searchable snapshot shards if the repository containing their underlying data is not registered with the cluster. This failure is somewhat messy: we allocate the shards and let the recovery fail, and furthermore we don't automatically retry the allocation if the repository is subsequently registered. This commit introduces an allocation decider that prevents the allocation of such shards and explains more clearly why, and also a cluster state listener that performs a reroute when a new repository is registered. Relates #73669 Relates #73714
1 parent d81398a commit 7a3a45a

File tree

6 files changed

+385
-43
lines changed

6 files changed

+385
-43
lines changed

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.lucene.search.TotalHits;
1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
1213
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
1314
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
1415
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
@@ -20,11 +21,16 @@
2021
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2122
import org.elasticsearch.action.index.IndexRequestBuilder;
2223
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.RestoreInProgress;
2325
import org.elasticsearch.cluster.metadata.DataStream;
2426
import org.elasticsearch.cluster.metadata.IndexMetadata;
2527
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2628
import org.elasticsearch.cluster.node.DiscoveryNode;
2729
import org.elasticsearch.cluster.routing.ShardRouting;
30+
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
31+
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
32+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
33+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
2834
import org.elasticsearch.common.Priority;
2935
import org.elasticsearch.common.Strings;
3036
import org.elasticsearch.common.settings.Settings;
@@ -971,6 +977,123 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr
971977
);
972978
}
973979

980+
public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegistered() throws Exception {
981+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
982+
createAndPopulateIndex(
983+
indexName,
984+
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
985+
);
986+
987+
final TotalHits originalAllHits = internalCluster().client()
988+
.prepareSearch(indexName)
989+
.setTrackTotalHits(true)
990+
.get()
991+
.getHits()
992+
.getTotalHits();
993+
final TotalHits originalBarHits = internalCluster().client()
994+
.prepareSearch(indexName)
995+
.setTrackTotalHits(true)
996+
.setQuery(matchQuery("foo", "bar"))
997+
.get()
998+
.getHits()
999+
.getTotalHits();
1000+
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);
1001+
1002+
// Take snapshot containing the actual data to one repository
1003+
final String dataRepoName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
1004+
createRepository(dataRepoName, "fs");
1005+
1006+
final SnapshotId dataSnapshot = createSnapshot(dataRepoName, "data-snapshot", List.of(indexName)).snapshotId();
1007+
assertAcked(client().admin().indices().prepareDelete(indexName));
1008+
1009+
final String restoredIndexName = randomValueOtherThan(indexName, () -> randomAlphaOfLength(10).toLowerCase(Locale.ROOT));
1010+
mountSnapshot(dataRepoName, dataSnapshot.getName(), indexName, restoredIndexName, Settings.EMPTY);
1011+
ensureGreen(restoredIndexName);
1012+
1013+
if (randomBoolean()) {
1014+
logger.info("--> closing index before snapshot");
1015+
assertAcked(client().admin().indices().prepareClose(restoredIndexName));
1016+
}
1017+
1018+
// Back up the cluster to a different repo
1019+
final String backupRepoName = randomValueOtherThan(dataRepoName, () -> randomAlphaOfLength(10).toLowerCase(Locale.ROOT));
1020+
createRepository(backupRepoName, "fs");
1021+
final SnapshotId backupSnapshot = createSnapshot(backupRepoName, "backup-snapshot", List.of(restoredIndexName)).snapshotId();
1022+
1023+
// Clear out data & the repo that contains it
1024+
final RepositoryMetadata dataRepoMetadata = client().admin()
1025+
.cluster()
1026+
.prepareGetRepositories(dataRepoName)
1027+
.get()
1028+
.repositories()
1029+
.get(0);
1030+
assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
1031+
assertAcked(client().admin().cluster().prepareDeleteRepository(dataRepoName));
1032+
1033+
// Restore the backup snapshot
1034+
assertThat(
1035+
client().admin()
1036+
.cluster()
1037+
.prepareRestoreSnapshot(backupRepoName, backupSnapshot.getName())
1038+
.setIndices(restoredIndexName)
1039+
.get()
1040+
.status(),
1041+
equalTo(RestStatus.ACCEPTED)
1042+
);
1043+
1044+
assertBusy(() -> {
1045+
final ClusterAllocationExplanation clusterAllocationExplanation = client().admin()
1046+
.cluster()
1047+
.prepareAllocationExplain()
1048+
.setIndex(restoredIndexName)
1049+
.setShard(0)
1050+
.setPrimary(true)
1051+
.get()
1052+
.getExplanation();
1053+
1054+
final String description = Strings.toString(clusterAllocationExplanation);
1055+
final AllocateUnassignedDecision allocateDecision = clusterAllocationExplanation.getShardAllocationDecision()
1056+
.getAllocateDecision();
1057+
assertTrue(description, allocateDecision.isDecisionTaken());
1058+
assertThat(description, allocateDecision.getAllocationDecision(), equalTo(AllocationDecision.NO));
1059+
for (NodeAllocationResult nodeAllocationResult : allocateDecision.getNodeDecisions()) {
1060+
for (Decision decision : nodeAllocationResult.getCanAllocateDecision().getDecisions()) {
1061+
final String explanation = decision.getExplanation();
1062+
if (explanation.contains("this index is backed by a searchable snapshot")
1063+
&& explanation.contains("no such repository is registered")
1064+
&& explanation.contains("the required repository was originally named [" + dataRepoName + "]")) {
1065+
return;
1066+
}
1067+
}
1068+
}
1069+
1070+
fail(description);
1071+
});
1072+
1073+
assertBusy(() -> {
1074+
final RestoreInProgress restoreInProgress = client().admin()
1075+
.cluster()
1076+
.prepareState()
1077+
.clear()
1078+
.setCustoms(true)
1079+
.get()
1080+
.getState()
1081+
.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
1082+
assertTrue(Strings.toString(restoreInProgress, true, true), restoreInProgress.isEmpty());
1083+
});
1084+
1085+
// Re-register the repository containing the actual data & verify that the shards are now allocated
1086+
final String newRepositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
1087+
final Settings.Builder settings = Settings.builder().put(dataRepoMetadata.settings());
1088+
if (randomBoolean()) {
1089+
settings.put(READONLY_SETTING_KEY, "true");
1090+
}
1091+
assertAcked(clusterAdmin().preparePutRepository(newRepositoryName).setType("fs").setSettings(settings));
1092+
1093+
ensureGreen(restoredIndexName);
1094+
assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
1095+
}
1096+
9741097
private void assertSearchableSnapshotStats(String indexName, boolean cacheEnabled, List<String> nonCachedExtensions) {
9751098
final SearchableSnapshotsStatsResponse statsResponse = client().execute(
9761099
SearchableSnapshotsStatsAction.INSTANCE,

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,20 @@
99
import org.apache.lucene.store.BufferedIndexInput;
1010
import org.apache.lucene.util.SetOnce;
1111
import org.elasticsearch.Version;
12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.ActionRequest;
1314
import org.elasticsearch.action.ActionResponse;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.ClusterStateListener;
17+
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
18+
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
19+
import org.elasticsearch.cluster.routing.RerouteService;
20+
import org.elasticsearch.common.Priority;
1421
import org.elasticsearch.common.util.concurrent.EsExecutors;
22+
import org.elasticsearch.repositories.RepositoryData;
1523
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotsNodeCachesStatsAction;
1624
import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider;
25+
import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider;
1726
import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
1827
import org.elasticsearch.client.Client;
1928
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -108,12 +117,15 @@
108117
import java.util.ArrayList;
109118
import java.util.Collection;
110119
import java.util.Collections;
120+
import java.util.HashSet;
111121
import java.util.List;
112122
import java.util.Locale;
113123
import java.util.Map;
114124
import java.util.Optional;
125+
import java.util.Set;
115126
import java.util.function.Function;
116127
import java.util.function.Supplier;
128+
import java.util.stream.Collectors;
117129

118130
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
119131
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
@@ -344,7 +356,10 @@ public Collection<Object> createComponents(
344356
this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
345357
components.add(new FrozenCacheServiceSupplier(frozenCacheService.get()));
346358
components.add(new CacheServiceSupplier(cacheService.get()));
347-
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
359+
if (DiscoveryNode.isMasterNode(settings)) {
360+
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
361+
clusterService.addListener(new RepositoryUuidWatcher(clusterService.getRerouteService()));
362+
}
348363
return Collections.unmodifiableList(components);
349364
}
350365

@@ -519,6 +534,7 @@ protected XPackLicenseState getLicenseState() {
519534
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
520535
return List.of(
521536
new SearchableSnapshotAllocationDecider(() -> getLicenseState().isAllowed(XPackLicenseState.Feature.SEARCHABLE_SNAPSHOTS)),
537+
new SearchableSnapshotRepositoryExistsAllocationDecider(),
522538
new SearchableSnapshotEnableAllocationDecider(settings, clusterSettings),
523539
new HasFrozenCacheAllocationDecider(frozenCacheInfoService),
524540
new DedicatedFrozenNodeAllocationDecider()
@@ -697,4 +713,34 @@ public FrozenCacheService get() {
697713
return frozenCacheService;
698714
}
699715
}
716+
717+
private static final class RepositoryUuidWatcher implements ClusterStateListener {
718+
719+
private final RerouteService rerouteService;
720+
private final HashSet<String> knownUuids = new HashSet<>();
721+
722+
RepositoryUuidWatcher(RerouteService rerouteService) {
723+
this.rerouteService = rerouteService;
724+
}
725+
726+
@Override
727+
public void clusterChanged(ClusterChangedEvent event) {
728+
final RepositoriesMetadata repositoriesMetadata = event.state().metadata().custom(RepositoriesMetadata.TYPE);
729+
if (repositoriesMetadata == null) {
730+
knownUuids.clear();
731+
return;
732+
}
733+
734+
final Set<String> newUuids = repositoriesMetadata.repositories()
735+
.stream()
736+
.map(RepositoryMetadata::uuid)
737+
.filter(s -> s.equals(RepositoryData.MISSING_UUID) == false)
738+
.collect(Collectors.toSet());
739+
if (knownUuids.addAll(newUuids)) {
740+
rerouteService.reroute("repository UUIDs changed", Priority.NORMAL, ActionListener.wrap((() -> {})));
741+
}
742+
knownUuids.retainAll(newUuids);
743+
assert knownUuids.equals(newUuids) : knownUuids + " vs " + newUuids;
744+
}
745+
}
700746
}

0 commit comments

Comments
 (0)