Skip to content

Commit 3c7c0c6

Browse files
Move experimental frozen to frozen shard limit (#71781)
Frozen indices created on 7.12 would not belong to the frozen shard limit group, now we convert them when last node is upgraded. Relates #71392
1 parent f50b357 commit 3c7c0c6

File tree

4 files changed

+302
-0
lines changed

4 files changed

+302
-0
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
102102
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
103103
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
104+
import org.elasticsearch.xpack.searchablesnapshots.upgrade.SearchableSnapshotIndexMetadataUpgrader;
104105

105106
import java.io.IOException;
106107
import java.io.UncheckedIOException;
@@ -348,6 +349,7 @@ public Collection<Object> createComponents(
348349
this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
349350
components.add(new FrozenCacheServiceSupplier(frozenCacheService.get()));
350351
components.add(new CacheServiceSupplier(cacheService.get()));
352+
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
351353
return Collections.unmodifiableList(components);
352354
}
353355

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.upgrade;
9+
10+
import org.apache.log4j.LogManager;
11+
import org.apache.log4j.Logger;
12+
import org.elasticsearch.Version;
13+
import org.elasticsearch.cluster.ClusterChangedEvent;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.ClusterStateListener;
16+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.cluster.metadata.Metadata;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.indices.ShardLimitValidator;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
24+
25+
import java.util.concurrent.Executor;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.stream.StreamSupport;
28+
29+
/**
30+
* This class upgrades frozen indices to apply the index.shard_limit.group=frozen setting after all nodes have been upgraded to 7.13+
31+
*/
32+
public class SearchableSnapshotIndexMetadataUpgrader {
33+
private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexMetadataUpgrader.class);
34+
35+
private final ClusterService clusterService;
36+
private final ThreadPool threadPool;
37+
private final AtomicBoolean upgraded = new AtomicBoolean();
38+
private final ClusterStateListener listener = this::clusterChanged;
39+
40+
public SearchableSnapshotIndexMetadataUpgrader(ClusterService clusterService, ThreadPool threadPool) {
41+
this.clusterService = clusterService;
42+
this.threadPool = threadPool;
43+
}
44+
45+
public void initialize() {
46+
clusterService.addListener(listener);
47+
}
48+
49+
private void clusterChanged(ClusterChangedEvent event) {
50+
if (upgraded.get()) {
51+
return;
52+
}
53+
54+
if (event.localNodeMaster() && event.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_13_0)) {
55+
// only want one doing this at a time, assume it succeeds and reset if not.
56+
if (upgraded.compareAndSet(false, true)) {
57+
final Executor executor = threadPool.generic();
58+
executor.execute(() -> maybeUpgradeIndices(event.state()));
59+
}
60+
}
61+
}
62+
63+
private void maybeUpgradeIndices(ClusterState state) {
64+
// 99% of the time, this will be a noop, so precheck that before adding a cluster state update.
65+
if (needsUpgrade(state)) {
66+
logger.info("Upgrading partial searchable snapshots to use frozen shard limit group");
67+
clusterService.submitStateUpdateTask("searchable-snapshot-index-upgrader", new ClusterStateUpdateTask() {
68+
@Override
69+
public ClusterState execute(ClusterState currentState) throws Exception {
70+
return upgradeIndices(currentState);
71+
}
72+
73+
@Override
74+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
75+
clusterService.removeListener(listener);
76+
}
77+
78+
@Override
79+
public void onFailure(String source, Exception e) {
80+
logger.warn(
81+
"upgrading frozen indices to have frozen shard limit group failed, will retry on the next cluster state update",
82+
e
83+
);
84+
// let us try again later.
85+
upgraded.set(false);
86+
}
87+
});
88+
} else {
89+
clusterService.removeListener(listener);
90+
}
91+
}
92+
93+
static boolean needsUpgrade(ClusterState state) {
94+
return StreamSupport.stream(state.metadata().spliterator(), false)
95+
.filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_7_13_0))
96+
.map(IndexMetadata::getSettings)
97+
.filter(SearchableSnapshotsConstants::isPartialSearchableSnapshotIndex)
98+
.anyMatch(SearchableSnapshotIndexMetadataUpgrader::notFrozenShardLimitGroup);
99+
}
100+
101+
static ClusterState upgradeIndices(ClusterState currentState) {
102+
if (needsUpgrade(currentState) == false) {
103+
return currentState;
104+
}
105+
Metadata.Builder builder = Metadata.builder(currentState.metadata());
106+
StreamSupport.stream(currentState.metadata().spliterator(), false)
107+
.filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_8_0_0))
108+
.filter(
109+
imd -> SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(imd.getSettings())
110+
&& notFrozenShardLimitGroup(imd.getSettings())
111+
)
112+
.map(SearchableSnapshotIndexMetadataUpgrader::setShardLimitGroupFrozen)
113+
.forEach(imd -> builder.put(imd, true));
114+
return ClusterState.builder(currentState).metadata(builder).build();
115+
}
116+
117+
private static boolean notFrozenShardLimitGroup(org.elasticsearch.common.settings.Settings settings) {
118+
return ShardLimitValidator.FROZEN_GROUP.equals(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings)) == false;
119+
}
120+
121+
private static IndexMetadata setShardLimitGroupFrozen(IndexMetadata indexMetadata) {
122+
return IndexMetadata.builder(indexMetadata)
123+
.settings(
124+
Settings.builder()
125+
.put(indexMetadata.getSettings())
126+
.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
127+
)
128+
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
129+
.build();
130+
}
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.upgrade;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.cluster.ClusterName;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.index.IndexModule;
17+
import org.elasticsearch.indices.ShardLimitValidator;
18+
import org.elasticsearch.test.ESTestCase;
19+
import org.elasticsearch.test.VersionUtils;
20+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
21+
22+
import java.util.stream.StreamSupport;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.is;
26+
import static org.hamcrest.Matchers.not;
27+
import static org.hamcrest.Matchers.notNullValue;
28+
import static org.hamcrest.Matchers.sameInstance;
29+
30+
public class SearchableSnapshotIndexMetadataUpgraderTests extends ESTestCase {
31+
32+
public void testNoUpgradeNeeded() {
33+
Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
34+
assertThat(needsUpgrade(metadataBuilder), is(false));
35+
}
36+
37+
public void testNeedsUpgrade() {
38+
Metadata.Builder metadataBuilder = addIndex(
39+
partial_7_12(),
40+
randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
41+
);
42+
assertThat(needsUpgrade(metadataBuilder), is(true));
43+
}
44+
45+
public void testUpgradeIndices() {
46+
Metadata.Builder metadataBuilder = addIndex(
47+
partial_7_12(),
48+
randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
49+
);
50+
51+
ClusterState originalState = clusterState(metadataBuilder);
52+
ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
53+
54+
assertThat(upgradedState, not(sameInstance(originalState)));
55+
assertThat(upgradedState.metadata().indices().size(), equalTo(originalState.metadata().indices().size()));
56+
57+
assertTrue(StreamSupport.stream(upgradedState.metadata().spliterator(), false).anyMatch(upgraded -> {
58+
IndexMetadata original = originalState.metadata().index(upgraded.getIndex());
59+
assertThat(original, notNullValue());
60+
if (isPartial(upgraded) == false
61+
|| ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(original.getSettings())
62+
.equals(ShardLimitValidator.FROZEN_GROUP)) {
63+
assertThat(upgraded, sameInstance(original));
64+
return false;
65+
} else {
66+
assertThat(isPartial(upgraded), is(isPartial(original)));
67+
assertThat(upgraded.getNumberOfShards(), equalTo(original.getNumberOfShards()));
68+
assertThat(upgraded.getNumberOfReplicas(), equalTo(original.getNumberOfReplicas()));
69+
assertThat(
70+
ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(upgraded.getSettings()),
71+
equalTo(ShardLimitValidator.FROZEN_GROUP)
72+
);
73+
assertThat(upgraded.getSettingsVersion(), equalTo(original.getSettingsVersion() + 1));
74+
return true;
75+
}
76+
}));
77+
}
78+
79+
public void testNoopUpgrade() {
80+
Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
81+
ClusterState originalState = clusterState(metadataBuilder);
82+
ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
83+
assertThat(upgradedState, sameInstance(originalState));
84+
}
85+
86+
private Settings normal() {
87+
return settings(VersionUtils.randomVersion(random())).build();
88+
}
89+
90+
private Settings partial_7_12() {
91+
return searchableSnapshotSettings(VersionUtils.randomVersionBetween(random(), Version.V_7_12_0, Version.V_7_12_1), true);
92+
}
93+
94+
private Settings partial_7_13plus() {
95+
Settings settings = searchableSnapshotSettings(
96+
VersionUtils.randomVersionBetween(random(), Version.V_7_13_0, Version.CURRENT),
97+
true
98+
);
99+
if (randomBoolean()) {
100+
return shardLimitGroupFrozen(settings);
101+
} else {
102+
return settings;
103+
}
104+
}
105+
106+
private Settings full() {
107+
return searchableSnapshotSettings(VersionUtils.randomVersion(random()), false);
108+
}
109+
110+
private Settings searchableSnapshotSettings(Version version, boolean partial) {
111+
Settings.Builder settings = settings(version);
112+
settings.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY);
113+
if (partial || randomBoolean()) {
114+
settings.put(SearchableSnapshotsConstants.SNAPSHOT_PARTIAL_SETTING.getKey(), partial);
115+
}
116+
return settings.build();
117+
}
118+
119+
private Settings shardLimitGroupFrozen(Settings settings) {
120+
return Settings.builder()
121+
.put(settings)
122+
.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
123+
.build();
124+
}
125+
126+
private Metadata.Builder addIndex(Settings settings, Metadata.Builder builder) {
127+
builder.put(
128+
IndexMetadata.builder(randomAlphaOfLength(10))
129+
.settings(settings)
130+
.numberOfShards(between(1, 10))
131+
.numberOfReplicas(between(0, 10))
132+
.build(),
133+
false
134+
);
135+
return builder;
136+
}
137+
138+
private Metadata.Builder randomMetadata(Settings... indexSettingsList) {
139+
Metadata.Builder builder = new Metadata.Builder();
140+
for (Settings settings : indexSettingsList) {
141+
for (int i = 0; i < between(0, 10); ++i) {
142+
addIndex(settings, builder);
143+
}
144+
}
145+
return builder;
146+
}
147+
148+
private boolean needsUpgrade(Metadata.Builder metadataBuilder) {
149+
return SearchableSnapshotIndexMetadataUpgrader.needsUpgrade(clusterState(metadataBuilder));
150+
}
151+
152+
private ClusterState clusterState(Metadata.Builder metadataBuilder) {
153+
return ClusterState.builder(ClusterName.DEFAULT).metadata(metadataBuilder).build();
154+
}
155+
156+
private boolean isPartial(IndexMetadata upgraded) {
157+
return SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(upgraded.getSettings());
158+
}
159+
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.xcontent.support.XContentMapValues;
22+
import org.elasticsearch.indices.ShardLimitValidator;
2223
import org.elasticsearch.repositories.fs.FsRepository;
2324
import org.elasticsearch.rest.RestStatus;
2425
import org.hamcrest.Matcher;
@@ -31,6 +32,7 @@
3132

3233
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
3334
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.hasEntry;
3436
import static org.hamcrest.Matchers.notNullValue;
3537

3638
public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase {
@@ -46,6 +48,14 @@ public void testMountPartialCopyAndRecoversCorrectly() throws Exception {
4648
final Storage storage = Storage.SHARED_CACHE;
4749
assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE);
4850

51+
if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) {
52+
assertBusy(() -> {
53+
Map<String, Object> settings = getIndexSettingsAsMap("mounted_index_shared_cache");
54+
assertThat(settings,
55+
hasEntry(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP));
56+
});
57+
}
58+
4959
executeMountAndRecoversCorrectlyTestCase(storage, 5678L);
5060
}
5161

0 commit comments

Comments
 (0)