Skip to content

Commit 61ca469

Browse files
committed
Move tribe to a module (#25778)
This commit moves tribe to a module, stripping core from the tribe functionality.
1 parent c07dfe9 commit 61ca469

File tree

26 files changed

+452
-267
lines changed

26 files changed

+452
-267
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@
417417
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotShardsService.java" checks="LineLength" />
418418
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotsService.java" checks="LineLength" />
419419
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPool.java" checks="LineLength" />
420-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
420+
<suppress files="modules[/\\]tribe[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
421421
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]queries[/\\]BlendedTermQueryTests.java" checks="LineLength" />
422422
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]VersionTests.java" checks="LineLength" />
423423
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]RejectionActionIT.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public class ClusterModule extends AbstractModule {
9494
private final IndexNameExpressionResolver indexNameExpressionResolver;
9595
private final AllocationDeciders allocationDeciders;
9696
private final AllocationService allocationService;
97+
private final Runnable onStarted;
9798
// pkg private for tests
9899
final Collection<AllocationDecider> deciderList;
99100
final ShardsAllocator shardsAllocator;
@@ -106,6 +107,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
106107
this.clusterService = clusterService;
107108
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
108109
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
110+
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
109111
}
110112

111113

@@ -241,4 +243,8 @@ protected void configure() {
241243
bind(AllocationDeciders.class).toInstance(allocationDeciders);
242244
bind(ShardsAllocator.class).toInstance(shardsAllocator);
243245
}
246+
247+
public Runnable onStarted() {
248+
return onStarted;
249+
}
244250
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster;
21+
22+
import org.elasticsearch.cluster.metadata.MetaData;
23+
24+
/**
25+
* Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom}.
26+
* When multiple Mergable Custom metadata of the same type are found (from underlying clusters), the
27+
* Custom metadata can be merged using {@link #merge(MetaData.Custom)}.
28+
*
29+
* @param <T> type of custom meta data
30+
*/
31+
public interface MergableCustomMetaData<T extends MetaData.Custom> {
32+
33+
/**
34+
* Merges this custom metadata with other, returning either this or <code>other</code> custom metadata.
35+
* This method should not mutate either <code>this</code> or the <code>other</code> custom metadata.
36+
*
37+
* @param other custom meta data
38+
* @return the same instance or <code>other</code> custom metadata based on implementation
39+
* if both the instances are considered equal, implementations should return this
40+
* instance to avoid redundant cluster state changes.
41+
*/
42+
T merge(T other);
43+
}

core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
7272
import org.elasticsearch.indices.recovery.RecoverySettings;
7373
import org.elasticsearch.indices.store.IndicesStore;
74-
import org.elasticsearch.ingest.IngestService;
7574
import org.elasticsearch.monitor.fs.FsService;
7675
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
7776
import org.elasticsearch.monitor.jvm.JvmService;
@@ -91,7 +90,6 @@
9190
import org.elasticsearch.transport.TcpTransport;
9291
import org.elasticsearch.transport.Transport;
9392
import org.elasticsearch.transport.TransportService;
94-
import org.elasticsearch.tribe.TribeService;
9593
import org.elasticsearch.watcher.ResourceWatcherService;
9694

9795
import java.util.Arrays;
@@ -369,13 +367,6 @@ public void apply(Settings value, Settings current, Settings previous) {
369367
ThreadContext.DEFAULT_HEADERS_SETTING,
370368
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
371369
ESLoggerFactory.LOG_LEVEL_SETTING,
372-
TribeService.BLOCKS_METADATA_SETTING,
373-
TribeService.BLOCKS_WRITE_SETTING,
374-
TribeService.BLOCKS_WRITE_INDICES_SETTING,
375-
TribeService.BLOCKS_READ_INDICES_SETTING,
376-
TribeService.BLOCKS_METADATA_INDICES_SETTING,
377-
TribeService.ON_CONFLICT_SETTING,
378-
TribeService.TRIBE_NAME_SETTING,
379370
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
380371
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
381372
OsService.REFRESH_INTERVAL_SETTING,

core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.common.xcontent.ToXContent;
2727
import org.elasticsearch.common.xcontent.XContentBuilder;
2828
import org.elasticsearch.common.xcontent.XContentType;
29-
import org.elasticsearch.tribe.TribeService;
3029

3130
import java.io.IOException;
3231
import java.util.Arrays;
@@ -49,8 +48,6 @@ public class SettingsModule implements Module {
4948
private final Set<String> settingsFilterPattern = new HashSet<>();
5049
private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
5150
private final Map<String, Setting<?>> indexSettings = new HashMap<>();
52-
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
53-
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
5451
private final Logger logger;
5552
private final IndexScopedSettings indexScopedSettings;
5653
private final ClusterSettings clusterSettings;
@@ -135,9 +132,7 @@ public SettingsModule(Settings settings, List<Setting<?>> additionalSettings, Li
135132
}
136133
}
137134
// by now we are fully configured, lets check node level settings for unregistered index settings
138-
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
139-
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
140-
validateTribeSettings(settings, clusterSettings);
135+
clusterSettings.validate(settings);
141136
this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
142137
}
143138

@@ -195,20 +190,6 @@ private void registerSettingsFilter(String filter) {
195190
settingsFilterPattern.add(filter);
196191
}
197192

198-
private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
199-
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);
200-
for (Map.Entry<String, Settings> tribeSettings : groups.entrySet()) {
201-
Settings thisTribesSettings = tribeSettings.getValue();
202-
for (Map.Entry<String, String> entry : thisTribesSettings.getAsMap().entrySet()) {
203-
try {
204-
clusterSettings.validate(entry.getKey(), thisTribesSettings);
205-
} catch (IllegalArgumentException ex) {
206-
throw new IllegalArgumentException("tribe." + tribeSettings.getKey() +" validation failed: "+ ex.getMessage(), ex);
207-
}
208-
}
209-
}
210-
}
211-
212193
public Settings getSettings() {
213194
return settings;
214195
}

core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
8686
discoveryTypes.put("zen",
8787
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
8888
clusterSettings, hostsProvider, allocationService));
89-
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
9089
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
9190
for (DiscoveryPlugin plugin : plugins) {
9291
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@
133133
import org.elasticsearch.transport.Transport;
134134
import org.elasticsearch.transport.TransportInterceptor;
135135
import org.elasticsearch.transport.TransportService;
136-
import org.elasticsearch.tribe.TribeService;
137136
import org.elasticsearch.usage.UsageService;
138137
import org.elasticsearch.watcher.ResourceWatcherService;
139138

@@ -256,8 +255,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
256255
Settings tmpSettings = Settings.builder().put(environment.settings())
257256
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
258257

259-
tmpSettings = TribeService.processSettings(tmpSettings);
260-
261258
// create the node environment as soon as possible, to recover the node id and enable logging
262259
try {
263260
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
@@ -385,15 +382,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
385382
.flatMap(p -> p.getNamedXContent().stream()),
386383
ClusterModule.getNamedXWriteables().stream())
387384
.flatMap(Function.identity()).collect(toList()));
388-
final TribeService tribeService =
389-
new TribeService(
390-
settings,
391-
environment.configFile(),
392-
clusterService,
393-
nodeId,
394-
namedWriteableRegistry,
395-
(s, p) -> newTribeClientNode(s, classpathPlugins, p));
396-
resourcesToClose.add(tribeService);
397385
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
398386
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
399387
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
@@ -404,7 +392,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
404392

405393
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
406394
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
407-
scriptModule.getScriptService(), xContentRegistry).stream())
395+
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
396+
namedWriteableRegistry).stream())
408397
.collect(Collectors.toList());
409398
final RestController restController = actionModule.getRestController();
410399
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
@@ -458,7 +447,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
458447
b.bind(Environment.class).toInstance(this.environment);
459448
b.bind(ThreadPool.class).toInstance(threadPool);
460449
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
461-
b.bind(TribeService.class).toInstance(tribeService);
462450
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
463451
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
464452
b.bind(BigArrays.class).toInstance(bigArrays);
@@ -612,10 +600,6 @@ public Node start() throws NodeValidationException {
612600
Discovery discovery = injector.getInstance(Discovery.class);
613601
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
614602

615-
// start before the cluster service since it adds/removes initial Cluster state blocks
616-
final TribeService tribeService = injector.getInstance(TribeService.class);
617-
tribeService.start();
618-
619603
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
620604
TransportService transportService = injector.getInstance(TransportService.class);
621605
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
@@ -682,10 +666,10 @@ public void onTimeout(TimeValue timeout) {
682666
writePortsFile("transport", transport.boundAddress());
683667
}
684668

685-
// start nodes now, after the http server, because it may take some time
686-
tribeService.startNodes();
687669
logger.info("started");
688670

671+
pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
672+
689673
return this;
690674
}
691675

@@ -696,7 +680,6 @@ private Node stop() {
696680
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
697681
logger.info("stopping ...");
698682

699-
injector.getInstance(TribeService.class).stop();
700683
injector.getInstance(ResourceWatcherService.class).stop();
701684
if (NetworkModule.HTTP_ENABLED.get(settings)) {
702685
injector.getInstance(HttpServerTransport.class).stop();
@@ -744,7 +727,6 @@ public synchronized void close() throws IOException {
744727
List<Closeable> toClose = new ArrayList<>();
745728
StopWatch stopWatch = new StopWatch("node_close");
746729
toClose.add(() -> stopWatch.start("tribe"));
747-
toClose.add(injector.getInstance(TribeService.class));
748730
toClose.add(() -> stopWatch.stop().start("node_service"));
749731
toClose.add(nodeService);
750732
toClose.add(() -> stopWatch.stop().start("http"));
@@ -920,11 +902,6 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
920902
return customNameResolvers;
921903
}
922904

923-
/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
924-
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
925-
return new Node(new Environment(settings, configPath), classpathPlugins);
926-
}
927-
928905
/** Constructs a ClusterInfoService which may be mocked for tests. */
929906
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
930907
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {

core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,11 @@ default Collection<AllocationDecider> createAllocationDeciders(Settings settings
5858
default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
5959
return Collections.emptyMap();
6060
}
61+
62+
/**
63+
* Called when the node is started
64+
*/
65+
default void onNodeStarted() {
66+
67+
}
6168
}

core/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3939
import org.elasticsearch.common.xcontent.XContentParser;
4040
import org.elasticsearch.discovery.DiscoveryModule;
41+
import org.elasticsearch.env.Environment;
42+
import org.elasticsearch.env.NodeEnvironment;
4143
import org.elasticsearch.index.IndexModule;
4244
import org.elasticsearch.indices.analysis.AnalysisModule;
4345
import org.elasticsearch.repositories.RepositoriesModule;
@@ -104,10 +106,15 @@ public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses()
104106
* @param threadPool A service to allow retrieving an executor to run an async action
105107
* @param resourceWatcherService A service to watch for changes to node local files
106108
* @param scriptService A service to allow running scripts on the local node
109+
* @param xContentRegistry the registry for extensible xContent parsing
110+
* @param environment the environment for path and setting configurations
111+
* @param nodeEnvironment the node environment used coordinate access to the data paths
112+
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
107113
*/
108114
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
109115
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
110-
NamedXContentRegistry xContentRegistry) {
116+
NamedXContentRegistry xContentRegistry, Environment environment,
117+
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
111118
return Collections.emptyList();
112119
}
113120

core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.elasticsearch.client.Client;
2424
import org.elasticsearch.cluster.service.ClusterService;
25+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2526
import org.elasticsearch.common.logging.Loggers;
2627
import org.elasticsearch.common.settings.Setting;
2728
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
30+
import org.elasticsearch.env.Environment;
31+
import org.elasticsearch.env.NodeEnvironment;
2932
import org.elasticsearch.plugins.Plugin;
3033
import org.elasticsearch.script.ScriptService;
3134
import org.elasticsearch.test.ESIntegTestCase;
@@ -67,11 +70,13 @@ public TestPlugin(Settings settings) {
6770
@Override
6871
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
6972
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
70-
NamedXContentRegistry xContentRegistry) {
73+
NamedXContentRegistry xContentRegistry, Environment environment,
74+
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
7175
clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> {
7276
logger.debug("the template dummy setting was updated to {}", integer);
7377
});
74-
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry);
78+
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry,
79+
environment, nodeEnvironment, namedWriteableRegistry);
7580
}
7681

7782
@Override

0 commit comments

Comments
 (0)