Skip to content

Commit 4c4355f

Browse files
committed
Allow ClusterState.Custom to be created on initial cluster states (#26144)
Today we have a `null` invariant on all `ClusterState.Custom`. This makes several code paths complicated and requires complex state handling in some cases. This change allows to register a custom supplier that is used to initialize the initial clusterstate with these transient customs.
1 parent 5208242 commit 4c4355f

22 files changed

+220
-37
lines changed

core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373

7474
import java.util.ArrayList;
7575
import java.util.Collection;
76+
import java.util.Collections;
7677
import java.util.HashMap;
7778
import java.util.LinkedHashMap;
7879
import java.util.List;
@@ -94,7 +95,6 @@ public class ClusterModule extends AbstractModule {
9495
private final IndexNameExpressionResolver indexNameExpressionResolver;
9596
private final AllocationDeciders allocationDeciders;
9697
private final AllocationService allocationService;
97-
private final Runnable onStarted;
9898
// pkg private for tests
9999
final Collection<AllocationDecider> deciderList;
100100
final ShardsAllocator shardsAllocator;
@@ -107,9 +107,24 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
107107
this.clusterService = clusterService;
108108
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
109109
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
110-
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
111110
}
112111

112+
public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
113+
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
114+
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
115+
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
116+
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
117+
for (ClusterPlugin plugin : clusterPlugins) {
118+
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
119+
for (String key : initialCustomSupplier.keySet()) {
120+
if (customSupplier.containsKey(key)) {
121+
throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
122+
}
123+
}
124+
customSupplier.putAll(initialCustomSupplier);
125+
}
126+
return Collections.unmodifiableMap(customSupplier);
127+
}
113128

114129
public static List<Entry> getNamedWriteables() {
115130
List<Entry> entries = new ArrayList<>();
@@ -243,8 +258,4 @@ protected void configure() {
243258
bind(AllocationDeciders.class).toInstance(allocationDeciders);
244259
bind(ShardsAllocator.class).toInstance(shardsAllocator);
245260
}
246-
247-
public Runnable onStarted() {
248-
return onStarted;
249-
}
250261
}

core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,6 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
4545

4646
private final List<Entry> entries;
4747

48-
/**
49-
* Constructs new restore metadata
50-
*
51-
* @param entries list of currently running restore processes
52-
*/
53-
public RestoreInProgress(List<Entry> entries) {
54-
this.entries = entries;
55-
}
56-
5748
/**
5849
* Constructs new restore metadata
5950
*

core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
4545
// the list of snapshot deletion request entries
4646
private final List<Entry> entries;
4747

48+
public SnapshotDeletionsInProgress() {
49+
this(Collections.emptyList());
50+
}
51+
4852
private SnapshotDeletionsInProgress(List<Entry> entries) {
4953
this.entries = Collections.unmodifiableList(entries);
5054
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation
6767
// Only primary shards are snapshotted
6868

6969
SnapshotsInProgress snapshotsInProgress = allocation.custom(SnapshotsInProgress.TYPE);
70-
if (snapshotsInProgress == null) {
70+
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
7171
// Snapshots are not running
7272
return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
7373
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,10 @@ public interface ClusterApplier {
3939
* @param listener callback that is invoked after cluster state is applied
4040
*/
4141
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
42+
43+
/**
44+
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
45+
*/
46+
ClusterState.Builder newClusterStateBuilder();
47+
4248
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
9797
private final AtomicReference<ClusterState> state; // last applied state
9898

9999
private NodeConnectionsService nodeConnectionsService;
100+
private Supplier<ClusterState.Builder> stateBuilderSupplier;
100101

101-
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
102+
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
103+
.Builder> stateBuilderSupplier) {
102104
super(settings);
103105
this.clusterSettings = clusterSettings;
104106
this.threadPool = threadPool;
105107
this.state = new AtomicReference<>();
106108
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
107109
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
110+
this.stateBuilderSupplier = stateBuilderSupplier;
108111
}
109112

110113
public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@@ -653,4 +656,9 @@ public void run() {
653656
protected long currentTimeInNanos() {
654657
return System.nanoTime();
655658
}
659+
660+
@Override
661+
public ClusterState.Builder newClusterStateBuilder() {
662+
return stateBuilderSupplier.get();
663+
}
656664
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.util.Collections;
4444
import java.util.Map;
45+
import java.util.function.Supplier;
4546

4647
public class ClusterService extends AbstractLifecycleComponent {
4748

@@ -58,16 +59,30 @@ public class ClusterService extends AbstractLifecycleComponent {
5859
private final OperationRouting operationRouting;
5960

6061
private final ClusterSettings clusterSettings;
62+
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;
6163

62-
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
64+
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
65+
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
6366
super(settings);
64-
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
6567
this.masterService = new MasterService(settings, threadPool);
6668
this.operationRouting = new OperationRouting(settings, clusterSettings);
6769
this.clusterSettings = clusterSettings;
6870
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
6971
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
7072
this::setSlowTaskLoggingThreshold);
73+
this.initialClusterStateCustoms = initialClusterStateCustoms;
74+
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
75+
}
76+
77+
/**
78+
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
79+
*/
80+
public ClusterState.Builder newClusterStateBuilder() {
81+
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
82+
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
83+
builder.putCustom(entry.getKey(), entry.getValue().get());
84+
}
85+
return builder;
7186
}
7287

7388
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {

core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ protected synchronized void doStart() {
117117
}
118118

119119
protected ClusterState createInitialState(DiscoveryNode localNode) {
120-
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
121-
.nodes(DiscoveryNodes.builder().add(localNode)
120+
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
121+
return builder.nodes(DiscoveryNodes.builder().add(localNode)
122122
.localNodeId(localNode.getId())
123123
.masterNodeId(localNode.getId())
124124
.build())

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
113113

114114
private final TransportService transportService;
115115
private final MasterService masterService;
116-
private final ClusterName clusterName;
117116
private final DiscoverySettings discoverySettings;
118117
protected final ZenPing zenPing; // protected to allow tests access
119118
private final MasterFaultDetection masterFD;
@@ -169,7 +168,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
169168
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
170169
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
171170
this.threadPool = threadPool;
172-
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
171+
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
173172
this.committedState = new AtomicReference<>();
174173

175174
this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
@@ -238,7 +237,8 @@ protected void doStart() {
238237
// set initial state
239238
assert committedState.get() == null;
240239
assert localNode != null;
241-
ClusterState initialState = ClusterState.builder(clusterName)
240+
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
241+
ClusterState initialState = builder
242242
.blocks(ClusterBlocks.builder()
243243
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
244244
.addGlobalBlock(discoverySettings.getNoMasterBlock()))

core/src/main/java/org/elasticsearch/gateway/Gateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
155155
metaDataBuilder.transientSettings(),
156156
e -> logUnknownSetting("transient", e),
157157
(e, ex) -> logInvalidSetting("transient", e, ex)));
158-
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
158+
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
159159
builder.metaData(metaDataBuilder);
160160
listener.onSuccess(builder.build());
161161
}

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
329329
resourcesToClose.add(resourceWatcherService);
330330
final NetworkService networkService = new NetworkService(
331331
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
332-
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
332+
333+
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
334+
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
335+
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
333336
clusterService.addListener(scriptModule.getScriptService());
334337
resourcesToClose.add(clusterService);
335338
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
@@ -346,8 +349,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
346349
modules.add(pluginModule);
347350
}
348351
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
349-
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
350-
pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
352+
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
351353
modules.add(clusterModule);
352354
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
353355
modules.add(indicesModule);

core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.function.Supplier;
2626

27+
import org.elasticsearch.cluster.ClusterState;
2728
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
2829
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
2930
import org.elasticsearch.common.settings.ClusterSettings;
@@ -63,6 +64,12 @@ default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings sett
6364
* Called when the node is started
6465
*/
6566
default void onNodeStarted() {
66-
6767
}
68+
69+
/**
70+
* Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
71+
* This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
72+
* but customs are not initialized.
73+
*/
74+
default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
6875
}

core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
public class ClusterModuleTests extends ModuleTestCase {
6060
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
6161
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
62-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
62+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
6363
static class FakeAllocationDecider extends AllocationDecider {
6464
protected FakeAllocationDecider(Settings settings) {
6565
super(settings);
@@ -196,4 +196,48 @@ public void testAllocationDeciderOrder() {
196196
assertSame(decider.getClass(), expectedDeciders.get(idx++));
197197
}
198198
}
199+
200+
public void testCustomSuppliers() {
201+
Map<String, Supplier<ClusterState.Custom>> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList());
202+
assertEquals(3, customSuppliers.size());
203+
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
204+
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
205+
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
206+
207+
customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
208+
@Override
209+
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
210+
return Collections.singletonMap("foo", () -> null);
211+
}
212+
}));
213+
assertEquals(4, customSuppliers.size());
214+
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
215+
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
216+
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
217+
assertTrue(customSuppliers.containsKey("foo"));
218+
219+
220+
IllegalStateException ise = expectThrows(IllegalStateException.class,
221+
() -> ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
222+
@Override
223+
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
224+
return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null);
225+
}
226+
})));
227+
assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once");
228+
229+
ise = expectThrows(IllegalStateException.class,
230+
() -> ClusterModule.getClusterStateCustomSuppliers(Arrays.asList(new ClusterPlugin() {
231+
@Override
232+
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
233+
return Collections.singletonMap("foo", () -> null);
234+
}
235+
}, new ClusterPlugin() {
236+
@Override
237+
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
238+
return Collections.singletonMap("foo", () -> null);
239+
}
240+
})));
241+
assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
242+
}
199243
}

core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
public class TemplateUpgradeServiceTests extends ESTestCase {
7575

7676
private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
77-
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
77+
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
7878

7979
public void testCalculateChangesAddChangeAndDelete() {
8080

core/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ static class TimedClusterApplierService extends ClusterApplierService {
413413
public volatile Long currentTimeOverride = null;
414414

415415
TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
416-
super(settings, clusterSettings, threadPool);
416+
super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
417417
}
418418

419419
@Override
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.service;
20+
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.Diff;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.settings.ClusterSettings;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.xcontent.XContentBuilder;
27+
import org.elasticsearch.test.ESTestCase;
28+
29+
import java.io.IOException;
30+
import java.util.Collections;
31+
32+
public class ClusterSerivceTests extends ESTestCase {
33+
34+
public void testNewBuilderContainsCustoms() {
35+
ClusterState.Custom custom = new ClusterState.Custom() {
36+
@Override
37+
public Diff<ClusterState.Custom> diff(ClusterState.Custom previousState) {
38+
return null;
39+
}
40+
41+
@Override
42+
public String getWriteableName() {
43+
return null;
44+
}
45+
46+
@Override
47+
public void writeTo(StreamOutput out) throws IOException {
48+
49+
}
50+
51+
@Override
52+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
53+
return null;
54+
}
55+
};
56+
ClusterService service = new ClusterService(Settings.EMPTY,
57+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () ->
58+
custom));
59+
ClusterState.Builder builder = service.newClusterStateBuilder();
60+
assertSame(builder.build().custom("foo"), custom);
61+
}
62+
}

0 commit comments

Comments
 (0)