
Move tribe to a module #25778


Merged · 12 commits merged on Jul 28, 2017
2 changes: 1 addition & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -418,7 +418,7 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotShardsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPool.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="modules[/\\]tribe[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]queries[/\\]BlendedTermQueryTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]VersionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]RejectionActionIT.java" checks="LineLength" />
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.metadata.MetaData;

/**
* Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom}.
* When multiple Mergable Custom metadata of the same type are found (from underlying clusters), the
* Custom metadata can be merged using {@link #merge(MetaData.Custom)}.
*
* @param <T> type of custom meta data
*/
public interface MergableCustomMetaData<T extends MetaData.Custom> {

/**
* Merges this custom metadata with other, returning either this or <code>other</code> custom metadata.
* This method should not mutate either <code>this</code> or the <code>other</code> custom metadata.
*
* @param other custom meta data
* @return the same instance or <code>other</code> custom metadata based on implementation
* if both the instances are considered equal, implementations should return this
* instance to avoid redundant cluster state changes.
*/
T merge(T other);
}
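
For context, here is a minimal, self-contained sketch of the merge contract described in the Javadoc above. ExpiryMetaData and the simplified Mergable interface are hypothetical stand-ins for illustration only; a real implementation would also extend MetaData.Custom and provide serialization and diff support.

// Illustration of the merge contract; ExpiryMetaData is hypothetical and does
// not extend the real MetaData.Custom.
public final class MergeContractExample {

    // Simplified stand-in for MergableCustomMetaData<T extends MetaData.Custom>.
    interface Mergable<T> {
        T merge(T other);
    }

    // Hypothetical custom metadata carrying an expiry timestamp; on merge, the
    // instance with the later expiry wins.
    static final class ExpiryMetaData implements Mergable<ExpiryMetaData> {
        final long expiryMillis;

        ExpiryMetaData(long expiryMillis) {
            this.expiryMillis = expiryMillis;
        }

        @Override
        public ExpiryMetaData merge(ExpiryMetaData other) {
            // Neither instance is mutated; when this instance wins (or both are
            // considered equal) it is returned unchanged, so callers can detect
            // "no change" by reference and skip a redundant cluster state update.
            return other.expiryMillis > this.expiryMillis ? other : this;
        }
    }

    public static void main(String[] args) {
        ExpiryMetaData a = new ExpiryMetaData(100L);
        ExpiryMetaData b = new ExpiryMetaData(200L);
        System.out.println(a.merge(b) == b); // true: other has the later expiry
        System.out.println(b.merge(a) == b); // true: this instance kept, no state change needed
    }
}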
@@ -71,7 +71,6 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
@@ -91,7 +90,6 @@
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Arrays;
@@ -369,13 +367,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ThreadContext.DEFAULT_HEADERS_SETTING,
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
ESLoggerFactory.LOG_LEVEL_SETTING,
TribeService.BLOCKS_METADATA_SETTING,
TribeService.BLOCKS_WRITE_SETTING,
TribeService.BLOCKS_WRITE_INDICES_SETTING,
TribeService.BLOCKS_READ_INDICES_SETTING,
TribeService.BLOCKS_METADATA_INDICES_SETTING,
TribeService.ON_CONFLICT_SETTING,
TribeService.TRIBE_NAME_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
OsService.REFRESH_INTERVAL_SETTING,
@@ -26,7 +26,6 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;

import java.io.IOException;
import java.util.Arrays;
@@ -49,8 +48,6 @@ public class SettingsModule implements Module {
private final Set<String> settingsFilterPattern = new HashSet<>();
private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
private final Map<String, Setting<?>> indexSettings = new HashMap<>();
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
private final Logger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
@@ -135,9 +132,7 @@ public SettingsModule(Settings settings, List<Setting<?>> additionalSettings, Li
}
}
// by now we are fully configured, lets check node level settings for unregistered index settings
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
validateTribeSettings(settings, clusterSettings);
clusterSettings.validate(settings);
this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
}

@@ -195,20 +190,6 @@ private void registerSettingsFilter(String filter) {
settingsFilterPattern.add(filter);
}

private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);
for (Map.Entry<String, Settings> tribeSettings : groups.entrySet()) {
Settings thisTribesSettings = tribeSettings.getValue();
for (Map.Entry<String, String> entry : thisTribesSettings.getAsMap().entrySet()) {
try {
clusterSettings.validate(entry.getKey(), thisTribesSettings);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("tribe." + tribeSettings.getKey() +" validation failed: "+ ex.getMessage(), ex);
}
}
}
}

public Settings getSettings() {
return settings;
}
@@ -86,7 +86,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
49 changes: 26 additions & 23 deletions core/src/main/java/org/elasticsearch/node/Node.java
@@ -133,7 +133,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;

@@ -153,6 +152,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -229,6 +229,7 @@ public static final Settings addNodeNameIfNeeded(Settings settings, final String
private final Collection<LifecycleComponent> pluginLifecycleComponents;
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final List<Runnable> onStartedListeners = new CopyOnWriteArrayList<>();

/**
* Constructs a node with the given settings.
@@ -256,8 +257,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

tmpSettings = TribeService.processSettings(tmpSettings);

// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
@@ -385,15 +384,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
final TribeService tribeService =
new TribeService(
settings,
environment.configFile(),
clusterService,
nodeId,
namedWriteableRegistry,
(s, p) -> newTribeClientNode(s, classpathPlugins, p));
resourcesToClose.add(tribeService);
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
@@ -449,6 +439,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
modules.add(b -> {
b.bind(NodeBuilder.class).toInstance(new NodeBuilder(this, classpathPlugins));
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
Expand All @@ -458,7 +449,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(TribeService.class).toInstance(tribeService);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
@@ -527,6 +517,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
}
}

public void addOnStartedListener(Runnable runnable) {
onStartedListeners.add(runnable);

Review comment (Contributor): Our plugin API at the node level is pull based, not push. Can we add a method to pull these from ClusterPlugin.java? That way we are consistent and we fully control who calls this method.

}
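
As an aside on the review comment above, here is a rough sketch of the pull-based shape the reviewer is asking for; the method name and default implementation are hypothetical and not part of the actual ClusterPlugin API:

import java.util.Collection;
import java.util.Collections;

// Hypothetical pull-based hook: the node asks each plugin for its on-started
// callbacks instead of plugins pushing listeners onto the node.
interface ClusterPluginOnStartedSketch {

    // Callbacks the node runs once it has fully started; the empty default
    // keeps existing plugins unaffected.
    default Collection<Runnable> getOnNodeStartedListeners() {
        return Collections.emptyList();
    }
}

The node would collect these from its plugins during construction and run them where onStartedListeners.forEach(Runnable::run) is invoked today, so the node stays in control of when the hooks fire.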

// visible for testing
static void warnIfPreRelease(final Version version, final boolean isSnapshot, final Logger logger) {
if (!version.isRelease() || isSnapshot) {
@@ -612,10 +606,6 @@ public Node start() throws NodeValidationException {
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

// start before the cluster service since it adds/removes initial Cluster state blocks
final TribeService tribeService = injector.getInstance(TribeService.class);
tribeService.start();

// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
@@ -682,10 +672,10 @@ public void onTimeout(TimeValue timeout) {
writePortsFile("transport", transport.boundAddress());
}

// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
logger.info("started");

onStartedListeners.forEach(Runnable::run);

return this;
}

@@ -696,7 +686,6 @@ private Node stop() {
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("stopping ...");

injector.getInstance(TribeService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).stop();
@@ -744,7 +733,6 @@ public synchronized void close() throws IOException {
List<Closeable> toClose = new ArrayList<>();
StopWatch stopWatch = new StopWatch("node_close");
toClose.add(() -> stopWatch.start("tribe"));
toClose.add(injector.getInstance(TribeService.class));
toClose.add(() -> stopWatch.stop().start("node_service"));
toClose.add(nodeService);
toClose.add(() -> stopWatch.stop().start("http"));
Expand Down Expand Up @@ -920,8 +908,23 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
return customNameResolvers;
}

/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
public static class NodeBuilder {

Review comment (Contributor): Do we really need this additional class? It seems users can provide all of this at once to the newNode method.

Review comment (Contributor): I don't think we need this entire construct; we can just subclass Node.java in the plugin/module and create a node via that.

Reply (Contributor, author): To play nicely with the MockNode concept used by the tests, we cannot just create the Node instance ourselves. However, by removing Guice injection, I've just replaced this class with a BiFunction<Settings, Path>.


private final Node node;
private final Collection<Class<? extends Plugin>> classpathPlugins;

public NodeBuilder(Node node, Collection<Class<? extends Plugin>> classpathPlugins) {
this.node = node;
this.classpathPlugins = classpathPlugins;
}

public Node newNode(Settings settings, Path configPath) {
return node.newNode(settings, classpathPlugins, configPath);
}
}

/** Constructs a new node based on the following settings. Overridden by tests */
protected Node newNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
return new Node(new Environment(settings, configPath), classpathPlugins);
}
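
To illustrate the author's reply above, here is a rough sketch of passing a node factory as a function instead of a Guice-bound NodeBuilder; the class and wiring below are illustrative assumptions and may differ from the final change:

import java.nio.file.Path;
import java.util.function.BiFunction;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;

// Illustration only: the tribe code receives a node factory as a function.
// Production wiring could pass something like
//   (settings, configPath) -> node.newNode(settings, classpathPlugins, configPath)
// while tests substitute a function that builds a MockNode instead.
final class TribeClientNodeFactoryExample {

    private final BiFunction<Settings, Path, Node> nodeFactory;

    TribeClientNodeFactoryExample(BiFunction<Settings, Path, Node> nodeFactory) {
        this.nodeFactory = nodeFactory;
    }

    Node createClientNode(Settings clientSettings, Path configPath) {
        return nodeFactory.apply(clientSettings, configPath);
    }
}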

@@ -21,13 +21,10 @@

import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.settings.Setting.Property;
import org.joda.time.MonthDay;

import java.util.Arrays;
import java.util.Collections;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

public class SettingsModuleTests extends ModuleTestCase {

@@ -87,54 +84,6 @@ public void testRegisterSettings() {
}
}

public void testTribeSetting() {
{
Settings settings = Settings.builder().put("tribe.t1.cluster.routing.allocation.balance.shard", "2.0").build();
SettingsModule module = new SettingsModule(settings);
assertInstanceBinding(module, Settings.class, (s) -> s == settings);
}
{
Settings settings = Settings.builder().put("tribe.t1.cluster.routing.allocation.balance.shard", "[2.0]").build();
try {
new SettingsModule(settings);
fail();
} catch (IllegalArgumentException ex) {
assertEquals(
"tribe.t1 validation failed: Failed to parse value [[2.0]] for setting [cluster.routing.allocation.balance.shard]",
ex.getMessage());
}
}
}

public void testSpecialTribeSetting() {
{
Settings settings = Settings.builder().put("tribe.blocks.write", "false").build();
SettingsModule module = new SettingsModule(settings);
assertInstanceBinding(module, Settings.class, (s) -> s == settings);
}
{
Settings settings = Settings.builder().put("tribe.blocks.write", "BOOM").build();
try {
new SettingsModule(settings);
fail();
} catch (IllegalArgumentException ex) {
assertEquals("Failed to parse value [BOOM] as only [true] or [false] are allowed.",
ex.getMessage());
}
}
{
Settings settings = Settings.builder().put("tribe.blocks.wtf", "BOOM").build();
try {
new SettingsModule(settings);
fail();
} catch (IllegalArgumentException ex) {
assertEquals("tribe.blocks validation failed: unknown setting [wtf] please check that any required plugins are" +
" installed, or check the breaking changes documentation for removed settings", ex.getMessage());
}
}
}


public void testLoggerSettings() {
{
Settings settings = Settings.builder().put("logger._root", "TRACE").put("logger.transport", "INFO").build();