Skip to content

Commit a2c6c16

Browse files
Move experimental frozen to frozen shard limit
Frozen indices created on 7.12 would not belong to the frozen shard limit group, now we convert them when last node is upgraded. Relates elastic#71392
1 parent ab7fe5b commit a2c6c16

File tree

4 files changed

+292
-0
lines changed

4 files changed

+292
-0
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
100100
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
101101
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
102+
import org.elasticsearch.xpack.searchablesnapshots.upgrade.SearchableSnapshotIndexMetadataUpgrader;
102103

103104
import java.io.IOException;
104105
import java.io.UncheckedIOException;
@@ -344,6 +345,7 @@ public Collection<Object> createComponents(
344345
}
345346
this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
346347
components.add(new CacheServiceSupplier(cacheService.get()));
348+
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
347349
return Collections.unmodifiableList(components);
348350
}
349351

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.upgrade;
9+
10+
import org.apache.log4j.LogManager;
11+
import org.apache.log4j.Logger;
12+
import org.elasticsearch.Version;
13+
import org.elasticsearch.cluster.ClusterChangedEvent;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
16+
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.metadata.Metadata;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.indices.ShardLimitValidator;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
23+
24+
import java.util.concurrent.Executor;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.stream.StreamSupport;
27+
28+
/**
29+
* This class upgrades frozen indices to apply the index.shard_limit.group=frozen setting after all nodes have been upgraded to 7.13+
30+
*/
31+
public class SearchableSnapshotIndexMetadataUpgrader {
32+
private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexMetadataUpgrader.class);
33+
34+
private final ClusterService clusterService;
35+
private final ThreadPool threadPool;
36+
private final AtomicBoolean upgraded = new AtomicBoolean();
37+
38+
public SearchableSnapshotIndexMetadataUpgrader(ClusterService clusterService, ThreadPool threadPool) {
39+
this.clusterService = clusterService;
40+
this.threadPool = threadPool;
41+
}
42+
43+
public void initialize() {
44+
clusterService.addListener(this::clusterChanged);
45+
}
46+
47+
private void clusterChanged(ClusterChangedEvent event) {
48+
if (upgraded.get()) {
49+
return;
50+
}
51+
52+
if (event.localNodeMaster() && event.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_13_0)) {
53+
// only want one doing this at a time, assume it succeeds and reset if not.
54+
if (upgraded.compareAndSet(false, true)) {
55+
final Executor executor = threadPool.generic();
56+
executor.execute(() -> maybeUpgradeIndices(event.state()));
57+
}
58+
}
59+
}
60+
61+
private void maybeUpgradeIndices(ClusterState state) {
62+
// 99% of the time, this will be a noop, so precheck that before adding a cluster state update.
63+
if (needsUpgrade(state)) {
64+
logger.info("Upgrading partial searchable snapshots to use frozen shard limit group");
65+
clusterService.submitStateUpdateTask("searchable-snapshot-index-upgrader", new ClusterStateUpdateTask() {
66+
@Override
67+
public ClusterState execute(ClusterState currentState) throws Exception {
68+
return upgradeIndices(currentState);
69+
}
70+
71+
@Override
72+
public void onFailure(String source, Exception e) {
73+
logger.warn("upgrading frozen indices to have frozen shard limit group failed", e);
74+
// let us try again later.
75+
upgraded.set(false);
76+
}
77+
});
78+
}
79+
}
80+
81+
static boolean needsUpgrade(ClusterState state) {
82+
return StreamSupport.stream(state.metadata().spliterator(), false)
83+
// todo fix upper bound version after shard limit validator is finally backported.
84+
.filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_8_0_0))
85+
.map(IndexMetadata::getSettings)
86+
.filter(SearchableSnapshotsConstants::isPartialSearchableSnapshotIndex)
87+
.anyMatch(SearchableSnapshotIndexMetadataUpgrader::wrongShardLimitGroup);
88+
}
89+
90+
static ClusterState upgradeIndices(ClusterState currentState) {
91+
if (needsUpgrade(currentState) == false) {
92+
return currentState;
93+
}
94+
Metadata.Builder builder = Metadata.builder(currentState.metadata());
95+
StreamSupport.stream(currentState.metadata().spliterator(), false)
96+
.filter(imd -> imd.getCreationVersion().onOrAfter(Version.V_7_12_0) && imd.getCreationVersion().before(Version.V_8_0_0))
97+
.filter(
98+
imd -> SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(imd.getSettings())
99+
&& wrongShardLimitGroup(imd.getSettings())
100+
)
101+
.map(SearchableSnapshotIndexMetadataUpgrader::setShardLimitGroupFrozen)
102+
.forEach(imd -> builder.put(imd, true));
103+
return ClusterState.builder(currentState).metadata(builder).build();
104+
}
105+
106+
private static boolean wrongShardLimitGroup(org.elasticsearch.common.settings.Settings settings) {
107+
return ShardLimitValidator.FROZEN_GROUP.equals(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings)) == false;
108+
}
109+
110+
private static IndexMetadata setShardLimitGroupFrozen(IndexMetadata indexMetadata) {
111+
return IndexMetadata.builder(indexMetadata)
112+
.settings(
113+
Settings.builder()
114+
.put(indexMetadata.getSettings())
115+
.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
116+
)
117+
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
118+
.build();
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.upgrade;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.cluster.ClusterName;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.index.IndexModule;
17+
import org.elasticsearch.indices.ShardLimitValidator;
18+
import org.elasticsearch.test.ESTestCase;
19+
import org.elasticsearch.test.VersionUtils;
20+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
21+
22+
import java.util.stream.StreamSupport;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.is;
26+
import static org.hamcrest.Matchers.not;
27+
import static org.hamcrest.Matchers.notNullValue;
28+
import static org.hamcrest.Matchers.sameInstance;
29+
30+
public class SearchableSnapshotIndexMetadataUpgraderTests extends ESTestCase {
31+
32+
public void testNoUpgradeNeeded() {
33+
Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
34+
assertThat(needsUpgrade(metadataBuilder), is(false));
35+
}
36+
37+
public void testNeedsUpgrade() {
38+
Metadata.Builder metadataBuilder = addIndex(
39+
partial_7_12(),
40+
randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
41+
);
42+
assertThat(needsUpgrade(metadataBuilder), is(true));
43+
}
44+
45+
public void testUpgradeIndices() {
46+
Metadata.Builder metadataBuilder = addIndex(
47+
partial_7_12(),
48+
randomMetadata(normal(), full(), partial_7_13plus(), partial_7_12(), shardLimitGroupFrozen(partial_7_12()))
49+
);
50+
51+
ClusterState originalState = clusterState(metadataBuilder);
52+
ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
53+
54+
assertThat(upgradedState, not(sameInstance(originalState)));
55+
assertThat(upgradedState.metadata().indices().size(), equalTo(originalState.metadata().indices().size()));
56+
57+
assertTrue(StreamSupport.stream(upgradedState.metadata().spliterator(), false).anyMatch(upgraded -> {
58+
IndexMetadata original = originalState.metadata().index(upgraded.getIndex());
59+
assertThat(original, notNullValue());
60+
if (isPartial(upgraded) == false
61+
|| ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(original.getSettings())
62+
.equals(ShardLimitValidator.FROZEN_GROUP)) {
63+
assertThat(upgraded, sameInstance(original));
64+
return false;
65+
} else {
66+
assertThat(isPartial(upgraded), is(isPartial(original)));
67+
assertThat(upgraded.getNumberOfShards(), equalTo(original.getNumberOfShards()));
68+
assertThat(upgraded.getNumberOfReplicas(), equalTo(original.getNumberOfReplicas()));
69+
assertThat(
70+
ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.get(upgraded.getSettings()),
71+
equalTo(ShardLimitValidator.FROZEN_GROUP)
72+
);
73+
assertThat(upgraded.getSettingsVersion(), equalTo(original.getSettingsVersion() + 1));
74+
return true;
75+
}
76+
}));
77+
}
78+
79+
public void testNoopUpgrade() {
80+
Metadata.Builder metadataBuilder = randomMetadata(normal(), full(), partial_7_13plus(), shardLimitGroupFrozen(partial_7_12()));
81+
ClusterState originalState = clusterState(metadataBuilder);
82+
ClusterState upgradedState = SearchableSnapshotIndexMetadataUpgrader.upgradeIndices(originalState);
83+
assertThat(upgradedState, sameInstance(originalState));
84+
}
85+
86+
private Settings normal() {
87+
return settings(VersionUtils.randomVersion(random())).build();
88+
}
89+
90+
private Settings partial_7_12() {
91+
return searchableSnapshotSettings(VersionUtils.randomVersionBetween(random(), Version.V_7_12_0, Version.V_7_12_1), true);
92+
}
93+
94+
private Settings partial_7_13plus() {
95+
Settings settings = searchableSnapshotSettings(
96+
// todo: lower bound version should be 7_13 after shard limit validator is backported.
97+
VersionUtils.randomVersionBetween(random(), Version.V_8_0_0, Version.CURRENT),
98+
true
99+
);
100+
if (randomBoolean()) {
101+
return shardLimitGroupFrozen(settings);
102+
} else {
103+
return settings;
104+
}
105+
}
106+
107+
private Settings full() {
108+
return searchableSnapshotSettings(VersionUtils.randomVersion(random()), false);
109+
}
110+
111+
private Settings searchableSnapshotSettings(Version version, boolean partial) {
112+
Settings.Builder settings = settings(version);
113+
settings.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY);
114+
if (partial || randomBoolean()) {
115+
settings.put(SearchableSnapshotsConstants.SNAPSHOT_PARTIAL_SETTING.getKey(), partial);
116+
}
117+
return settings.build();
118+
}
119+
120+
private Settings shardLimitGroupFrozen(Settings settings) {
121+
return Settings.builder()
122+
.put(settings)
123+
.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP)
124+
.build();
125+
}
126+
127+
private Metadata.Builder addIndex(Settings settings, Metadata.Builder builder) {
128+
builder.put(
129+
IndexMetadata.builder(randomAlphaOfLength(10))
130+
.settings(settings)
131+
.numberOfShards(between(1, 10))
132+
.numberOfReplicas(between(0, 10))
133+
.build(),
134+
false
135+
);
136+
return builder;
137+
}
138+
139+
private Metadata.Builder randomMetadata(Settings... indexSettingsList) {
140+
Metadata.Builder builder = new Metadata.Builder();
141+
for (Settings settings : indexSettingsList) {
142+
for (int i = 0; i < between(0, 10); ++i) {
143+
addIndex(settings, builder);
144+
}
145+
}
146+
return builder;
147+
}
148+
149+
private boolean needsUpgrade(Metadata.Builder metadataBuilder) {
150+
return SearchableSnapshotIndexMetadataUpgrader.needsUpgrade(clusterState(metadataBuilder));
151+
}
152+
153+
private ClusterState clusterState(Metadata.Builder metadataBuilder) {
154+
return ClusterState.builder(ClusterName.DEFAULT).metadata(metadataBuilder).build();
155+
}
156+
157+
private boolean isPartial(IndexMetadata upgraded) {
158+
return SearchableSnapshotsConstants.isPartialSearchableSnapshotIndex(upgraded.getSettings());
159+
}
160+
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.xcontent.support.XContentMapValues;
22+
import org.elasticsearch.indices.ShardLimitValidator;
2223
import org.elasticsearch.repositories.fs.FsRepository;
2324
import org.elasticsearch.rest.RestStatus;
2425
import org.hamcrest.Matcher;
@@ -31,6 +32,7 @@
3132

3233
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
3334
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.hasEntry;
3436
import static org.hamcrest.Matchers.notNullValue;
3537

3638
public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase {
@@ -46,6 +48,14 @@ public void testMountPartialCopyAndRecoversCorrectly() throws Exception {
4648
final Storage storage = Storage.SHARED_CACHE;
4749
assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE);
4850

51+
if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) {
52+
assertBusy(() -> {
53+
Map<String, Object> settings = getIndexSettingsAsMap("mounted_index_shared_cache");
54+
assertThat(settings,
55+
hasEntry(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), ShardLimitValidator.FROZEN_GROUP));
56+
});
57+
}
58+
4959
executeMountAndRecoversCorrectlyTestCase(storage, 5678L);
5060
}
5161

0 commit comments

Comments
 (0)